Spark-sql-dataframes

提供:Dev Guides
移動先:案内検索

Spark SQL-データフレーム

DataFrameは、名前付き列に編成されたデータの分散コレクションです。 概念的には、優れた最適化手法を備えたリレーショナルテーブルと同等です。

DataFrameは、Hiveテーブル、構造化データファイル、外部データベース、既存のRDDなど、さまざまなソースの配列から構築できます。 このAPIは、Rプログラミングの DataFrame およびPythonの Pandas からインスピレーションを得た、現代のビッグデータおよびデータサイエンスアプリケーション向けに設計されました。

DataFrameの機能

ここにDataFrameのいくつかの特徴的な機能のセットがあります-

  • 単一ノードクラスターから大規模クラスターで、キロバイトからペタバイトのサイズのデータ​​を処理する機能。
  • さまざまなデータ形式(Avro、csv、エラスティック検索、Cassandra)およびストレージシステム(HDFS、HIVEテーブル、mysqlなど)をサポートします。
  • Spark SQL Catalystオプティマイザー(ツリー変換フレームワーク)による最先端の最適化とコード生成。
  • Spark-Coreを介して、すべてのビッグデータツールおよびフレームワークと簡単に統合できます。
  • Python、Java、Scala、およびRプログラミング用のAPIを提供します。

SQLContext

SQLContextはクラスであり、Spark SQLの機能を初期化するために使用されます。 SQLContextクラスオブジェクトを初期化するには、SparkContextクラスオブジェクト(sc)が必要です。

次のコマンドは、spark-shellを介してSparkContextを初期化するために使用されます。

$ spark-shell

デフォルトでは、SparkContextオブジェクトは、spark-shellの起動時に sc という名前で初期化されます。

次のコマンドを使用して、SQLContextを作成します。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

*employee.json* という名前のJSONファイルの従業員レコードの例を考えてみましょう。 次のコマンドを使用して、DataFrame(df)を作成し、次の内容の *employee.json* という名前のJSONドキュメントを読み取ります。
*employee.json* -このファイルを、現在の *scala>* ポインターがあるディレクトリに配置します。
{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

データフレーム操作

DataFrameは、構造化データ操作のためのドメイン固有の言語を提供します。 ここでは、DataFrameを使用した構造化データ処理の基本的な例をいくつか示します。

以下の手順に従って、DataFrame操作を実行します-

JSONドキュメントを読む

最初に、JSONドキュメントを読む必要があります。 これに基づいて、(dfs)という名前のDataFrameを生成します。

次のコマンドを使用して、 employee.json という名前のJSONドキュメントを読み取ります。 データは、id、name、およびageのフィールドを持つテーブルとして表示されます。

scala> val dfs = sqlContext.read.json("employee.json")

出力-フィールド名は employee.json から自動的に取得されます。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

データを表示する

DataFrameのデータを表示する場合は、次のコマンドを使用します。

scala> dfs.show()

出力-従業員データを表形式で表示できます。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

printSchemaメソッドを使用する

DataFrameの構造(スキーマ)を表示するには、次のコマンドを使用します。

scala> dfs.printSchema()

出力

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Selectメソッドを使用

次のコマンドを使用して、DataFrameの3つの列から name -columnを取得します。

scala> dfs.select("name").show()

出力-*名前*列の値を見ることができます。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

年齢フィルターを使用する

次のコマンドを使用して、年齢が23歳(23歳以上)の従業員を見つけます。

scala> dfs.filter(dfs("age") > 23).show()

出力

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

groupByメソッドを使用する

同じ年齢の従業員の数をカウントするには、次のコマンドを使用します。

scala> dfs.groupBy("age").count().show()

出力-2人の従業員は23歳です。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

プログラムによるSQLクエリの実行

SQLContextを使用すると、アプリケーションはSQL関数の実行中にプログラムでSQLクエリを実行し、結果をDataFrameとして返します。

一般に、バックグラウンドで、SparkSQLは既存のRDDをDataFrameに変換するための2つの異なる方法をサポートします-

Sr. No Methods & Description
1

Inferring the Schema using Reflection

このメソッドは、リフレクションを使用して、特定のタイプのオブジェクトを含むRDDのスキーマを生成します。

2

Programmatically Specifying the Schema

DataFrameを作成する2番目の方法は、スキーマを構築し、それを既存のRDDに適用できるプログラムインターフェイスを使用することです。