Spark-sql-quick-guide

提供:Dev Guides
移動先:案内検索

スパーク-はじめに

業界では、Hadoopを広範囲に使用してデータセットを分析しています。 その理由は、Hadoopフレームワークは単純なプログラミングモデル(MapReduce)に基づいており、スケーラブルで柔軟性があり、フォールトトレラントで費用効果の高いコンピューティングソリューションを可能にするためです。 ここでの主な関心事は、クエリ間の待機時間とプログラム実行の待機時間に関して、大きなデータセットの処理速度を維持することです。

Sparkは、Hadoop計算コンピューティングソフトウェアプロセスを高速化するためにApache Software Foundationによって導入されました。

一般的な考え方に反して、 SparkはHadoop の修正バージョンではなく、独自のクラスター管理があるため、実際にはHadoopに依存していません。 Hadoopは、Sparkを実装する方法の1つにすぎません。

Sparkは2つの方法でHadoopを使用します。1つは storage で、もう1つは processing です。 Sparkには独自のクラスター管理計算があるため、Hadoopはストレージ目的にのみ使用されます。

Apache Spark

Apache Sparkは、高速計算用に設計された超高速クラスターコンピューティングテクノロジーです。 Hadoop MapReduceに基づいており、MapReduceモデルを拡張して、インタラクティブクエリやストリーム処理など、より多くの種類の計算に効率的に使用します。 Sparkの主な機能は、*インメモリクラスターコンピューティング*であり、アプリケーションの処理速度を向上させます。

Sparkは、バッチアプリケーション、反復アルゴリズム、インタラクティブクエリ、ストリーミングなどの幅広いワークロードをカバーするように設計されています。 それぞれのシステムでこれらすべてのワークロードをサポートするだけでなく、個別のツールを維持する管理負担を軽減します。

Apache Sparkの進化

Sparkは、2009年にUC BerkeleyのAMPLabでMatei Zahariaによって開発されたHadoopのサブプロジェクトの1つです。 2010年にBSDライセンスの下でオープンソース化されました。 2013年にApacheソフトウェア財団に寄付され、現在、Apache Sparkは2014年2月からトップレベルのApacheプロジェクトになりました。

Apache Sparkの機能

Apache Sparkには次の機能があります。

  • 速度-Sparkは、Hadoopクラスターでアプリケーションを実行し、メモリで最大100倍、ディスクで実行すると10倍高速になります。 これは、ディスクへの読み取り/書き込み操作の数を減らすことで可能です。 中間処理データをメモリに保存します。
  • 複数の言語をサポート-Sparkは、Java、Scala、またはPythonの組み込みAPIを提供します。 したがって、異なる言語でアプリケーションを作成できます。 Sparkには、インタラクティブなクエリのための80の高レベル演算子が用意されています。
  • 高度な分析-Sparkは「マップ」と「削減」のみをサポートしていません。 また、SQLクエリ、ストリーミングデータ、機械学習(ML)、グラフアルゴリズムもサポートしています。

Hadoopに組み込まれたSpark

次の図は、Hadoopコンポーネントを使用してSparkを構築する3つの方法を示しています。

Spark Built on Hadoop

以下に説明するように、Sparkの展開には3つの方法があります。

  • スタンドアロン-Sparkスタンドアロン展開とは、SparkがHDFS(Hadoop Distributed File System)の上にある場所を占有し、HDFSに明示的にスペースが割り当てられることを意味します。 ここでは、SparkとMapReduceが並行して実行され、クラスター上のすべてのSparkジョブをカバーします。
  • Hadoop Yarn -Hadoop Yarnの展開とは、事前インストールやルートアクセスを必要とせずに、単純にSparkがYarnで実行されることを意味します。 SparkをHadoopエコシステムまたはHadoopスタックに統合するのに役立ちます。 他のコンポーネントをスタック上で実行できます。
  • * MapReduceのスパーク(SIMR)*-MapReduceのスパークは、スタンドアロン展開に加えてスパークジョブを起動するために使用されます。 SIMRを使用すると、ユーザーは管理アクセスなしでSparkを起動し、そのシェルを使用できます。

Sparkのコンポーネント

次の図は、Sparkのさまざまなコンポーネントを示しています。

Sparkのコンポーネント

Apache Sparkコア

Spark Coreは、他のすべての機能が構築されているsparkプラットフォームの基礎となる一般的な実行エンジンです。 インメモリコンピューティングと外部ストレージシステムの参照データセットを提供します。

