Apache-spark-quick-guide

提供:Dev Guides
移動先:案内検索

Apache Spark-はじめに

業界では、Hadoopを広範囲に使用してデータセットを分析しています。 その理由は、Hadoopフレームワークは単純なプログラミングモデル(MapReduce)に基づいており、スケーラブルで柔軟性があり、フォールトトレラントで費用効果の高いコンピューティングソリューションを可能にするためです。 ここでの主な関心事は、クエリ間の待機時間とプログラム実行の待機時間に関して、大きなデータセットの処理速度を維持することです。

Sparkは、Hadoop計算コンピューティングソフトウェアプロセスを高速化するためにApache Software Foundationによって導入されました。

一般的な考え方に反して、 SparkはHadoop の修正バージョンではなく、独自のクラスター管理があるため、実際にはHadoopに依存していません。 Hadoopは、Sparkを実装する方法の1つにすぎません。

Sparkは2つの方法でHadoopを使用します。1つは storage で、もう1つは processing です。 Sparkには独自のクラスター管理計算があるため、Hadoopはストレージ目的にのみ使用されます。

Apache Spark

Apache Sparkは、高速計算用に設計された超高速クラスターコンピューティングテクノロジーです。 Hadoop MapReduceに基づいており、MapReduceモデルを拡張して、インタラクティブクエリやストリーム処理など、より多くの種類の計算に効率的に使用します。 Sparkの主な機能は、*インメモリクラスターコンピューティング*であり、アプリケーションの処理速度を向上させます。

Sparkは、バッチアプリケーション、反復アルゴリズム、インタラクティブクエリ、ストリーミングなどの幅広いワークロードをカバーするように設計されています。 それぞれのシステムでこれらすべてのワークロードをサポートするだけでなく、個別のツールを維持する管理負担を軽減します。

Apache Sparkの進化

Sparkは、2009年にUC BerkeleyのAMPLabでMatei Zahariaによって開発されたHadoopのサブプロジェクトの1つです。 2010年にBSDライセンスの下でオープンソース化されました。 2013年にApacheソフトウェア財団に寄付され、現在、Apache Sparkは2014年2月からトップレベルのApacheプロジェクトになりました。

Apache Sparkの機能

Apache Sparkには次の機能があります。

  • 速度-Sparkは、Hadoopクラスターでアプリケーションを実行し、メモリで最大100倍、ディスクで実行すると10倍高速になります。 これは、ディスクへの読み取り/書き込み操作の数を減らすことで可能です。 中間処理データをメモリに保存します。
  • 複数の言語をサポート-Sparkは、Java、Scala、またはPythonの組み込みAPIを提供します。 したがって、異なる言語でアプリケーションを作成できます。 Sparkには、インタラクティブなクエリのための80の高レベル演算子が用意されています。
  • 高度な分析-Sparkは「マップ」と「削減」のみをサポートしていません。 また、SQLクエリ、ストリーミングデータ、機械学習(ML)、グラフアルゴリズムもサポートしています。

Hadoopに組み込まれたSpark

次の図は、Hadoopコンポーネントを使用してSparkを構築する3つの方法を示しています。

Spark Built on Hadoop

以下に説明するように、Sparkの展開には3つの方法があります。

  • スタンドアロン-Sparkスタンドアロン展開とは、SparkがHDFS(Hadoop Distributed File System)の上にある場所を占有し、HDFSに明示的にスペースが割り当てられることを意味します。 ここでは、SparkとMapReduceが並行して実行され、クラスター上のすべてのSparkジョブをカバーします。
  • Hadoop Yarn -Hadoop Yarnの展開とは、事前インストールやルートアクセスを必要とせずに、単純にSparkがYarnで実行されることを意味します。 SparkをHadoopエコシステムまたはHadoopスタックに統合するのに役立ちます。 他のコンポーネントをスタック上で実行できます。
  • * MapReduceのスパーク(SIMR)*-MapReduceのスパークは、スタンドアロン展開に加えてスパークジョブを起動するために使用されます。 SIMRを使用すると、ユーザーは管理アクセスなしでSparkを起動し、そのシェルを使用できます。

Sparkのコンポーネント

次の図は、Sparkのさまざまなコンポーネントを示しています。

Sparkのコンポーネント

Apache Sparkコア

Spark Coreは、他のすべての機能が構築されているsparkプラットフォームの基礎となる一般的な実行エンジンです。 インメモリコンピューティングと外部ストレージシステムの参照データセットを提供します。

Spark SQL

