Apache-spark-deployment

提供:Dev Guides
移動先:案内検索

Apache Spark-デプロイメント

spark-submitを使用するSparkアプリケーションは、クラスターにSparkアプリケーションをデプロイするために使用されるシェルコマンドです。 均一なインターフェースを介して、それぞれのクラスターマネージャーをすべて使用します。 したがって、アプリケーションをそれぞれに構成する必要はありません。

シェルコマンドを使用して、前に使用したワードカウントの同じ例を取り上げます。 ここでは、sparkアプリケーションと同じ例を検討します。

サンプル入力

次のテキストは入力データであり、名前の付いたファイルは in.txt です。

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

次のプログラムを見てください-

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

     /*local = master URL; Word Count = application name;*/
     /*/usr/local/spark = Spark Home; Nil = jars; Map = environment*/
     /*Map = variables to work nodes*/
     /*creating an inputRDD to read text file (in.txt) through Spark context*/
      val input = sc.textFile("in.txt")
     /*Transform the inputRDD into countRDD*/

      val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)

     /*saveAsTextFile method is an action that effects on the RDD*/
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}

上記のプログラムを SparkWordCount.scala という名前のファイルに保存し、 spark-application という名前のユーザー定義ディレクトリに配置します。

-inputRDDをcountRDDに変換する際、flatMap()を使用して、テキストファイルからの行を単語にトークン化し、map()メソッドで単語の頻度をカウントし、reduceByKey()メソッドで各単語の繰り返しをカウントします。

このアプリケーションを送信するには、次の手順を使用します。 ターミナルから spark-application ディレクトリのすべてのステップを実行します。

ステップ1:Spark Jaをダウンロードする

コンパイルにはSparkコアjarが必要です。したがって、次のリンクhttp://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3からspark-core_2.10-1.3.0.jarをダウンロードしてください。 0 [Spark core jar]そして、jarファイルをダウンロードディレクトリから spark-application ディレクトリに移動します。

ステップ2:プログラムをコンパイルする

以下のコマンドを使用して、上記のプログラムをコンパイルします。 このコマンドは、spark-applicationディレクトリから実行する必要があります。 ここで、 /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar は、Sparkライブラリから取得したHadoopサポートjarです。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

ステップ3:JARを作成する

次のコマンドを使用して、sparkアプリケーションのjarファイルを作成します。 ここで、 wordcount はjarファイルのファイル名です。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

ステップ4:sparkアプリケーションを送信する

次のコマンドを使用してスパークアプリケーションを送信します-

spark-submit --class SparkWordCount --master local wordcount.jar

正常に実行されると、以下の出力が表示されます。 次の出力を許可する OK はユーザー識別用であり、プログラムの最終行です。 次の出力を注意深く読むと、次のようなさまざまなものが見つかります-

  • ポート42954でサービス「sparkDriver」を正常に開始しました
  • MemoryStoreは容量267.3 MBで開始しました
  • [[1]]
  • 追加されたJARファイル:/home/hadoop/piapplication/count.jar
  • ResultStage 1(SparkPi.scala:11のsaveAsTextFile)は0.566秒で終了しました
  • [[2]] Web UIを停止しました
  • メモリストアをクリアしました
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path =/tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

ステップ5:出力の確認

プログラムが正常に実行されると、spark-applicationディレクトリに outfile という名前のディレクトリが見つかります。

次のコマンドは、outfileディレクトリ内のファイルのリストを開いて確認するために使用されます。

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
*part-00000* ファイルの出力を確認するためのコマンドは次のとおりです-
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

パート-00001ファイルの出力を確認するためのコマンドは次のとおりです-

$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

「spark-submit」コマンドの詳細については、次のセクションをご覧ください。

スパーク送信構文

spark-submit [options] <app jar | python file> [app arguments]

オプション

以下の表は、*オプション*のリストを説明しています-

S.No Option Description
1 --master spark://host:port, mesos://host:port, yarn, or local.
2 --deploy-mode Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
3 --class Your application’s main class (for Java/Scala apps).
4 --name A name of your application.
5 --jars Comma-separated list of local jars to include on the driver and executor classpaths.
6 --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths.
7 --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages.
8 --py-files Comma-separated list of .zip, .egg, or .py files to place on the PYTHON PATH for Python apps.
9 --files Comma-separated list of files to be placed in the working directory of each executor.
10 --conf (prop=val) Arbitrary Spark configuration property.
11 --properties-file Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.
12 --driver-memory Memory for driver (e.g. 1000M, 2G) (Default: 512M).
13 --driver-java-options Extra Java options to pass to the driver.
14 --driver-library-path Extra library path entries to pass to the driver.
15 --driver-class-path

Extra class path entries to pass to the driver.

--jarsで追加されたjarは、クラスパスに自動的に含まれることに注意してください。

16 --executor-memory Memory per executor (e.g. 1000M, 2G) (Default: 1G).
17 --proxy-user User to impersonate when submitting the application.
18 --help, -h Show this help message and exit.
19 --verbose, -v Print additional debug output.
20 --version Print the version of current Spark.
21 --driver-cores NUM Cores for driver (Default: 1).
22 --supervise If given, restarts the driver on failure.
23 --kill If given, kills the driver specified.
24 --status If given, requests the status of the driver specified.
25 --total-executor-cores Total cores for all executors.
26 --executor-cores Number of cores per executor. (Default : 1 in YARN mode, or all available cores on the worker in standalone mode).