Apache-kafka-integration-spark
Apache Kafka-Sparkとの統合
この章では、Apache KafkaをSpark Streaming APIと統合する方法について説明します。
Sparkについて
Spark Streaming APIは、ライブデータストリームのスケーラブルで高スループットのフォールトトレラントストリーム処理を可能にします。 データは、Kafka、Flume、Twitterなどの多くのソースから取得でき、map、reduce、join、windowなどの高レベル関数などの複雑なアルゴリズムを使用して処理できます。 最後に、処理されたデータをファイルシステム、データベース、ライブダッシュボードにプッシュできます。 弾力性のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。 オブジェクトの不変の分散コレクションです。 RDDの各データセットは論理パーティションに分割され、クラスターの異なるノードで計算できます。
Sparkとの統合
Kafkaは、Sparkストリーミング用の潜在的なメッセージングおよび統合プラットフォームです。 Kafkaは、リアルタイムのデータストリームの中央ハブとして機能し、Spark Streamingの複雑なアルゴリズムを使用して処理されます。 データが処理されると、Spark Streamingはさらに別のKafkaトピックに結果を公開したり、HDFS、データベース、ダッシュボードに保存したりできます。 次の図は、概念的なフローを示しています。
それでは、Kafka-Spark APIの詳細を見ていきましょう。
SparkConf API
Sparkアプリケーションの構成を表します。 さまざまなSparkパラメーターをキーと値のペアとして設定するために使用されます。
「 SparkConf
」クラスには次のメソッドがあります-
- * set(string key、string value)*-設定変数を設定します。
- * remove(string key)*-設定からキーを削除します。
- * setAppName(string name)*-アプリケーションのアプリケーション名を設定します。
- * get(string key)*-キーを取得
StreamingContext API
これは、Spark機能の主要なエントリポイントです。 SparkContextは、Sparkクラスターへの接続を表し、クラスター上でRDD、アキュムレーター、およびブロードキャスト変数を作成するために使用できます。 署名は次のように定義されます。
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
- master -接続先のクラスターURL(例: mesos://host:port、spark://host:port、local [4])。
- appName -クラスターWeb UIに表示するジョブの名前
- batchDuration -ストリーミングデータがバッチに分割される時間間隔
public StreamingContext(SparkConf conf, Duration batchDuration)
新しいSparkContextに必要な構成を提供して、StreamingContextを作成します。
- conf -Sparkパラメーター
- batchDuration -ストリーミングデータがバッチに分割される時間間隔
KafkaUtils API
KafkaUtils APIは、KafkaクラスターをSparkストリーミングに接続するために使用されます。 このAPIには、以下のように定義されたsignifi-cantメソッド「 createStream
」署名があります。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上記のメソッドは、Kafka Brokersからメッセージをプルする入力ストリームを作成するために使用されます。
- ssc -StreamingContextオブジェクト。
- zkQuorum -Zookeeperクォーラム。
- groupId -このコンシューマのグループID。
- * topics-消費するトピックのマップを返します。
- storageLevel -受信したオブジェクトの保存に使用するストレージレベル。
KafkaUtils APIには別のメソッドcreateDirectStreamがあります。これは、レシーバーを使用せずにKafka Brokersからメッセージを直接プルする入力ストリームを作成するために使用されます。 このストリームは、Kafkaからの各メッセージが1回だけ変換に含まれることを保証できます。
サンプルアプリケーションはScalaで実行されます。 アプリケーションをコンパイルするには、 "` sbt` "、scalaビルドツール(mavenに類似)をダウンロードしてインストールしてください。 メインアプリケーションコードを以下に示します。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
構築スクリプト
spark-kafka統合は、spark、sparkストリーミング、およびspark kafka統合jarに依存しています。 新しいファイル「 build.sbt
」を作成し、アプリケーションの詳細とその依存関係を指定します。 「 sbt
」は、アプリケーションをコンパイルおよびパックするときに必要なjarをダウンロードします。
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
コンパイル/パッケージング
次のコマンドを実行して、アプリケーションのjarファイルをコンパイルおよびパッケージ化します。 アプリケーションを実行するには、jarファイルをスパークコンソールに送信する必要があります。
sbt package
Sparkに送信する
Kafka Producer CLI(前の章で説明)を起動し、「 my-first-topic
」という新しいトピックを作成し、以下に示すサンプルメッセージを提供します。
Another spark test message
次のコマンドを実行して、アプリケーションをスパークコンソールに送信します。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
このアプリケーションのサンプル出力を以下に示します。
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..