Spark SQLは、Spark Coreの上にあるコンポーネントであり、SchemaRDDと呼ばれる新しいデータ抽象化を導入し、構造化データおよび半構造化データのサポートを提供します。

スパークストリーミング

Spark Streamingは、Spark Coreの高速スケジューリング機能を活用して、ストリーミング分析を実行します。 ミニバッチでデータを取り込み、それらのミニバッチでRDD(Resilient Distributed Datasets)変換を実行します。

MLlib(機械学習ライブラリ)

MLlibは、分散メモリベースのSparkアーキテクチャのため、Sparkの上の分散型機械学習フレームワークです。 ベンチマークによれば、MLlib開発者は代替最小二乗(ALS)実装に対して行います。 Spark MLlibは、 Apache Mahout のHadoopディスクベースバージョン(MahoutがSparkインターフェースを取得する前)の9倍の速度です。

GraphX

GraphXは、Sparkの上にある分散グラフ処理フレームワークです。 Pregel抽象化APIを使用してユーザー定義グラフをモデル化できるグラフ計算を表現するためのAPIを提供します。 また、この抽象化のために最適化されたランタイムも提供します。

Apache Spark-RDD

弾力性のある分散データセット

弾力性のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。 オブジェクトの不変の分散コレクションです。 RDDの各データセットは論理パーティションに分割され、クラスターの異なるノードで計算できます。 RDDには、ユーザー定義クラスを含む、あらゆるタイプのPython、Java、またはScalaオブジェクトを含めることができます。

正式には、RDDは読み取り専用のパーティション化されたレコードのコレクションです。 RDDは、安定したストレージ上のデータまたは他のRDDのいずれかの決定論的操作によって作成できます。 RDDは、並列で操作できる要素のフォールトトレラントなコレクションです。

RDDを作成するには2つの方法があります-ドライバープログラムの既存のコレクションを並列化する*、または共有ファイルシステム、HDFS、HBase、またはHadoop入力を提供するデータソースなどの外部ストレージシステムで*データセットを参照する*フォーマット。

SparkはRDDの概念を利用して、より高速で効率的なMapReduce操作を実現します。 まず、MapReduceの操作がどのように行われ、なぜそれほど効率的でないのかを説明しましょう。

MapReduceでのデータ共有が遅い

MapReduceは、クラスター上の並列分散アルゴリズムを使用して大規模なデータセットを処理および生成するために広く採用されています。 ユーザーは、作業の分散とフォールトトレランスを心配することなく、一連の高レベル演算子を使用して並列計算を作成できます。

残念ながら、現在のほとんどのフレームワークでは、計算間(Ex-2つのMapReduceジョブ間)でデータを再利用する唯一の方法は、外部の安定したストレージシステム(Ex-HDFS)に書き込むことです。 このフレームワークは、クラスターの計算リソースにアクセスするための多数の抽象化を提供しますが、ユーザーはさらに多くのものを求めています。

反復*および*インタラクティブ*アプリケーションの両方で、並列ジョブ間でより高速なデータ共有が必要です。 *Replication、Serialization 、および disk IO により、MapReduceでのデータ共有が遅くなります。 ほとんどのHadoopアプリケーションであるストレージシステムに関しては、HDFSの読み取り/書き込み操作に90%以上の時間を費やしています。

MapReduceの反復操作

多段階アプリケーションでの複数の計算で中間結果を再利用します。 次の図は、MapReduceで反復操作を実行しながら、現在のフレームワークがどのように機能するかを説明しています。 これにより、データ複製、ディスクI/O、およびシリアル化のためにかなりのオーバーヘッドが発生し、システムが遅くなります。

MapReduceの反復操作

MapReduceのインタラクティブな操作

ユーザーは、データの同じサブセットでアドホッククエリを実行します。 各クエリは、安定したストレージでディスクI/Oを実行します。これにより、アプリケーションの実行時間が支配されます。

次の図は、MapReduceでインタラクティブクエリを実行しているときに、現在のフレームワークがどのように機能するかを説明しています。

MapReduceのインタラクティブな操作

Spark RDDを使用したデータ共有

*Replication、Serialization* 、および *disk IO* により、MapReduceでのデータ共有が遅くなります。 ほとんどのHadoopアプリケーションでは、90%以上の時間をHDFS読み取り/書き込み操作に費やしています。

