Apache-spark-core-programming

提供:Dev Guides
移動先:案内検索

Apache Spark-コアプログラミング

Spark Coreはプロジェクト全体の基盤です。 分散タスクのディスパッチ、スケジューリング、および基本的なI/O機能を提供します。 SparkはRDD(Resilient Distributed Datasets)と呼ばれる特殊な基本データ構造を使用します。これは、マシン間でパーティション分割されたデータの論理的なコレクションです。 RDDは2つの方法で作成できます。 1つは外部ストレージシステムのデータセットを参照する方法で、もう1つは変換を適用する方法です(例: 既存のRDDでマップ、フィルター、レデューサー、結合)。

RDD抽象化は、言語統合APIを通じて公開されます。 これにより、アプリケーションがRDDを操作する方法がデータのローカルコレクションを操作することに似ているため、プログラミングの複雑さが簡素化されます。

スパークシェル

Sparkはインタラクティブなシェルを提供します-データをインタラクティブに分析するための強力なツールです。 ScalaまたはPython言語で利用可能です。 Sparkの主要な抽象化は、Resilient Distributed Dataset(RDD)と呼ばれるアイテムの分散コレクションです。 RDDは、Hadoop入力形式(HDFSファイルなど)から、または他のRDDを変換して作成できます。

Spark Shellを開く

次のコマンドは、Sparkシェルを開くために使用されます。

$ spark-shell

シンプルなRDDを作成する

テキストファイルから単純なRDDを作成します。 次のコマンドを使用して、単純なRDDを作成します。

scala> val inputfile = sc.textFile(“input.txt”)

上記のコマンドの出力は次のとおりです。

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD APIは、RDDを操作するための Transformations および Actions をほとんど導入していません。

RDD変換

RDD変換は、新しいRDDへのポインターを返し、RDD間の依存関係を作成できるようにします。 依存関係チェーン(依存関係の文字列)の各RDDには、そのデータを計算する機能があり、親RDDへのポインター(依存関係)があります。

Sparkは遅延であるため、ジョブの作成と実行をトリガーする変換またはアクションを呼び出さない限り、何も実行されません。 単語カウントの例の次のスニペットを見てください。

したがって、RDD変換はデータのセットではなく、データの取得方法とその処理方法をSparkに指示するプログラム内のステップ(唯一のステップである可能性があります)です。

以下に、RDD変換のリストを示します。

S.No Transformations & Meaning
1

map(func)

ソースの各要素を関数 func に渡すことにより形成された、新しい分散データセットを返します。

2

filter(func)

  • func* がtrueを返すソースの要素を選択して形成された新しいデータセットを返します。
3

flatMap(func)

mapと似ていますが、各入力アイテムは0個以上の出力アイテムにマップできます(したがって、_func_は単一のアイテムではなくSeqを返す必要があります)。

4

mapPartitions(func)

マップに似ていますが、RDDの各パーティション(ブロック)で個別に実行されるため、タイプTのRDDで実行する場合、 func はタイプIterator <T>⇒Iterator <U>でなければなりません。

5

mapPartitionsWithIndex(func)

マップパーティションに似ていますが、パーティションのインデックスを表す整数値を func に提供するため、 func はタイプ(Int、Iterator <T>)でなければなりません⇒タイプのRDDで実行する場合はIterator <U> T.

6

sample(withReplacement, fraction, seed)

特定の乱数ジェネレータシードを使用して、置換の有無にかかわらず、データの*割合*をサンプリングします。

7

union(otherDataset)

ソースデータセットの要素と引数の和集合を含む新しいデータセットを返します。

8

intersection(otherDataset)

ソースデータセットの要素と引数の共通部分を含む新しいRDDを返します。

9

distinct([numTasks])

ソースデータセットの個別の要素を含む新しいデータセットを返します。

10

groupByKey([numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、Iterable <V>)ペアのデータセットを返します。

-各キーで集計(平均や平均など)を実行するためにグループ化する場合、reduceByKeyまたはaggregateByKeyを使用するとパフォーマンスが大幅に向上します。

11

reduceByKey(func, [numTasks])

(K、V)ペアのデータセットで呼び出された場合、(K、V)ペアのデータセットを返します。各キーの値は、(V、V)⇒V型でなければならない指定されたリデュース関数_func_を使用して集計されます。 groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、U)ペアのデータセットを返します。各データセットは、指定された結合関数とニュートラルな「ゼロ」値を使用して集計されます。 不要な割り当てを避けながら、入力値タイプとは異なる集約値タイプを許可します。 groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