Spark SQL

Spark SQLは、Spark Coreの上にあるコンポーネントであり、SchemaRDDと呼ばれる新しいデータ抽象化を導入し、構造化データおよび半構造化データのサポートを提供します。

スパークストリーミング

Spark Streamingは、Spark Coreの高速スケジューリング機能を活用して、ストリーミング分析を実行します。 ミニバッチでデータを取り込み、それらのミニバッチでRDD(Resilient Distributed Datasets)変換を実行します。

MLlib(機械学習ライブラリ)

MLlibは、分散メモリベースのSparkアーキテクチャのため、Sparkの上の分散型機械学習フレームワークです。 ベンチマークによれば、MLlib開発者は代替最小二乗(ALS)実装に対して行います。 Spark MLlibは、 Apache Mahout のHadoopディスクベースバージョン(MahoutがSparkインターフェースを取得する前)の9倍の速度です。

GraphX

GraphXは、Sparkの上にある分散グラフ処理フレームワークです。 Pregel抽象化APIを使用してユーザー定義グラフをモデル化できるグラフ計算を表現するためのAPIを提供します。 また、この抽象化のために最適化されたランタイムも提供します。

スパーク-RDD

弾力性のある分散データセット

弾力性のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。 オブジェクトの不変の分散コレクションです。 RDDの各データセットは論理パーティションに分割され、クラスターの異なるノードで計算できます。 RDDには、ユーザー定義クラスを含む、あらゆるタイプのPython、Java、またはScalaオブジェクトを含めることができます。

正式には、RDDは読み取り専用のパーティション化されたレコードのコレクションです。 RDDは、安定したストレージ上のデータまたは他のRDDのいずれかの決定論的操作によって作成できます。 RDDは、並列で操作できる要素のフォールトトレラントなコレクションです。

RDDを作成するには2つの方法があります-ドライバープログラムの既存のコレクションを並列化する*、または共有ファイルシステム、HDFS、HBase、またはHadoop入力を提供するデータソースなどの外部ストレージシステムで*データセットを参照する*フォーマット。

SparkはRDDの概念を利用して、より高速で効率的なMapReduce操作を実現します。 まず、MapReduceの操作がどのように行われ、なぜそれほど効率的でないのかを説明しましょう。

MapReduceでのデータ共有が遅い

MapReduceは、クラスター上の並列分散アルゴリズムを使用して大規模なデータセットを処理および生成するために広く採用されています。 ユーザーは、作業の分散とフォールトトレランスを心配することなく、一連の高レベル演算子を使用して並列計算を作成できます。

残念ながら、ほとんどの現在のフレームワークでは、計算間(例:2つのMapReduceジョブ間)でデータを再利用する唯一の方法は、外部の安定したストレージシステム(例:HDFS)に書き込むことです。 このフレームワークは、クラスターの計算リソースにアクセスするための多数の抽象化を提供しますが、ユーザーはさらに多くのものを求めています。

反復*および*インタラクティブ*アプリケーションの両方で、並列ジョブ間でより高速なデータ共有が必要です。 *Replicationserialization 、および disk IO により、MapReduceでのデータ共有が遅くなります。 ほとんどのHadoopアプリケーションであるストレージシステムに関しては、HDFSの読み取り/書き込み操作に90%以上の時間を費やしています。

MapReduceの反復操作

多段階アプリケーションでの複数の計算で中間結果を再利用します。 次の図は、MapReduceで反復操作を実行しながら、現在のフレームワークがどのように機能するかを説明しています。 これにより、データ複製、ディスクI/O、およびシリアル化のためにかなりのオーバーヘッドが発生し、システムが遅くなります。

MapReduceの反復操作

MapReduceのインタラクティブな操作

ユーザーは、データの同じサブセットでアドホッククエリを実行します。 各クエリは、安定したストレージでディスクI/Oを実行します。これにより、アプリケーションの実行時間が支配されます。

次の図は、MapReduceでインタラクティブクエリを実行しているときに、現在のフレームワークがどのように機能するかを説明しています。

MapReduceのインタラクティブな操作

Spark RDDを使用したデータ共有

*Replication* 、 *serialization* 、および *disk IO* により、MapReduceでのデータ共有が遅くなります。 ほとんどのHadoopアプリケーションでは、90%以上の時間をHDFS読み取り/書き込み操作に費やしています。