この問題を認識して、研究者はApache Sparkと呼ばれる特別なフレームワークを開発しました。 スパークの重要なアイデアは、* R 耐障害性 D 配布 D *アセット(RDD)です。インメモリ処理の計算をサポートします。 つまり、メモリの状態をジョブ全体のオブジェクトとして保存し、オブジェクトはそれらのジョブ間で共有可能です。 メモリ内のデータ共有は、ネットワークおよびディスクよりも10〜100倍高速です。

ここで、Spark RDDで反復的かつインタラクティブな操作がどのように行われるかを調べてみましょう。

Spark RDDの反復操作

以下の図は、Spark RDDの反復操作を示しています。 安定したストレージ(ディスク)ではなく、分散メモリに中間結果を保存し、システムを高速化します。

注意-分散メモリ(RAM)が中間結果(JOBの状態)を格納するのに十分でない場合、それらの結果はディスクに格納されます。

Spark RDDの反復操作

Spark RDDでのインタラクティブな操作

この図は、Spark RDDでのインタラクティブな操作を示しています。 同じデータセットに対して異なるクエリを繰り返し実行すると、この特定のデータをメモリに保持して実行時間を短縮できます。

Spark RDDのインタラクティブ操作

デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算されます。 ただし、RDDをメモリに*永続化*することもできます。この場合、Sparkは次回クエリを実行するときに、より高速なアクセスのために要素をクラスタ上に保持します。 また、ディスク上のRDDの永続化、または複数のノード間での複製のサポートもあります。

Apache Spark-インストール

SparkはHadoopのサブプロジェクトです。 したがって、SparkをLinuxベースのシステムにインストールすることをお勧めします。 次の手順は、Apache Sparkのインストール方法を示しています。

ステップ1:Javaインストールの検証

Javaのインストールは、Sparkのインストールに必須の要素の1つです。 次のコマンドを試して、JAVAバージョンを確認してください。

$java -version

Javaがすでにシステムにインストールされている場合、次の応答が表示されます-

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

システムにJavaがインストールされていない場合は、次のステップに進む前にJavaをインストールします。

ステップ2:Scalaインストールの検証

Sparkを実装するにはScala言語を使用する必要があります。 次のコマンドを使用してScalaのインストールを確認しましょう。

$scala -version

Scalaがシステムに既にインストールされている場合、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

システムにScalaがインストールされていない場合は、Scalaのインストールの次の手順に進みます。

ステップ3:Scalaをダウンロードする

次のリンクhttp://www.scala-lang.org/download/[Scalaのダウンロード]にアクセスして、Scalaの最新バージョンをダウンロードします。 このチュートリアルでは、scala-2.11.6バージョンを使用しています。 ダウンロード後、ダウンロードフォルダーにScala tarファイルがあります。

ステップ4:Scalaをインストールする

Scalaをインストールするには、以下の手順に従ってください。

Scala tarファイルを抽出します

Scala tarファイルを抽出するには、次のコマンドを入力します。

$ tar xvf scala-2.11.6.tgz

Scalaソフトウェアファイルを移動する

Scalaソフトウェアファイルをそれぞれのディレクトリ*(/usr/local/scala)*に移動するには、次のコマンドを使用します。

$ su –
Password:
# cd/home/Hadoop/Downloads/
# mv scala-2.11.6/usr/local/scala
# exit

ScalaのPATHを設定

ScalaのPATHを設定するには、次のコマンドを使用します。

$ export PATH = $PATH:/usr/local/scala/bin

Scalaインストールの検証

インストール後、確認することをお勧めします。 Scalaのインストールを確認するには、次のコマンドを使用します。

$scala -version

Scalaがシステムに既にインストールされている場合、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

ステップ5:Apache Sparkのダウンロード

次のリンクhttps://spark.apache.org/downloadsl[Download Spark]にアクセスして、Sparkの最新バージョンをダウンロードします。 このチュートリアルでは、 spark-1.3.1-bin-hadoop2.6 バージョンを使用しています。 ダウンロード後、Spark tarファイルはダウンロードフォルダーにあります。

ステップ6:Sparkをインストールする

Sparkをインストールするには、以下の手順に従ってください。

Spark tarの抽出

次のコマンドは、spark tarファイルを抽出します。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Sparkソフトウェアファイルの移動

Sparkソフトウェアファイルをそれぞれのディレクトリ*(/usr/local/spark)*に移動するための次のコマンド。

$ su –
Password:

# cd/home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6/usr/local/spark
# exit

Sparkの環境をセットアップする

〜* /。bashrc *ファイルに次の行を追加します。 スパークソフトウェアファイルがある場所をPATH変数に追加することを意味します。

export PATH=$PATH:/usr/local/spark/bin

