Spark-sql-dataframes
Spark SQL-データフレーム
DataFrameは、名前付き列に編成されたデータの分散コレクションです。 概念的には、優れた最適化手法を備えたリレーショナルテーブルと同等です。
DataFrameは、Hiveテーブル、構造化データファイル、外部データベース、既存のRDDなど、さまざまなソースの配列から構築できます。 このAPIは、Rプログラミングの DataFrame およびPythonの Pandas からインスピレーションを得た、現代のビッグデータおよびデータサイエンスアプリケーション向けに設計されました。
DataFrameの機能
ここにDataFrameのいくつかの特徴的な機能のセットがあります-
- 単一ノードクラスターから大規模クラスターで、キロバイトからペタバイトのサイズのデータを処理する機能。
- さまざまなデータ形式(Avro、csv、エラスティック検索、Cassandra)およびストレージシステム(HDFS、HIVEテーブル、mysqlなど)をサポートします。
- Spark SQL Catalystオプティマイザー(ツリー変換フレームワーク)による最先端の最適化とコード生成。
- Spark-Coreを介して、すべてのビッグデータツールおよびフレームワークと簡単に統合できます。
- Python、Java、Scala、およびRプログラミング用のAPIを提供します。
SQLContext
SQLContextはクラスであり、Spark SQLの機能を初期化するために使用されます。 SQLContextクラスオブジェクトを初期化するには、SparkContextクラスオブジェクト(sc)が必要です。
次のコマンドは、spark-shellを介してSparkContextを初期化するために使用されます。
$ spark-shell
デフォルトでは、SparkContextオブジェクトは、spark-shellの起動時に sc という名前で初期化されます。
次のコマンドを使用して、SQLContextを作成します。
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
例
*employee.json* という名前のJSONファイルの従業員レコードの例を考えてみましょう。 次のコマンドを使用して、DataFrame(df)を作成し、次の内容の *employee.json* という名前のJSONドキュメントを読み取ります。
*employee.json* -このファイルを、現在の *scala>* ポインターがあるディレクトリに配置します。
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
データフレーム操作
DataFrameは、構造化データ操作のためのドメイン固有の言語を提供します。 ここでは、DataFrameを使用した構造化データ処理の基本的な例をいくつか示します。
以下の手順に従って、DataFrame操作を実行します-
JSONドキュメントを読む
最初に、JSONドキュメントを読む必要があります。 これに基づいて、(dfs)という名前のDataFrameを生成します。
次のコマンドを使用して、 employee.json という名前のJSONドキュメントを読み取ります。 データは、id、name、およびageのフィールドを持つテーブルとして表示されます。
scala> val dfs = sqlContext.read.json("employee.json")
出力-フィールド名は employee.json から自動的に取得されます。
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
データを表示する
DataFrameのデータを表示する場合は、次のコマンドを使用します。
scala> dfs.show()
出力-従業員データを表形式で表示できます。
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
printSchemaメソッドを使用する
DataFrameの構造(スキーマ)を表示するには、次のコマンドを使用します。
scala> dfs.printSchema()
出力
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Selectメソッドを使用
次のコマンドを使用して、DataFrameの3つの列から name -columnを取得します。
scala> dfs.select("name").show()
出力-*名前*列の値を見ることができます。
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
年齢フィルターを使用する
次のコマンドを使用して、年齢が23歳(23歳以上)の従業員を見つけます。
scala> dfs.filter(dfs("age") > 23).show()
出力
<console>:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
groupByメソッドを使用する
同じ年齢の従業員の数をカウントするには、次のコマンドを使用します。
scala> dfs.groupBy("age").count().show()
出力-2人の従業員は23歳です。
<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
プログラムによるSQLクエリの実行
SQLContextを使用すると、アプリケーションはSQL関数の実行中にプログラムでSQLクエリを実行し、結果をDataFrameとして返します。
一般に、バックグラウンドで、SparkSQLは既存のRDDをDataFrameに変換するための2つの異なる方法をサポートします-
Sr. No | Methods & Description |
---|---|
1 |
Inferring the Schema using Reflection このメソッドは、リフレクションを使用して、特定のタイプのオブジェクトを含むRDDのスキーマを生成します。 |
2 |
Programmatically Specifying the Schema DataFrameを作成する2番目の方法は、スキーマを構築し、それを既存のRDDに適用できるプログラムインターフェイスを使用することです。 |