この問題を認識して、研究者はApache Sparkと呼ばれる特別なフレームワークを開発しました。 スパークの重要なアイデアは、* R 耐障害性 D 配布 D *アセット(RDD)です。インメモリ処理の計算をサポートします。 つまり、メモリの状態をジョブ全体のオブジェクトとして保存し、オブジェクトはそれらのジョブ間で共有可能です。 メモリ内のデータ共有は、ネットワークおよびディスクよりも10〜100倍高速です。

ここで、Spark RDDで反復的かつインタラクティブな操作がどのように行われるかを調べてみましょう。

Spark RDDの反復操作

以下の図は、Spark RDDの反復操作を示しています。 安定したストレージ(ディスク)ではなく、分散メモリに中間結果を保存し、システムを高速化します。

注意-分散メモリ(RAM)が中間結果(JOBの状態)を保存するのに十分でない場合、それらの結果はディスクに保存されます。

Spark RDDの反復操作

Spark RDDでのインタラクティブな操作

この図は、Spark RDDでのインタラクティブな操作を示しています。 同じデータセットに対して異なるクエリを繰り返し実行すると、この特定のデータをメモリに保持して実行時間を短縮できます。

Spark RDDのインタラクティブ操作

デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算されます。 ただし、RDDをメモリに*永続化*することもできます。この場合、Sparkは次回クエリを実行するときに、より高速なアクセスのために要素をクラスタ上に保持します。 また、ディスク上のRDDの永続化、または複数のノード間での複製のサポートもあります。

Spark-インストール

SparkはHadoopのサブプロジェクトです。 したがって、SparkをLinuxベースのシステムにインストールすることをお勧めします。 次の手順は、Apache Sparkのインストール方法を示しています。

ステップ1:Javaインストールの検証

Javaのインストールは、Sparkのインストールに必須の要素の1つです。 次のコマンドを試して、JAVAバージョンを確認してください。

$java -version

Javaがすでにシステムにインストールされている場合、次の応答が表示されます-

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

システムにJavaがインストールされていない場合は、次のステップに進む前にJavaをインストールします。

ステップ2:Scalaインストールの検証

Sparkを実装するにはScala言語を使用する必要があります。 次のコマンドを使用してScalaのインストールを確認しましょう。

$scala -version

Scalaがシステムに既にインストールされている場合、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

システムにScalaがインストールされていない場合は、Scalaのインストールの次の手順に進みます。

ステップ3:Scalaのダウンロード

次のリンクhttp://www.scala-lang.org/download[Download Scala]にアクセスして、Scalaの最新バージョンをダウンロードしてください。 このチュートリアルでは、scala-2.11.6バージョンを使用しています。 ダウンロード後、ダウンロードフォルダーにScala tarファイルがあります。

ステップ4:Scalaのインストール

Scalaをインストールするには、以下の手順に従ってください。

Scala tarファイルを抽出します

Scala tarファイルを抽出するには、次のコマンドを入力します。

$ tar xvf scala-2.11.6.tgz

Scalaソフトウェアファイルを移動する

Scalaソフトウェアファイルをそれぞれのディレクトリ*(/usr/local/scala)*に移動するには、次のコマンドを使用します。

$ su –
Password:
# cd/home/Hadoop/Downloads/
# mv scala-2.11.6/usr/local/scala
# exit

ScalaのPATHを設定

ScalaのPATHを設定するには、次のコマンドを使用します。

$ export PATH = $PATH:/usr/local/scala/bin

Scalaインストールの検証

インストール後、確認することをお勧めします。 Scalaのインストールを確認するには、次のコマンドを使用します。

$scala -version

Scalaがシステムに既にインストールされている場合、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

ステップ5:Apache Sparkのダウンロード

次のリンクhttps://spark.apache.org/downloadsl[Download Spark]にアクセスして、Sparkの最新バージョンをダウンロードします。 このチュートリアルでは、 spark-1.3.1-bin-hadoop2.6 バージョンを使用しています。 ダウンロード後、Spark tarファイルはダウンロードフォルダーにあります。

ステップ6:Sparkのインストール

Sparkをインストールするには、以下の手順に従ってください。

Spark tarの抽出

次のコマンドは、spark tarファイルを抽出します。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Sparkソフトウェアファイルの移動

Sparkソフトウェアファイルをそれぞれのディレクトリ*(/usr/local/spark)*に移動するための次のコマンド。