〜/.bashrcファイルを入手するには、次のコマンドを使用します。

$ source ~/.bashrc

ステップ7:Sparkインストールの検証

Sparkシェルを開くための次のコマンドを記述します。

$spark-shell

sparkが正常にインストールされると、次の出力が見つかります。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
    /__/__  ___ _____//__
    _\ \/_ \/_ `/__/ '_/
  /___/.__/\_,_/_//_/\_\   version 1.4.0
     /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Apache Spark-コアプログラミング

Spark Coreはプロジェクト全体の基盤です。 分散タスクのディスパッチ、スケジューリング、および基本的なI/O機能を提供します。 SparkはRDD(Resilient Distributed Datasets)と呼ばれる特殊な基本データ構造を使用します。これは、マシン間でパーティション分割されたデータの論理的なコレクションです。 RDDは2つの方法で作成できます。 1つは外部ストレージシステムのデータセットを参照する方法で、もう1つは変換を適用する方法です(例: 既存のRDDでマップ、フィルター、レデューサー、結合)。

RDD抽象化は、言語統合APIを通じて公開されます。 これにより、アプリケーションがRDDを操作する方法がデータのローカルコレクションを操作することに似ているため、プログラミングの複雑さが簡素化されます。

スパークシェル

Sparkはインタラクティブなシェルを提供します-データをインタラクティブに分析するための強力なツールです。 ScalaまたはPython言語で利用可能です。 Sparkの主要な抽象化は、Resilient Distributed Dataset(RDD)と呼ばれるアイテムの分散コレクションです。 RDDは、Hadoop入力形式(HDFSファイルなど)から、または他のRDDを変換して作成できます。

Spark Shellを開く

次のコマンドは、Sparkシェルを開くために使用されます。

$ spark-shell

シンプルなRDDを作成する

テキストファイルから単純なRDDを作成します。 次のコマンドを使用して、単純なRDDを作成します。

scala> val inputfile = sc.textFile(“input.txt”)

上記のコマンドの出力は次のとおりです。

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD APIは、RDDを操作するための Transformations および Actions をほとんど導入していません。

RDD変換

RDD変換は、新しいRDDへのポインターを返し、RDD間の依存関係を作成できるようにします。 依存関係チェーン(依存関係の文字列)の各RDDには、そのデータを計算する機能があり、親RDDへのポインター(依存関係)があります。

Sparkは遅延であるため、ジョブの作成と実行をトリガーする変換またはアクションを呼び出さない限り、何も実行されません。 単語カウントの例の次のスニペットを見てください。

したがって、RDD変換はデータのセットではなく、データの取得方法とその処理方法をSparkに指示するプログラム内のステップ(唯一のステップである可能性があります)です。

以下に、RDD変換のリストを示します。

S.No Transformations & Meaning
1

map(func)

ソースの各要素を関数 func に渡すことにより形成された、新しい分散データセットを返します。

2

filter(func)

  • func* がtrueを返すソースの要素を選択して形成された新しいデータセットを返します。
3

flatMap(func)

mapと似ていますが、各入力アイテムは0個以上の出力アイテムにマップできます(したがって、_func_は単一のアイテムではなくSeqを返す必要があります)。

4

mapPartitions(func)

マップに似ていますが、RDDの各パーティション(ブロック)で個別に実行されるため、タイプTのRDDで実行する場合、 func はタイプIterator <T>⇒Iterator <U>でなければなりません。

5

mapPartitionsWithIndex(func)

マップパーティションに似ていますが、パーティションのインデックスを表す整数値を func に提供するため、 func はタイプ(Int、Iterator <T>)でなければなりません⇒タイプのRDDで実行する場合はIterator <U> T.

6

sample(withReplacement, fraction, seed)

特定の乱数ジェネレータシードを使用して、置換の有無にかかわらず、データの*割合*をサンプリングします。

7

union(otherDataset)

ソースデータセットの要素と引数の和集合を含む新しいデータセットを返します。

8

intersection(otherDataset)

ソースデータセットの要素と引数の共通部分を含む新しいRDDを返します。

9

distinct([numTasks])

ソースデータセットの個別の要素を含む新しいデータセットを返します。

10

groupByKey([numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、Iterable <V>)ペアのデータセットを返します。

-各キーで集計(平均や平均など)を実行するためにグループ化する場合、reduceByKeyまたはaggregateByKeyを使用するとパフォーマンスが大幅に向上します。

11

reduceByKey(func, [numTasks])

(K、V)ペアのデータセットで呼び出された場合、(K、V)ペアのデータセットを返します。各キーの値は、(V、V)⇒V型でなければならない指定されたリデュース関数_func_を使用して集計されます。 groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、U)ペアのデータセットを返します。各データセットは、指定された結合関数とニュートラルな「ゼロ」値を使用して集計されます。 不要な割り当てを避けながら、入力値タイプとは異なる集約値タイプを許可します。 groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

13

sortByKey([ascending], [numTasks])

KがOrderedを実装する(K、V)ペアのデータセットで呼び出されると、ブール昇順引数で指定されているように、キーで昇順または降順でソートされた(K、V)ペアのデータセットを返します。

14

join(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、各キーのすべての要素のペアを持つ(K、(V、W))ペアのデータセットを返します。 外部結合は、leftOuterJoin、rightOuterJoin、fullOuterJoinを通じてサポートされます。

15

cogroup(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、(K、(Iterable <V>、Iterable <W>))タプルのデータセットを返します。 この操作は、グループWithとも呼ばれます。

16

cartesian(otherDataset)

タイプTおよびUのデータセットで呼び出されると、(T、U)ペア(すべての要素ペア)のデータセットを返します。

17

pipe(command, [envVars])

RDDの各パーティションをシェルコマンド(たとえば、 Perlまたはbashスクリプト。 RDD要素はプロセスの標準入力に書き込まれ、その標準出力への行出力は文字列のRDDとして返されます。

18

coalesce(numPartitions)

RDDのパーティションの数をnumPartitionsに減らします。 大きなデータセットをフィルタリングした後、操作をより効率的に実行するのに役立ちます。

19

repartition(numPartitions)

RDD内のデータをランダムにシャッフルし、より多くまたはより少ないパーティションを作成し、それらの間でバランスを取ります。 これにより、ネットワーク上のすべてのデータが常にシャッフルされます。

20

repartitionAndSortWithinPartitions(partitioner)

指定されたパーティショナーに従ってRDDを再パーティション化し、結果の各パーティション内で、キーでレコードをソートします。 これは、再パーティションを呼び出してから各パーティション内でソートするよりも効率的です。これは、ソートをシャッフルマシンにプッシュダウンできるためです。

行動

次の表に、値を返すアクションのリストを示します。

S.No Action & Meaning
1

reduce(func)

関数 func (2つの引数を取り、1つを返す)を使用して、データセットの要素を集約します。 関数は、並列で正しく計算できるように、可換および関連性が必要です。

2

collect()

データセットのすべての要素をドライバープログラムの配列として返します。 これは通常、フィルターまたはデータの十分に小さなサブセットを返す他の操作の後に役立ちます。

3

count()

データセット内の要素の数を返します。

4

first()

データセットの最初の要素を返します(テイク(1)と同様)。

5

take(n)

データセットの最初の n 要素を持つ配列を返します。

6

takeSample (withReplacement,num, [seed])

データセットの num 要素のランダムサンプルを含む配列を返します。置換の有無にかかわらず、オプションで乱数ジェネレータシードを事前に指定します。

7

takeOrdered(n, [ordering])

自然順序またはカスタムコンパレータのいずれかを使用して、RDDの最初の n 要素を返します。

8

saveAsTextFile(path)

データセットの要素をテキストファイル(またはテキストファイルのセット)として、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のディレクトリに書き込みます。 Sparkは、各要素でtoStringを呼び出して、ファイル内のテキスト行に変換します。

9

saveAsSequenceFile(path) (Java and Scala)

データセットの要素を、Hadoop SequenceFileとして、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のパスに書き込みます。 これは、Hadoopの書き込み可能なインターフェイスを実装するキーと値のペアのRDDで利用できます。 Scalaでは、暗黙的にWritableに変換可能な型でも使用できます(Sparkには、Int、Double、Stringなどの基本型の変換が含まれます)。

10

saveAsObjectFile(path) (Java and Scala)

Javaシリアル化を使用してデータセットの要素を単純な形式で書き込みます。その後、SparkContext.objectFile()を使用して読み込むことができます。

11

countByKey()

タイプ(K、V)のRDDでのみ使用可能です。 (K、Int)ペアと各キーのカウントのハッシュマップを返します。

12

foreach(func)

データセットの各要素で関数 func を実行します。 これは通常、アキュムレータの更新や外部ストレージシステムとの対話などの副作用のために行われます。

注意-foreach()以外のAccumulators以外の変数を変更すると、未定義の動作が発生する場合があります。 詳細については、クロージャーについてを参照してください。

RDDを使用したプログラミング

例の助けを借りて、RDDプログラミングでのいくつかのRDD変換とアクションの実装を見てみましょう。

単語数の例を考えてみましょう-文書に現れる各単語を数えます。 次のテキストを入力として考え、ホームディレクトリに input.txt ファイルとして保存されます。

*input.txt* -入力ファイル。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

以下の手順に従って、指定された例を実行します。

スパークシェルを開く

次のコマンドは、sparkシェルを開くために使用されます。 通常、sparkはScalaを使用して構築されます。 したがって、SparkプログラムはScala環境で実行されます。

$ spark-shell

Sparkシェルが正常に開くと、次の出力が表示されます。 出力の最後の行「scとして使用可能なスパークコンテキスト」を見ると、 sc という名前のスパークコンテキストオブジェクトが自動的に作成されていることを意味します。 プログラムの最初のステップを開始する前に、SparkContextオブジェクトを作成する必要があります。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
    /__/__  ___ _____//__
    _\ \/_ \/_ `/__/ '_/
  /___/.__/\_,_/_//_/\_\   version 1.4.0
     /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

