Spark-sql-programmatically-specifying-schema
プログラムによるスキーマの指定
DataFrameを作成する2番目の方法は、スキーマを構築し、それを既存のRDDに適用できるプログラムインターフェイスを使用することです。 次の3つの手順を使用して、プログラムでDataFrameを作成できます。
- 元のRDDから行のRDDを作成します。
- 手順1で作成したRDDの行の構造に一致するStructTypeで表されるスキーマを作成します。
- SQLContextが提供するcreateDataFrameメソッドを介してスキーマを行のRDDに適用します。
例
*employee.txt* という名前のテキストファイルの従業員レコードの例を考えてみましょう。 テキストファイルからデータを読み取って、DataFrameを直接使用してスキーマを作成します。
指定されたデータ-スパークシェルポイントが実行されている現在のそれぞれのディレクトリにある employee.txt という名前のファイルの次のデータを確認します。
1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23
以下の手順に従って、スキーマをプログラムで生成します。
Spark Shellを開く
次の例を使用して、Sparkシェルを起動します。
$ spark-shell
SQLContextオブジェクトを作成する
次のコマンドを使用してSQLContextを生成します。 ここで、 sc はSparkContextオブジェクトを意味します。
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
テキストファイルから入力を読み取る
次のコマンドを使用して employee.txt という名前のテキストファイルからデータを読み取り、RDD DataFrameを作成します。
scala> val employee = sc.textFile("employee.txt")
文字列形式でエンコードされたスキーマを作成する
次のコマンドを使用して、エンコードされたスキーマを文字列形式で作成します。 つまり、テーブルのフィールド構造を想定し、区切り文字を使用してフィールド名を渡します。
scala> val schemaString = "id name age"
出力
schemaString: String = id name age
各APIのインポート
次のコマンドを使用して、行機能とSQLデータ型をインポートします。
scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};
スキーマを生成する
次のコマンドは、 schemaString 変数を読み取ってスキーマを生成するために使用されます。 これは、デフォルトで文字列全体を区切り文字としてスペースで分割して各フィールドを読み取り、各フィールドタイプを文字列タイプにする必要があることを意味します。
scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))
テキストファイルからデータを読み取るための変換を適用する
次のコマンドを使用して、RDD(従業員)を行に変換します。 つまり、ここではRDDデータを読み取り、rowRDDに格納するためのロジックを指定しているということです。 ここでは、2つのマップ関数を使用しています。1つはレコード文字列を分割するための区切り文字(。* map(_。split( "、")))で、2つ目のマップ関数はフィールドインデックス値(。 map(e⇒Row(e(0).trim.toInt、e(1)、e(2).trim.toInt))*)。
scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))
スキーマに基づいて行データにRowRDDを適用する
*rowRDD* データと *schema* (SCHEMA)変数を使用してDataFrameを作成するには、次のステートメントを使用します。
scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
出力
employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]
DataFrameデータをテーブルに保存する
次のコマンドを使用して、DataFrameを employee という名前のテーブルに保存します。
scala> employeeDF.registerTempTable("employee")
*employee* テーブルの準備ができました。 メソッド* SQLContext.sql()*を使用して、いくつかのSQLクエリをテーブルに渡します。
DataFrameでクエリを選択
*employee* テーブルからすべてのレコードを選択するには、次のステートメントを使用します。 ここでは、すべてのレコードデータをキャプチャするために変数 *allrecords* を使用します。 それらのレコードを表示するには、* show()*メソッドを呼び出します。
scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
*allrecords* DataFrameの結果データを表示するには、次のコマンドを使用します。
scala> allrecords.show()
出力
+------+--------+----+
| id | name |age |
+------+--------+----+
| 1201 | satish | 25 |
| 1202 | krishna| 28 |
| 1203 | amith | 39 |
| 1204 | javed | 23 |
| 1205 | prudvi | 23 |
+------+--------+----+
メソッド sqlContext.sql を使用すると、実行時まで列とその型が不明な場合にDataFramesを構築できます。 これで、さまざまなSQLクエリを実行できます。