Spark-sql-inferring-schema-using-reflection

提供:Dev Guides
移動先:案内検索

Reflectionを使用したスキーマの推測

このメソッドは、リフレクションを使用して、特定のタイプのオブジェクトを含むRDDのスキーマを生成します。 Spark SQLのScalaインターフェイスは、ケースクラスを含むRDDのDataFrameへの自動変換をサポートしています。 * caseクラス*は、テーブルのスキーマを定義します。 ケースクラスへの引数の名前はリフレクションを使用して読み取られ、列の名前になります。

ケースクラスは、ネストしたり、シーケンスや配列などの複雑なタイプを含むこともできます。 このRDDは、暗黙的にDataFrameに変換してから、テーブルとして登録できます。 テーブルは、後続のSQLステートメントで使用できます。

*employee.txt* という名前のテキストファイルの従業員レコードの例を考えてみましょう。 テキストファイルからデータを読み取ってRDDを作成し、デフォルトのSQL関数を使用してそれをDataFrameに変換します。

指定されたデータ-スパークシェルポイントが実行されている現在のそれぞれのディレクトリに配置された employee.txt という名前のファイルの次のデータを調べます。

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

次の例は、リフレクションを使用してスキーマを生成する方法を説明しています。

Sparkシェルを起動します

次のコマンドを使用して、Spark Shellを起動します。

$ spark-shell

SQLContextを作成する

次のコマンドを使用してSQLContextを生成します。 ここで、 sc はSparkContextオブジェクトを意味します。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

SQL関数のインポート

次のコマンドを使用して、RDDをDataFrameに暗黙的に変換するために使用されるすべてのSQL関数をインポートします。

scala> import sqlContext.implicts._

ケースクラスの作成

次に、ケースクラスを使用して従業員レコードデータのスキーマを定義する必要があります。 次のコマンドは、指定されたデータ(id、name、age)に基づいてケースクラスを宣言するために使用されます。

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

RDDを作成して変換を適用する

次のコマンドを使用して、マップ関数を使用して employee.txt からデータを読み取り、それをDataFrameに変換することにより、 empl という名前のRDDを生成します。

ここでは、2つのマップ関数が定義されています。 1つはテキストレコードをフィールド(* .map(_。split(“、”)))に分割し、2つ目のマップ関数は個々のフィールド(id、name、age)を1つのケースクラスオブジェクト( .map (e(0).trim.toInt、e(1)、e(2).trim.toInt)*)。

最後に、スキーマを持つケースクラスオブジェクトをDataFrameに変換するために* toDF()*メソッドが使用されます。

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

出力

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

DataFrameデータをテーブルに保存する

次のコマンドを使用して、DataFrameデータを employee という名前のテーブルに保存します。 このコマンドの後、すべてのタイプのSQLステートメントを適用できます。

scala> empl.registerTempTable("employee")

従業員テーブルの準備ができました。 * SQLContext.sql()*メソッドを使用して、テーブルでいくつかのSQLクエリを渡します。

DataFrameでクエリを選択

次のコマンドを使用して、 employee テーブルからすべてのレコードを選択します。 ここでは、すべてのレコードデータをキャプチャするために変数 allrecords を使用します。 それらのレコードを表示するには、* show()*メソッドを呼び出します。

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
*allrecords* DataFrameの結果データを表示するには、次のコマンドを使用します。
scala> allrecords.show()

出力

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

DataFrameのWhere句SQLクエリ

テーブルの where ステートメントを適用するには、次のコマンドを使用します。 ここで、変数 agefilter は、年齢が20〜35歳の従業員のレコードを格納します。

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")
*agefilter* DataFrameの結果データを表示するには、次のコマンドを使用します。
scala> agefilter.show()

出力

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

前の2つのクエリは、テーブルDataFrame全体に対して渡されました。 次に、結果のDataFrameに Transformations を適用して、データを取得してみましょう。

列インデックスを使用してagefilter DataFrameからID値を取得します

次のステートメントは、フィールドインデックスを使用して、 agefilter RDDの結果からID値を取得するために使用されます。

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

出力

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

このリフレクションベースのアプローチは、より簡潔なコードにつながり、Sparkアプリケーションの作成中にスキーマを既に知っている場合にうまく機能します。