RDDを作成する

まず、Spark-Scala APIを使用して入力ファイルを読み取り、RDDを作成する必要があります。

次のコマンドは、指定された場所からファイルを読み取るために使用されます。 ここでは、入力ファイルの名前で新しいRDDが作成されます。 textFile(“”)メソッドで引数として指定される文字列は、入力ファイル名の絶対パスです。 ただし、ファイル名のみが指定されている場合、入力ファイルが現在の場所にあることを意味します。

scala> val inputfile = sc.textFile("input.txt")

単語数変換を実行する

私たちの目的は、ファイル内の単語を数えることです。 各行を単語に分割するためのフラットマップを作成します(* flatMap(line⇒line.split(“”)*)。

次に、マップ関数(* map(word⇒(word、1))を使用して、各単語を値 *’1’ (<key、value> = <word、1>)を持つキーとして読み取ります。

最後に、同様のキーの値を追加してこれらのキーを減らします(* reduceByKey(_&plus; _)*)。

次のコマンドは、ワードカウントロジックの実行に使用されます。 これを実行した後、これはアクションではなく、変換であるため、出力は見つかりません。新しいRDDを指すか、指定されたデータをどうするかを火花に伝えます)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

現在のRDD

RDDの操作中に、現在のRDDについて知りたい場合は、次のコマンドを使用します。 現在のRDDとデバッグのための依存関係に関する説明が表示されます。

scala> counts.toDebugString

変換のキャッシュ

persist()またはcache()メソッドを使用して、RDDを永続化するようにマークできます。 アクションで最初に計算されるとき、ノード上のメモリに保持されます。 次のコマンドを使用して、中間変換をメモリに保存します。

scala> counts.cache()

アクションを適用する

すべての変換を保存するなどのアクションを適用すると、結果がテキストファイルになります。 saveAsTextFile(“”)メソッドのString引数は、出力フォルダーの絶対パスです。 次のコマンドを試して、出力をテキストファイルに保存します。 次の例では、「出力」フォルダーは現在の場所にあります。

scala> counts.saveAsTextFile("output")

出力の確認

別のターミナルを開いてホームディレクトリに移動します(sparkは他のターミナルで実行されます)。 出力ディレクトリを確認するには、次のコマンドを使用します。

[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS
*Part-00000* ファイルからの出力を表示するには、次のコマンドを使用します。
[hadoop@localhost output]$ cat part-00000

出力

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
*Part-00001* ファイルからの出力を表示するには、次のコマンドを使用します。
[hadoop@localhost output]$ cat part-00001

出力

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

国連ストレージの永続化

UN永続化の前に、このアプリケーションに使用されているストレージスペースを表示する場合は、ブラウザで次のURLを使用します。

http://localhost:4040

次の画面が表示されます。この画面には、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースが表示されます。

ストレージスペース

特定のRDDのストレージスペースをUNパーシストにしたい場合は、次のコマンドを使用します。

Scala> counts.unpersist()

次のように出力が表示されます-

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

ブラウザのストレージスペースを確認するには、次のURLを使用します。

http://localhost:4040/

次の画面が表示されます。 Sparkシェルで実行されているアプリケーションに使用されるストレージスペースが表示されます。

アプリケーションのストレージスペース

Apache Spark-デプロイメント

spark-submitを使用するSparkアプリケーションは、クラスターにSparkアプリケーションをデプロイするために使用されるシェルコマンドです。 均一なインターフェースを介して、それぞれのクラスターマネージャーをすべて使用します。 したがって、アプリケーションをそれぞれに構成する必要はありません。

シェルコマンドを使用して、前に使用したワードカウントの同じ例を取り上げます。 ここでは、sparkアプリケーションと同じ例を検討します。

サンプル入力

次のテキストは入力データであり、名前の付いたファイルは in.txt です。

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

次のプログラムを見てください-

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

     /*local = master URL; Word Count = application name;*/
     /*/usr/local/spark = Spark Home; Nil = jars; Map = environment*/
     /*Map = variables to work nodes*/
     /*creating an inputRDD to read text file (in.txt) through Spark context*/
      val input = sc.textFile("in.txt")
     /*Transform the inputRDD into countRDD*/

      val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)

     /*saveAsTextFile method is an action that effects on the RDD*/
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}