$ su –
Password:
# cd/home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6/usr/local/spark
# exit

Sparkの環境をセットアップする

〜* /。bashrc *ファイルに次の行を追加します。 スパークソフトウェアファイルがある場所をPATH変数に追加することを意味します。

export PATH = $PATH:/usr/local/spark/bin

〜/.bashrcファイルを入手するには、次のコマンドを使用します。

$ source ~/.bashrc

ステップ7:Sparkのインストールの検証

Sparkシェルを開くための次のコマンドを記述します。

$spark-shell

sparkが正常にインストールされると、次の出力が見つかります。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
  /__/__ ___ _____//__
   _\ \/_ \/_ `/__/'_/
  /___/.__/\_,_/_//_/\_\ version 1.4.0
     /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark SQL-はじめに

Sparkは、Spark SQLと呼ばれる構造化データ処理用のプログラミングモジュールを導入しています。 DataFrameと呼ばれるプログラミングの抽象化を提供し、分散SQLクエリエンジンとして機能できます。

Spark SQLの機能

以下は、Spark SQLの機能です-

  • 統合-SQLクエリとSparkプログラムをシームレスに組み合わせます。 Spark SQLでは、Sparkの分散データセット(RDD)として、Python、Scala、およびJavaの統合APIを使用して、構造化データを照会できます。 この緊密な統合により、複雑な分析アルゴリズムとともにSQLクエリを簡単に実行できます。
  • Unified Data Access -さまざまなソースからデータをロードおよびクエリします。 Schema-RDDは、Apache Hiveテーブル、寄木細工ファイル、JSONファイルなどの構造化データを効率的に操作するための単一のインターフェイスを提供します。
  • * Hiveの互換性*-既存のウェアハウスで変更されていないHiveクエリを実行します。 Spark SQLはHiveフロントエンドとMetaStoreを再利用し、既存のHiveデータ、クエリ、UDFとの完全な互換性を提供します。 Hiveと一緒にインストールするだけです。
  • 標準接続-JDBCまたはODBC経由で接続します。 Spark SQLには、業界標準のJDBCおよびODBC接続を備えたサーバーモードが含まれています。
  • スケーラビリティ-対話型クエリと長いクエリの両方に同じエンジンを使用します。 Spark SQLはRDDモデルを利用してクエリ中のフォールトトレランスをサポートし、大規模なジョブにも対応できるようにします。 履歴データに別のエンジンを使用する心配はありません。

Spark SQLアーキテクチャ

次の図は、Spark SQLのアーキテクチャを説明しています-

Spark SQLアーキテクチャ

このアーキテクチャには、言語API、スキーマRDD、データソースの3つのレイヤーが含まれています。

  • Language API -Sparkはさまざまな言語およびSpark SQLと互換性があります。 また、これらの言語API(python、scala、java、HiveQL)によってサポートされています。
  • *スキーマRDD *-SparkコアはRDDと呼ばれる特別なデータ構造で設計されています。 一般に、Spark SQLはスキーマ、テーブル、およびレコードで機能します。 したがって、スキーマRDDを一時テーブルとして使用できます。 このスキーマRDDをデータフレームと呼ぶことができます。
  • データソース-通常、spark-coreのデータソースはテキストファイル、Avroファイルなどです。 ただし、Spark SQLのデータソースは異なります。 それらは、Parquetファイル、JSONドキュメント、HIVEテーブル、およびCassandraデータベースです。

これらの詳細については、後続の章で説明します。

Spark SQL-データフレーム

DataFrameは、名前付き列に編成されたデータの分散コレクションです。 概念的には、優れた最適化手法を備えたリレーショナルテーブルと同等です。

DataFrameは、Hiveテーブル、構造化データファイル、外部データベース、既存のRDDなど、さまざまなソースの配列から構築できます。 このAPIは、Rプログラミングの DataFrame およびPythonの Pandas からインスピレーションを得た、現代のビッグデータおよびデータサイエンスアプリケーション向けに設計されました。

DataFrameの機能

ここにDataFrameのいくつかの特徴的な機能のセットがあります-

  • 単一ノードクラスターから大規模クラスターで、キロバイトからペタバイトのサイズのデータ​​を処理する機能。
  • さまざまなデータ形式(Avro、csv、エラスティック検索、Cassandra)およびストレージシステム(HDFS、HIVEテーブル、mysqlなど)をサポートします。
  • Spark SQL Catalystオプティマイザー(ツリー変換フレームワーク)による最先端の最適化とコード生成。
  • Spark-Coreを介して、すべてのビッグデータツールおよびフレームワークと簡単に統合できます。
  • Python、Java、Scala、およびRプログラミング用のAPIを提供します。

SQLContext

SQLContextはクラスであり、Spark SQLの機能を初期化するために使用されます。 SQLContextクラスオブジェクトを初期化するには、SparkContextクラスオブジェクト(sc)が必要です。

次のコマンドは、spark-shellを介してSparkContextを初期化するために使用されます。

$ spark-shell

デフォルトでは、SparkContextオブジェクトは、spark-shellの起動時に sc という名前で初期化されます。

次のコマンドを使用して、SQLContextを作成します。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

*employee.json* という名前のJSONファイルの従業員レコードの例を考えてみましょう。 次のコマンドを使用して、DataFrame(df)を作成し、次の内容の *employee.json* という名前のJSONドキュメントを読み取ります。
*employee.json* -このファイルを、現在の *scala>* ポインターがあるディレクトリに配置します。
{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

データフレーム操作

DataFrameは、構造化データ操作のためのドメイン固有の言語を提供します。 ここでは、DataFrameを使用した構造化データ処理の基本的な例をいくつか示します。

以下の手順に従って、DataFrame操作を実行します-

JSONドキュメントを読む

最初に、JSONドキュメントを読む必要があります。 これに基づいて、(dfs)という名前のDataFrameを生成します。

次のコマンドを使用して、 employee.json という名前のJSONドキュメントを読み取ります。 データは、id、name、およびageのフィールドを持つテーブルとして表示されます。

scala> val dfs = sqlContext.read.json("employee.json")

出力-フィールド名は employee.json から自動的に取得されます。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

データを表示する

DataFrameのデータを表示する場合は、次のコマンドを使用します。

scala> dfs.show()

出力-従業員データを表形式で表示できます。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

printSchemaメソッドを使用する

DataFrameの構造(スキーマ)を表示するには、次のコマンドを使用します。

scala> dfs.printSchema()

出力

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Selectメソッドを使用

次のコマンドを使用して、DataFrameの3つの列から name -columnを取得します。

scala> dfs.select("name").show()

出力-*名前*列の値を見ることができます。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

年齢フィルターを使用する

次のコマンドを使用して、年齢が23歳(23歳以上)の従業員を見つけます。

scala> dfs.filter(dfs("age") > 23).show()

出力

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

groupByメソッドを使用する

同じ年齢の従業員の数をカウントするには、次のコマンドを使用します。

scala> dfs.groupBy("age").count().show()

出力-2人の従業員は23歳です。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

プログラムによるSQLクエリの実行

SQLContextを使用すると、アプリケーションはSQL関数の実行中にプログラムでSQLクエリを実行し、結果をDataFrameとして返します。

一般に、バックグラウンドで、SparkSQLは既存のRDDをDataFrameに変換するための2つの異なる方法をサポートします-

Sr. No Methods & Description
1

Inferring the Schema using Reflection

このメソッドは、リフレクションを使用して、特定のタイプのオブジェクトを含むRDDのスキーマを生成します。

2

Programmatically Specifying the Schema

DataFrameを作成する2番目の方法は、スキーマを構築し、それを既存のRDDに適用できるプログラムインターフェイスを使用することです。

Spark SQL-データソース

DataFrameインターフェースにより、さまざまなデータソースをSpark SQLで動作させることができます。 これは一時テーブルであり、通常のRDDとして操作できます。 DataFrameをテーブルとして登録すると、そのデータに対してSQLクエリを実行できます。

この章では、異なるSparkデータソースを使用してデータをロードおよび保存する一般的な方法について説明します。 その後、組み込みデータソースで使用可能な特定のオプションについて詳しく説明します。

SparkSQLで利用可能なデータソースにはさまざまな種類があり、その一部を以下に示します-

Sr. No Data Sources
1

JSON Datasets

Spark SQLは、JSONデータセットのスキーマを自動的にキャプチャし、それをDataFrameとしてロードできます。

2

Hive Tables

Hiveは、SQLContextを継承するHiveContextとしてSparkライブラリにバンドルされています。

3

Parquet Files

寄木細工は、多くのデータ処理システムでサポートされている円柱形式です。