13

sortByKey([ascending], [numTasks])

KがOrderedを実装する(K、V)ペアのデータセットで呼び出されると、ブール昇順引数で指定されているように、キーで昇順または降順でソートされた(K、V)ペアのデータセットを返します。

14

join(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、各キーのすべての要素のペアを持つ(K、(V、W))ペアのデータセットを返します。 外部結合は、leftOuterJoin、rightOuterJoin、fullOuterJoinを通じてサポートされます。

15

cogroup(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、(K、(Iterable <V>、Iterable <W>))タプルのデータセットを返します。 この操作は、グループWithとも呼ばれます。

16

cartesian(otherDataset)

タイプTおよびUのデータセットで呼び出されると、(T、U)ペア(すべての要素ペア)のデータセットを返します。

17

pipe(command, [envVars])

RDDの各パーティションをシェルコマンド(たとえば、 Perlまたはbashスクリプト。 RDD要素はプロセスの標準入力に書き込まれ、その標準出力への行出力は文字列のRDDとして返されます。

18

coalesce(numPartitions)

RDDのパーティションの数をnumPartitionsに減らします。 大きなデータセットをフィルタリングした後、操作をより効率的に実行するのに役立ちます。

19

repartition(numPartitions)

RDD内のデータをランダムにシャッフルし、より多くまたはより少ないパーティションを作成し、それらの間でバランスを取ります。 これにより、ネットワーク上のすべてのデータが常にシャッフルされます。

20

repartitionAndSortWithinPartitions(partitioner)

指定されたパーティショナーに従ってRDDを再パーティション化し、結果の各パーティション内で、キーでレコードをソートします。 これは、再パーティションを呼び出してから各パーティション内でソートするよりも効率的です。これは、ソートをシャッフルマシンにプッシュダウンできるためです。

行動

次の表に、値を返すアクションのリストを示します。

S.No Action & Meaning
1

reduce(func)

関数 func (2つの引数を取り、1つを返す)を使用して、データセットの要素を集約します。 関数は、並列で正しく計算できるように、可換および関連性が必要です。

2

collect()

データセットのすべての要素をドライバープログラムの配列として返します。 これは通常、フィルターまたはデータの十分に小さなサブセットを返す他の操作の後に役立ちます。

3

count()

データセット内の要素の数を返します。

4

first()

データセットの最初の要素を返します(テイク(1)と同様)。

5

take(n)

データセットの最初の n 要素を持つ配列を返します。

6

takeSample (withReplacement,num, [seed])

データセットの num 要素のランダムサンプルを含む配列を返します。置換の有無にかかわらず、オプションで乱数ジェネレータシードを事前に指定します。

7

takeOrdered(n, [ordering])

自然順序またはカスタムコンパレータのいずれかを使用して、RDDの最初の n 要素を返します。

8

saveAsTextFile(path)

データセットの要素をテキストファイル(またはテキストファイルのセット)として、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のディレクトリに書き込みます。 Sparkは、各要素でtoStringを呼び出して、ファイル内のテキスト行に変換します。

9

saveAsSequenceFile(path) (Java and Scala)

データセットの要素を、Hadoop SequenceFileとして、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のパスに書き込みます。 これは、Hadoopの書き込み可能なインターフェイスを実装するキーと値のペアのRDDで利用できます。 Scalaでは、暗黙的にWritableに変換可能な型でも使用できます(Sparkには、Int、Double、Stringなどの基本型の変換が含まれます)。

10

saveAsObjectFile(path) (Java and Scala)

Javaシリアル化を使用してデータセットの要素を単純な形式で書き込みます。その後、SparkContext.objectFile()を使用して読み込むことができます。

11

countByKey()

タイプ(K、V)のRDDでのみ使用可能です。 (K、Int)ペアと各キーのカウントのハッシュマップを返します。

12

foreach(func)

データセットの各要素で関数 func を実行します。 これは通常、アキュムレータの更新や外部ストレージシステムとの対話などの副作用のために行われます。

注意-foreach()以外のAccumulators以外の変数を変更すると、未定義の動作が発生する場合があります。 詳細については、クロージャーについてを参照してください。

RDDを使用したプログラミング

例の助けを借りて、RDDプログラミングでのいくつかのRDD変換とアクションの実装を見てみましょう。

単語数の例を考えてみましょう-文書に現れる各単語を数えます。 次のテキストを入力として考え、ホームディレクトリに input.txt ファイルとして保存されます。

*input.txt* -入力ファイル。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

以下の手順に従って、指定された例を実行します。

スパークシェルを開く

次のコマンドは、sparkシェルを開くために使用されます。 通常、sparkはScalaを使用して構築されます。 したがって、SparkプログラムはScala環境で実行されます。

$ spark-shell

Sparkシェルが正常に開くと、次の出力が表示されます。 出力の最後の行「scとして使用可能なスパークコンテキスト」を見ると、 sc という名前のスパークコンテキストオブジェクトが自動的に作成されていることを意味します。 プログラムの最初のステップを開始する前に、SparkContextオブジェクトを作成する必要があります。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
    /__/__  ___ _____//__
    _\ \/_ \/_ `/__/ '_/
  /___/.__/\_,_/_//_/\_\   version 1.4.0
     /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

RDDを作成する

まず、Spark-Scala APIを使用して入力ファイルを読み取り、RDDを作成する必要があります。

次のコマンドは、指定された場所からファイルを読み取るために使用されます。 ここでは、入力ファイルの名前で新しいRDDが作成されます。 textFile(“”)メソッドで引数として指定される文字列は、入力ファイル名の絶対パスです。 ただし、ファイル名のみが指定されている場合、入力ファイルが現在の場所にあることを意味します。

scala> val inputfile = sc.textFile("input.txt")

単語数変換を実行する

私たちの目的は、ファイル内の単語を数えることです。 各行を単語に分割するためのフラットマップを作成します(* flatMap(line⇒line.split(“”)*)。

次に、マップ関数(* map(word⇒(word、1))を使用して、各単語を値 *’1’ (<key、value> = <word、1>)を持つキーとして読み取ります。

最後に、同様のキーの値を追加してこれらのキーを減らします(* reduceByKey(_&plus; _)*)。

次のコマンドは、ワードカウントロジックの実行に使用されます。 これを実行した後、これはアクションではなく、変換であるため、出力は見つかりません。新しいRDDを指すか、指定されたデータをどうするかを火花に伝えます)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

現在のRDD

RDDの操作中に、現在のRDDについて知りたい場合は、次のコマンドを使用します。 現在のRDDとデバッグのための依存関係に関する説明が表示されます。

scala> counts.toDebugString

変換のキャッシュ

persist()またはcache()メソッドを使用して、RDDを永続化するようにマークできます。 アクションで最初に計算されるとき、ノード上のメモリに保持されます。 次のコマンドを使用して、中間変換をメモリに保存します。

scala> counts.cache()

アクションを適用する

すべての変換を保存するなどのアクションを適用すると、結果がテキストファイルになります。 saveAsTextFile(“”)メソッドのString引数は、出力フォルダーの絶対パスです。 次のコマンドを試して、出力をテキストファイルに保存します。 次の例では、「出力」フォルダーは現在の場所にあります。

scala> counts.saveAsTextFile("output")

出力の確認

別のターミナルを開いてホームディレクトリに移動します(sparkは他のターミナルで実行されます)。 出力ディレクトリを確認するには、次のコマンドを使用します。

[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS
*Part-00000* ファイルからの出力を表示するには、次のコマンドを使用します。
[hadoop@localhost output]$ cat part-00000

出力

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
*Part-00001* ファイルからの出力を表示するには、次のコマンドを使用します。
[hadoop@localhost output]$ cat part-00001

出力

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

国連ストレージの永続化

UN永続化の前に、このアプリケーションに使用されているストレージスペースを表示する場合は、ブラウザで次のURLを使用します。

http://localhost:4040

次の画面が表示されます。この画面には、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースが表示されます。

ストレージスペース

特定のRDDのストレージスペースをUNパーシストにしたい場合は、次のコマンドを使用します。

Scala> counts.unpersist()

次のように出力が表示されます-

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

ブラウザのストレージスペースを確認するには、次のURLを使用します。

http://localhost:4040/

次の画面が表示されます。 Sparkシェルで実行されているアプリケーションに使用されるストレージスペースが表示されます。

アプリケーションのストレージスペース