上記のプログラムを SparkWordCount.scala という名前のファイルに保存し、 spark-application という名前のユーザー定義ディレクトリに配置します。

-inputRDDをcountRDDに変換する際、flatMap()を使用して、テキストファイルからの行を単語にトークン化し、map()メソッドで単語の頻度をカウントし、reduceByKey()メソッドで各単語の繰り返しをカウントします。

このアプリケーションを送信するには、次の手順を使用します。 ターミナルから spark-application ディレクトリのすべてのステップを実行します。

ステップ1:Spark Jaをダウンロードする

コンパイルにはSparkコアjarが必要です。したがって、次のリンクhttp://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3からspark-core_2.10-1.3.0.jarをダウンロードしてください。 0 [Spark core jar]そして、jarファイルをダウンロードディレクトリから spark-application ディレクトリに移動します。

ステップ2:プログラムをコンパイルする

以下のコマンドを使用して、上記のプログラムをコンパイルします。 このコマンドは、spark-applicationディレクトリから実行する必要があります。 ここで、 /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar は、Sparkライブラリから取得したHadoopサポートjarです。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

ステップ3:JARを作成する

次のコマンドを使用して、sparkアプリケーションのjarファイルを作成します。 ここで、 wordcount はjarファイルのファイル名です。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

ステップ4:sparkアプリケーションを送信する

次のコマンドを使用してスパークアプリケーションを送信します-

spark-submit --class SparkWordCount --master local wordcount.jar

正常に実行されると、以下の出力が表示されます。 次の出力を許可する OK はユーザー識別用であり、プログラムの最終行です。 次の出力を注意深く読むと、次のようなさまざまなものが見つかります-

  • ポート42954でサービス「sparkDriver」を正常に開始しました
  • MemoryStoreは容量267.3 MBで開始しました
  • [[1]]
  • 追加されたJARファイル:/home/hadoop/piapplication/count.jar
  • ResultStage 1(SparkPi.scala:11のsaveAsTextFile)は0.566秒で終了しました
  • [[2]] Web UIを停止しました
  • メモリストアをクリアしました
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path =/tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

ステップ5:出力の確認

プログラムが正常に実行されると、spark-applicationディレクトリに outfile という名前のディレクトリが見つかります。

次のコマンドは、outfileディレクトリ内のファイルのリストを開いて確認するために使用されます。

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
*part-00000* ファイルの出力を確認するためのコマンドは次のとおりです-
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

パート-00001ファイルの出力を確認するためのコマンドは次のとおりです-

$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

「spark-submit」コマンドの詳細については、次のセクションをご覧ください。

スパーク送信構文

spark-submit [options] <app jar | python file> [app arguments]

オプション

以下の表は、*オプション*のリストを説明しています-

S.No Option Description
1 --master spark://host:port, mesos://host:port, yarn, or local.
2 --deploy-mode Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
3 --class Your application’s main class (for Java/Scala apps).
4 --name A name of your application.
5 --jars Comma-separated list of local jars to include on the driver and executor classpaths.
6 --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths.
7 --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages.
8 --py-files Comma-separated list of .zip, .egg, or .py files to place on the PYTHON PATH for Python apps.
9 --files Comma-separated list of files to be placed in the working directory of each executor.
10 --conf (prop=val) Arbitrary Spark configuration property.
11 --properties-file Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.
12 --driver-memory Memory for driver (e.g. 1000M, 2G) (Default: 512M).
13 --driver-java-options Extra Java options to pass to the driver.
14 --driver-library-path Extra library path entries to pass to the driver.
15 --driver-class-path

Extra class path entries to pass to the driver.

--jarsで追加されたjarは、クラスパスに自動的に含まれることに注意してください。

16 --executor-memory Memory per executor (e.g. 1000M, 2G) (Default: 1G).
17 --proxy-user User to impersonate when submitting the application.
18 --help, -h Show this help message and exit.
19 --verbose, -v Print additional debug output.
20 --version Print the version of current Spark.
21 --driver-cores NUM Cores for driver (Default: 1).
22 --supervise If given, restarts the driver on failure.
23 --kill If given, kills the driver specified.
24 --status If given, requests the status of the driver specified.
25 --total-executor-cores Total cores for all executors.
26 --executor-cores Number of cores per executor. (Default : 1 in YARN mode, or all available cores on the worker in standalone mode).

高度なSparkプログラミング

Sparkには2つの異なるタイプの共有変数が含まれています。1つは* broadcast変数*で、もう1つは accumulators です。

  • ブロードキャスト変数-効率的に大きな値を配布するために使用されます。
  • アキュムレータ-特定のコレクションの情報を集約するために使用されます。

ブロードキャスト変数

ブロードキャスト変数を使用すると、プログラマはタスクのコピーを出荷せずに、読み取り専用変数を各マシンにキャッシュしておくことができます。 たとえば、すべてのノードに大きな入力データセットのコピーを効率的に提供するために使用できます。 Sparkは、通信コストを削減するために効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布しようとします。

スパークアクションは、分散「シャッフル」操作で区切られた一連のステージを通じて実行されます。 Sparkは、各ステージ内のタスクに必要な共通データを自動的にブロードキャストします。

この方法でブロードキャストされたデータは、シリアル化された形式でキャッシュされ、各タスクを実行する前に逆シリアル化されます。 つまり、ブロードキャスト変数を明示的に作成することは、複数のステージにまたがるタスクが同じデータを必要とする場合、またはデータをデシリアライズされた形式でキャッシュすることが重要な場合にのみ有用です。

ブロードキャスト変数は、* SparkContext.broadcast(v)を呼び出すことにより、変数 *v から作成されます。 ブロードキャスト変数は v のラッパーであり、その値は value メソッドを呼び出すことでアクセスできます。 以下に示すコードはこれを示しています-

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

出力-

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

ブロードキャスト変数を作成したら、クラスターで実行される関数で値 v の代わりに使用する必要があります。これにより、 v がノードに複数回出荷されないようになります。 さらに、すべてのノードがブロードキャスト変数の同じ値を取得できるようにするために、オブジェクト v はブロードキャスト後に変更しないでください。

アキュムレーター

アキュムレータは、連想操作を介してのみ「追加」される変数であるため、並行して効率的にサポートできます。 それらを使用して、カウンター(MapReduceなど)または合計を実装できます。 Sparkは数値型のアキュムレーターをネイティブでサポートし、プログラマーは新しい型のサポートを追加できます。 アキュムレータが名前で作成されている場合、それらは SparkのUI に表示されます。 これは、実行中のステージの進行状況を理解するのに役立ちます(注-Pythonではまだサポートされていません)。

アキュムレータは、* SparkContext.accumulator(v)を呼び出すことにより、初期値 *v から作成されます。 クラスターで実行されているタスクは、 add メソッドまたは&plus; =演算子(ScalaおよびPython)を使用して追加できます。 ただし、その値を読み取ることはできません。 ドライバープログラムのみが、 value メソッドを使用してアキュムレーターの値を読み取ることができます。

以下のコードは、配列の要素を加算するために使用されるアキュムレータを示しています-

scala> val accum = sc.accumulator(0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

上記のコードの出力を見たい場合は、次のコマンドを使用します-

scala> accum.value

出力

res2: Int = 10

数値RDD操作

Sparkでは、定義済みのAPIメソッドの1つを使用して、数値データに対してさまざまな操作を実行できます。 Sparkの数値演算は、モデルを一度に1要素ずつ作成できるストリーミングアルゴリズムで実装されます。

これらの操作は、* status()メソッドを呼び出すことで計算され、 *StatusCounter オブジェクトとして返されます。

以下は、 StatusCounter で使用可能な数値メソッドのリストです。

S.No Methods & Meaning
1

count()

RDDの要素の数。

2

Mean()

RDDの要素の平均。

3

Sum()

RDDの要素の合計値。

4

Max()

RDDのすべての要素の最大値。

5

Min()

RDDのすべての要素の最小値。

6

Variance()

要素の分散。

7

Stdev()

標準偏差。

これらのメソッドの1つのみを使用する場合は、RDDで直接対応するメソッドを呼び出すことができます。