Apache-kafka-quick-guide

提供:Dev Guides
移動先:案内検索

Apache Kafka-はじめに

ビッグデータでは、膨大な量のデータが使用されます。 データに関しては、主に2つの課題があります。最初の課題は、大量のデータを収集する方法であり、2番目の課題は、収集したデータを分析することです。 これらの課題を克服するには、メッセージングシステムが必要です。

Kafkaは、分散型の高スループットシステム向けに設計されています。 Kafkaは、従来のメッセージブローカーの代わりとして非常にうまく機能する傾向があります。 他のメッセージングシステムと比較して、Kafkaはスループットが向上し、パーティション分割、複製、および固有のフォールトトレランスが組み込まれているため、大規模なメッセージ処理アプリケーションに最適です。

メッセージングシステムとは

メッセージングシステムは、あるアプリケーションから別のアプリケーションにデータを転送するため、アプリケーションはデータに集中できますが、データの共有方法については心配しません。 分散メッセージングは​​、信頼できるメッセージキューイングの概念に基づいています。 メッセージは、クライアントアプリケーションとメッセージングシステムの間で非同期的にキューに入れられます。 2種類のメッセージングパターンを使用できます。1つはポイントツーポイント、もう1つはパブリッシュ/サブスクライブ(pub-sub)メッセージングシステムです。 ほとんどのメッセージングパターンは pub-sub に従います。

ポイントツーポイントメッセージングシステム

ポイントツーポイントシステムでは、メッセージはキューに保持されます。 1つ以上のコンシューマーがキュー内のメッセージを消費できますが、特定のメッセージは最大1つのコンシューマーのみが消費できます。 コンシューマは、キュー内のメッセージを読み取ると、そのキューから消えます。 このシステムの典型的な例は、各注文が1つの注文処理者によって処理される注文処理システムですが、複数の注文処理者が同時に機能することもできます。 次の図は、構造を示しています。

ポイントツーポイントメッセージングシステム

パブリッシュ/サブスクライブメッセージングシステム

パブリッシュ/サブスクライブシステムでは、メッセージはトピックに保持されます。 ポイントツーポイントシステムとは異なり、消費者は1つ以上のトピックをサブスクライブし、そのトピックのすべてのメッセージを消費できます。 パブリッシュ/サブスクライブシステムでは、メッセージプロデューサーはパブリッシャーと呼ばれ、メッセージコンシューマーはサブスクライバーと呼ばれます。 実際の例としては、Dish TVがあります。これは、スポーツ、映画、音楽などのさまざまなチャンネルを公開し、誰でも自分のチャンネルセットに登録して、登録済みのチャンネルが利用可能になるといつでも入手できます。

Publish-Subscribe Messaging system

カフカとは何ですか?

Apache Kafkaは、分散パブリッシュ/サブスクライブメッセージングシステムであり、大量のデータを処理でき、あるエンドポイントから別のエンドポイントにメッセージを渡すことができる堅牢なキューです。 Kafkaは、オフラインとオンラインの両方のメッセージ消費に適しています。 データの損失を防ぐため、Kafkaメッセージはディスクに保存され、クラスター内で複製されます。 Kafkaは、ZooKeeper同期サービスの上に構築されています。 リアルタイムストリーミングデータ分析のために、Apache StormおよびSparkと非常によく統合されます。

利点

以下はカフカのいくつかの利点です-

  • 信頼性-Kafkaは分散、パーティション分割、複製、およびフォールトトレランスです。
  • スケーラビリティ-Kafkaメッセージングシステムは、ダウンタイムなしで簡単にスケーリングできます。
  • 耐久性-Kafkaは「 Distributed commit log」を使用します。これは、メッセージが可能な限り高速にディスクに保持されるため、耐久性があることを意味します。
  • パフォーマンス-Kafkaは、メッセージの発行とサブスクライブの両方のスループットが高くなっています。 多くのTBのメッセージが保存されても、安定したパフォーマンスを維持します。

Kafkaは非常に高速で、ダウンタイムとデータ損失をゼロに保証します。

ユースケース

Kafkaは多くのユースケースで使用できます。 それらのいくつかは以下にリストされています-

  • メトリック-Kafkaは、運用監視データによく使用されます。 これには、分散アプリケーションから統計を集約して、運用データの集中フィードを生成することが含まれます。
  • ログ集約ソリューション-組織全体でKafkaを使用して、複数のサービスからログを収集し、それらを標準形式で複数の消費者が利用できるようにすることができます。
  • ストリーム処理-StormやSpark Streamingなどの人気のあるフレームワークは、トピックからデータを読み取り、処理し、処理されたデータを新しいトピックに書き込み、ユーザーとアプリケーションで利用できるようにします。 Kafkaの強力な耐久性は、ストリーム処理のコンテキストでも非常に役立ちます。

カフカの必要性

Kafkaは、すべてのリアルタイムデータフィードを処理するための統合プラットフォームです。 Kafkaは低遅延メッセージ配信をサポートし、マシン障害が発生した場合のフォールトトレランスを保証します。 多数の多様な消費者を処理する能力があります。 Kafkaは非常に高速で、毎秒200万回の書き込みを実行します。 Kafkaはすべてのデータをディスクに保持します。つまり、本質的にはすべての書き込みがOS(RAM)のページキャッシュに送られます。 これにより、ページキャッシュからネットワークソケットにデータを転送するのが非常に効率的になります。

Apache Kafka-基礎

Kafkaに深く入り込む前に、トピック、ブローカー、プロデューサー、コンシューマーなどの主要な用語に注意する必要があります。 次の図は主な用語を示し、表は図のコンポーネントを詳細に説明しています。

基本

上の図では、トピックは3つのパーティションに構成されています。 パーティション1には2つのオフセット係数0と1があります。 パーティション2には、4つのオフセット係数0、1、2、および3があります。 パーティション3には1つのオフセット係数0があります。 レプリカのIDは、レプリカをホストするサーバーのIDと同じです。

トピックのレプリケーション係数が3に設定されている場合、Kafkaは各パーティションの3つの同一レプリカを作成し、クラスターに配置して、すべての操作に使用できるようにします。 クラスター内の負荷を分散するために、各ブローカーはそれらのパーティションの1つ以上を格納します。 複数のプロデューサーとコンシューマーが同時にメッセージを公開および取得できます。

S.No Components and Description
1

Topics

特定のカテゴリに属する​​メッセージのストリームは、トピックと呼ばれます。 データはトピックに保存されます。

トピックはパーティションに分割されます。 各トピックについて、Kafkaは最小1つのパーティションを保持します。 このような各パーティションには、不変の順序でメッセージが含まれています。 パーティションは、同じサイズのセグメントファイルのセットとして実装されます。

2

Partition

トピックには多くのパーティションがあるため、任意の量のデータを処理できます。

3

Partition offset

分割された各メッセージには、「 offset」と呼ばれる一意のシーケンスIDがあります。

4

Replicas of partition

レプリカは、パーティションの「バックアップ」に他なりません。 レプリカは決してデータを読み書きしません。 データの損失を防ぐために使用されます。

5

Brokers

  • ブローカーは、公開されたデータを管理するシンプルなシステムです。 各ブローカーには、トピックごとに0個以上のパーティションがあります。 トピックにN個のパーティションがあり、N個のブローカーがある場合、各ブローカーには1つのパーティションがあります。
  • トピックにN個のパーティションがあり、N個を超えるブローカー(n + m)がある場合、最初のN個のブローカーには1つのパーティションがあり、次のMブローカーにはその特定のトピックのパーティションはありません。 *トピックにN個のパーティションがあり、N個未満のブローカー(n-m)がある場合、各ブローカーは1つ以上のパーティションを共有します。 このシナリオは、ブローカー間で負荷が不均等に分配されるため推奨されません。
6
  • Kafka Cluster*

Kafkaが複数のブローカーを持っていることをKafkaクラスターと呼びます。 Kafkaクラスターは、ダウンタイムなしで拡張できます。 これらのクラスターは、メッセージデータの永続性とレプリケーションを管理するために使用されます。

7

Producers

プロデューサーは、1つ以上のKafkaトピックへのメッセージの発行者です。 プロデューサーは、Kafkaブローカーにデータを送信します。 プロデューサーがブローカーにメッセージを発行するたびに、ブローカーは最後のセグメントファイルにメッセージを単に追加します。 実際には、メッセージはパーティションに追加されます。 プロデューサーは、選択したパーティションにメッセージを送信することもできます。

8

Consumers

消費者はブローカーからデータを読み取ります。 消費者は、1つ以上のトピックにサブスクライブし、ブローカーからデータをプルすることにより、公開されたメッセージを消費します。

9

Leader

「リーダー」は、指定されたパーティションのすべての読み取りと書き込みを担当するノードです。 すべてのパーティションには、リーダーとして機能するサーバーが1つあります。

10

Follower

リーダーの指示に従うノードは、フォロワーと呼ばれます。 リーダーが失敗すると、フォロワーの1人が自動的に新しいリーダーになります。 フォロワーは、通常のコンシューマーとして機能し、メッセージをプルし、独自のデータストアを更新します。

Apache Kafka-クラスターアーキテクチャ

次の図をご覧ください。 Kafkaのクラスター図を示しています。

クラスターアーキテクチャ

次の表は、上の図に示されている各コンポーネントについて説明しています。

S.No Components and Description
1

Broker

Kafkaクラスターは通常、負荷分散を維持するために複数のブローカーで構成されます。 Kafkaブローカーはステートレスなので、ZooKeeperを使用してクラスターの状態を維持します。 1つのKafkaブローカーインスタンスは1秒あたり数十万の読み取りと書き込みを処理でき、各ブローカーはパフォーマンスに影響を与えることなくTBのメッセージを処理できます。 Kafkaブローカーリーダーの選出は、ZooKeeperで行うことができます。

2

ZooKeeper

ZooKeeperは、Kafkaブローカーの管理と調整に使用されます。 ZooKeeperサービスは主に、Kafkaシステム内の新しいブローカーの存在またはKafkaシステム内のブローカーの障害について、生産者と消費者に通知するために使用されます。 ブローカーの存在または障害に関してZookeeperが受信した通知に従って、生産者と消費者が決定を下し、他のブローカーとのタスクの調整を開始します。

3

Producers

プロデューサーはブローカーにデータをプッシュします。 新しいブローカーが開始されると、すべてのプロデューサーがそれを検索し、その新しいブローカーにメッセージを自動的に送信します。 Kafkaプロデューサーは、ブローカーからの確認を待たずに、ブローカーが処理できる速度でメッセージを送信します。

4

Consumers

Kafkaブローカーはステートレスであるため、消費者はパーティションオフセットを使用して消費されたメッセージの数を維持する必要があります。 コンシューマが特定のメッセージオフセットを確認した場合、それはコンシューマが以前のすべてのメッセージを消費したことを意味します。 コンシューマは、非同期プル要求をブローカに発行して、バイトのバッファをすぐに使用できるようにします。 コンシューマは、オフセット値を指定するだけで、パーティション内の任意のポイントに巻き戻したりスキップしたりできます。 コンシューマオフセット値は、ZooKeeperによって通知されます。

Apache Kafka-ワークフロー

今のところ、Kafkaのコアコンセプトについて説明しました。 次に、Kafkaのワークフローに光を当てましょう。

Kafkaは、1つ以上のパーティションに分割されたトピックのコレクションです。 Kafkaパーティションは、線形に順序付けられた一連のメッセージであり、各メッセージはインデックス(オフセットと呼ばれる)によって識別されます。 Kafkaクラスター内のすべてのデータは、分割されたパーティションの結合です。 着信メッセージはパーティションの最後に書き込まれ、メッセージはコンシューマーによって順番に読み取られます。 異なるブローカーにメッセージを複製することにより、耐久性が提供されます。

Kafkaは、pub-subとキューベースの両方のメッセージングシステムを、高速で信頼性の高い永続的なフォールトトレランスとゼロダウンタイムで提供します。 どちらの場合も、プロデューサーはトピックにメッセージを送信するだけで、コンシューマーは必要に応じて任意の種類のメッセージングシステムを選択できます。 次のセクションの手順に従って、消費者が選択したメッセージングシステムを選択する方法を理解しましょう。

Pub-Subメッセージングのワークフロー

以下は、Pub-Subメッセージングの段階的なワークフローです-

  • プロデューサーは定期的にトピックにメッセージを送信します。
  • Kafkaブローカーは、その特定のトピック用に構成されたパーティションにすべてのメッセージを保存します。 これにより、メッセージがパーティション間で均等に共有されます。 プロデューサーが2つのメッセージを送信し、2つのパーティションがある場合、Kafkaは1つのメッセージを最初のパーティションに保存し、2番目のメッセージを2番目のパーティションに保存します。
  • 消費者は特定のトピックにサブスクライブします。
  • 消費者がトピックをサブスクライブすると、Kafkaは消費者にトピックの現在のオフセットを提供し、Zookeeperアンサンブルにオフセットを保存します。
  • 消費者は、新しいメッセージについて定期的に(100 Msなど)Kafkaを要求します。
  • Kafkaはプロデューサーからメッセージを受信すると、これらのメッセージをコンシューマーに転送します。
  • 消費者はメッセージを受信して​​処理します。
  • メッセージが処理されると、コンシューマーはKafkaブローカーに確認を送信します。
  • Kafkaは確認を受け取ると、オフセットを新しい値に変更し、Zookeeperで更新します。 Zookeeperではオフセットが維持されるため、消費者はサーバーの怒りがあっても次のメッセージを正しく読むことができます。
  • このフローは、コンシューマーがリクエストを停止するまで繰り返されます。
  • 消費者は、いつでもトピックの目的のオフセットに巻き戻し/スキップして、後続のすべてのメッセージを読むことができます。

キューメッセージング/コンシューマグループのワークフロー

単一のコンシューマではなくキューメッセージングシステムでは、同じ「グループID」を持つコンシューマのグループがトピックをサブスクライブします。 簡単に言えば、同じ「グループID」を持つトピックにサブスクライブしているコンシューマーは単一のグループと見なされ、メッセージはそれらの間で共有されます。 このシステムの実際のワークフローを確認しましょう。

  • プロデューサーは、定期的にトピックにメッセージを送信します。
  • Kafkaは、以前のシナリオと同様に、特定のトピック用に構成されたパーティションにすべてのメッセージを保存します。
  • 単一のコンシューマーは特定のトピックにサブスクライブし、「 Group ID」が「` Group-1`」である「 Topic-01」を想定しています。
  • Kafkaは、新しいコンシューマが「 Group-1」と同じ「` Group ID`」で同じトピック「 Topic-01」をサブスクライブするまで、Pub-Subメッセージングと同じ方法でコンシューマと対話します。
  • 新しい消費者が到着すると、Kafkaは動作を共有モードに切り替え、2つの消費者間でデータを共有します。 この共有は、消費者の数がその特定のトピック用に構成されたパーティションの数に達するまで続きます。
  • コンシューマの数がパーティションの数を超えると、既存のコンシューマのいずれかがサブスクライブを解除するまで、新しいコンシューマはそれ以上メッセージを受信しません。 このシナリオは、Kafkaの各コンシューマーに少なくとも1つのパーティションが割り当てられ、すべてのパーティションが既存のコンシューマーに割り当てられると、新しいコンシューマーが待機する必要があるために発生します。
  • この機能は、「 Consumer Group」とも呼ばれます。 同様に、Kafkaは両方のシステムの最高の機能を非常にシンプルかつ効率的な方法で提供します。

ZooKeeperの役割

Apache Kafkaの重要な依存関係は、分散構成および同期サービスであるApache Zookeeperです。 Zookeeperは、Kafkaブローカーと消費者の間の調整インターフェイスとして機能します。 Kafkaサーバーは、Zookeeperクラスターを介して情報を共有します。 Kafkaは、トピック、ブローカー、コンシューマーオフセット(キューリーダー)などに関する情報などの基本的なメタデータをZookeeperに保存します。

すべての重要な情報はZookeeperに保存され、通常はアンサンブル全体でこのデータを複製するため、Kafkaブローカー/Zookeeperの障害はKafkaクラスターの状態に影響しません。 Zookeeperが再起動すると、Kafkaは状態を復元します。 これにより、Kafkaのダウンタイムはゼロになります。 Kafkaブローカー間のリーダー選出も、リーダーが失敗した場合にZookeeperを使用して行われます。

Zookeeperの詳細については、リンクを参照してください:/zookeeper/index [zookeeper]

次の章で、Java、ZooKeeper、Kafkaをマシンにインストールする方法についてさらに説明します。

Apache Kafka-インストール手順

Javaをマシンにインストールする手順は次のとおりです。

ステップ1-Javaインストールの検証

マシンにすでにjavaがインストールされていることが望ましいので、次のコマンドを使用して検証するだけです。

$ java -version

Javaがマシンに正常にインストールされている場合、インストールされているJavaのバージョンを確認できます。

ステップ1.1-JDKのダウンロード

Javaがダウンロードされていない場合は、次のリンクにアクセスして最新バージョンのJDKをダウンロードし、最新バージョンをダウンロードしてください。

http://www.oracle.com/technetwork/java/javase/downloads/indexl

現在、最新バージョンはJDK 8u 60で、ファイルは「jdk-8u60-linux-x64.tar.gz」です。 マシンにファイルをダウンロードしてください。

ステップ1.2-ファイルの抽出

通常、ダウンロードされるファイルはダウンロードフォルダーに保存され、それを確認し、次のコマンドを使用してtarセットアップを抽出します。

$ cd/go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

ステップ1.3-最適化ディレクトリに移動する

すべてのユーザーがJavaを使用できるようにするには、抽出されたJavaコンテンツを「 usr/local/java」/フォルダーに移動します。

$ su
password: (type password of root user)
$ mkdir/opt/jdk
$ mv jdk-1.8.0_60/opt/jdk/

ステップ1.4-パスを設定する

パスとJAVA_HOME変数を設定するには、次のコマンドを〜/.bashrcファイルに追加します。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

すべての変更を現在実行中のシステムに適用します。

$ source ~/.bashrc

ステップ1.5-Javaの代替

次のコマンドを使用して、Java Alternativesを変更します。

update-alternatives --install/usr/bin/java java/opt/jdk/jdk1.8.0_60/bin/java 100
  • ステップ1.6 *-ステップ1で説明した検証コマンド(java -version)を使用してJavaを検証します。

ステップ2-ZooKeeper Frameworkのインストール

ステップ2.1-ZooKeeperのダウンロード

ZooKeeperフレームワークをマシンにインストールするには、次のリンクにアクセスして、ZooKeeperの最新バージョンをダウンロードしてください。

http://zookeeper.apache.org/releasesl

現在、ZooKeeperの最新バージョンは3.4.6(ZooKeeper-3.4.6.tar.gz)です。

ステップ2.2-tarファイルを抽出する

次のコマンドを使用してtarファイルを抽出します

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

ステップ2.3-構成ファイルの作成

コマンドvi“ conf/zoo.cfg”と開始点として設定する以下のすべてのパラメーターを使用して、「 conf/zoo.cfg」という名前の設定ファイルを開きます。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

構成ファイルが正常に保存され、再びターミナルに戻ったら、zookeeperサーバーを起動できます。

ステップ2.4-ZooKeeper Serverを起動します

$ bin/zkServer.sh start

このコマンドを実行した後、以下に示すように応答を取得します-

$ JMX enabled by default
$ Using config:/Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

ステップ2.5-CLIの開始

$ bin/zkCli.sh

上記のコマンドを入力すると、zookeeperサーバーに接続され、以下の応答が返されます。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

ステップ2.6-Zookeeperサーバーを停止する

サーバーを接続し、すべての操作を実行した後、次のコマンドでzookeeperサーバーを停止することができます-

$ bin/zkServer.sh stop

これで、JavaとZooKeeperがマシンに正常にインストールされました。 Apache Kafkaをインストールする手順を見てみましょう。

ステップ3-Apache Kafkaのインストール

次の手順に進んで、Kafkaをマシンにインストールします。

ステップ3.1-Kafkaのダウンロード

あなたのマシンにKafkaをインストールするには、以下のリンクをクリックしてください-

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

これで、最新バージョン、つまり、 kafka_2.11_0.9.0.0.tgz がマシンにダウンロードされます。

ステップ3.2-tarファイルを抽出する

次のコマンドを使用してtarファイルを抽出します-

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

これで、マシンにKafkaの最新バージョンがダウンロードされました。

ステップ3.3-サーバーの起動

次のコマンドを与えることにより、サーバーを起動することができます-

$ bin/kafka-server-start.sh config/server.properties

サーバーが起動すると、画面に次の応答が表示されます-

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

ステップ4-サーバーの停止

すべての操作を実行した後、次のコマンドを使用してサーバーを停止できます-

$ bin/kafka-server-stop.sh config/server.properties

Kafkaのインストールについてはすでに説明したので、次の章でKafkaで基本的な操作を実行する方法を学習できます。

Apache Kafka-基本操作

最初に「単一ノード-単一ブローカー」構成の実装を開始してから、セットアップを単一ノード-複数ブローカー構成に移行します。

これで、Java、ZooKeeper、Kafkaがマシンにインストールされたことを願っています。 Kafka ClusterはZooKeeperを使用するため、Kafka Cluster Setupに移行する前に、まずZooKeeperを起動する必要があります。

ZooKeeperを起動します

新しいターミナルを開き、次のコマンドを入力します-

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Brokerを起動するには、次のコマンドを入力します-

bin/kafka-server-start.sh config/server.properties

Kafka Brokerを起動した後、ZooKeeperターミナルでコマンド「 jps」を入力すると、次の応答が表示されます-

821 QuorumPeerMain
928 Kafka
931 Jps

これで、QuorumPeerMainがZooKeeperデーモンで、もう1つがKafkaデーモンであるターミナルで実行されている2つのデーモンを確認できます。

単一ノード-単一ブローカー構成

この構成では、単一のZooKeeperおよびブローカーIDインスタンスがあります。 それを設定する手順は次のとおりです-

  • Kafkaトピックの作成*-Kafkaは、サーバー上にトピックを作成するための「 kafka-topics.sh」という名前のコマンドラインユーティリティを提供します。 新しいターミナルを開き、次の例を入力します。

構文

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka

1つのパーティションと1つのレプリカファクターを持つ「 Hello-Kafka」という名前のトピックを作成しました。 上記の作成された出力は、次の出力に似ています-

出力-作成されたトピック「 Hello-Kafka

トピックが作成されると、Kafkaブローカーターミナルウィンドウで通知を取得し、config/server.propertiesファイルの「/tmp/kafka-logs/」で指定された作成済みトピックのログを取得できます。

トピックス一覧

Kafkaサーバーのトピックのリストを取得するには、次のコマンドを使用できます-

構文

bin/kafka-topics.sh --list --zookeeper localhost:2181

出力

Hello-Kafka

トピックを作成したので、「 Hello-Kafka」のみがリストされます。 複数のトピックを作成する場合、出力でトピック名を取得するとします。

Producerを起動してメッセージを送信する

構文

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

上記の構文から、プロデューサーコマンドラインクライアントには2つの主要なパラメーターが必要です-

*Broker-list* -メッセージの送信先のブローカーのリスト。 この場合、ブローカーは1つしかありません。 Config/server.propertiesファイルにはブローカーポートIDが含まれています。これは、ブローカーがポート9092でリッスンしていることがわかっているため、直接指定できます。

トピック名-これはトピック名の例です。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

プロデューサーは、stdinからの入力を待機し、Kafkaクラスターに公開します。 デフォルトでは、すべての新しい行は新しいメッセージとして公開され、デフォルトのプロデューサーのプロパティは「 config/producer.properties」ファイルで指定されます。 次のように、ターミナルで数行のメッセージを入力できます。

出力

$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

コンシューマーを起動してメッセージを受信する

プロデューサーと同様に、デフォルトのコンシューマープロパティは「 config/consumer.proper-ties」ファイルで指定されます。 新しいターミナルを開き、メッセージを消費するための以下の構文を入力します。

構文

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning

出力

Hello
My first message
My second message

最後に、生産者の端末からメッセージを入力し、消費者の端末に表示されるのを確認できます。 現時点では、単一のブローカーを持つ単一ノードクラスターについて非常によく理解しています。 次に、複数のブローカー構成に進みましょう。

単一ノード-複数ブローカー構成

複数のブローカーのクラスター設定に進む前に、まずZooKeeperサーバーを起動します。

複数のKafkaブローカーを作成-con-fig/server.propertiesにすでに1つのKafkaブローカーインスタンスがあります。 複数のブローカーインスタンスが必要になったため、既存のserver.prop-ertiesファイルを2つの新しい構成ファイルにコピーし、server-one.propertiesおよびserver-two.prop-ertiesに名前を変更します。 次に、両方の新しいファイルを編集し、次の変更を割り当てます-

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

複数のブローカーを開始-3つのサーバーですべての変更を行った後、3つの新しいターミナルを開いて各ブローカーを1つずつ開始します。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

これで、3つの異なるブローカーがマシン上で実行されました。 ZooKeeperターミナルで jps と入力して、自分ですべてのデーモンをチェックしてみてください。応答が表示されます。

トピックを作成する

3つの異なるブローカーが実行されているため、このトピックのレプリケーション係数値を3として割り当てましょう。 2つのブローカーがある場合、割り当てられたレプリカ値は2になります。

構文

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication

出力

created topic “Multibrokerapplication”

Describe」コマンドは、以下に示すように、現在作成されているトピックをリッスンしているブローカーを確認するために使用されます-

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

出力

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1
ReplicationFactor:3 Configs:

Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1

上記の出力から、最初の行はすべてのパーティションの概要を提供し、トピック名、パーティション数、および既に選択したレプリケーション係数を示していると結論付けることができます。 2行目では、各ノードがパーティションのランダムに選択された部分のリーダーになります。

この場合、最初のブローカー(broker.id 0)がリーダーであることがわかります。 レプリカ:0,2,1は、すべてのブローカーがトピックを最終的に複製することを意味します。最終的に「 Isr」は「` in-sync`」レプリカのセットです。 まあ、これは現在生きており、リーダーに追いついたレプリカのサブセットです。

Producerを起動してメッセージを送信する

この手順は、単一ブローカーのセットアップの場合と同じです。

bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication

出力

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

コンシューマーを起動してメッセージを受信する

この手順は、単一ブローカーのセットアップで示したものと同じままです。

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning

出力

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本的なトピック操作

この章では、さまざまな基本的なトピック操作について説明します。

トピックの変更

Kafka Clusterでトピックを作成する方法を既に理解しているように。 次のコマンドを使用して、作成したトピックを変更しましょう

構文

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count

We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2

出力

WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

トピックを削除する

トピックを削除するには、次の構文を使用できます。

構文

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

出力

> Topic Hello-kafka marked for deletion

注- delete.topic.enable がtrueに設定されていない場合、影響はありません

Apache Kafka-シンプルなプロデューサーの例

Javaクライアントを使用してメッセージをパブリッシュおよびコンシュームするためのアプリケーションを作成してみましょう。 Kafkaプロデューサークライアントは、次のAPIで構成されています。

KafkaProducer API

このセクションでKafkaプロデューサーAPIの最も重要なセットを理解しましょう。 KafkaProducer APIの中心部分は「 KafkaProducer」クラスです。 KafkaProducerクラスは、コンストラクターでKafkaブローカーを次のメソッドに接続するオプションを提供します。

  • KafkaProducerクラスは、トピックに非同期でメッセージを送信するためのsendメソッドを提供します。 send()の署名は次のとおりです。
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
  • ProducerRecord -プロデューサーは、送信を待機しているレコードのバッファーを管理します。
  • Callback -レコードがサーバーによって承認されたときに実行するユーザー指定のコールバック(nullはコールバックがないことを示します)。
  • KafkaProducerクラスは、以前に送信されたすべてのメッセージが実際に完了したことを確認するフラッシュメソッドを提供します。 フラッシュメソッドの構文は次のとおりです-
public void flush()
  • KafkaProducerクラスは、特定のトピックのパーティションメタデータを取得するのに役立つpartitionForメソッドを提供します。 これは、カスタムパーティションに使用できます。 このメソッドのシグネチャは次のとおりです-
public Map metrics()

プロデューサーによって維持されている内部メトリックのマップを返します。

  • public void close()-KafkaProducerクラスは、以前に送信されたすべての要求が完了するまで、closeメソッドブロックを提供します。

プロデューサーAPI

Producer APIの中心部分は「 Producer」クラスです。 Producerクラスには、次のメソッドによってコンストラクターでKafkaブローカーを接続するオプションがあります。

プロデューサークラス

プロデューサークラスは、次の署名を使用して、単一または複数のトピックにメッセージを送信するためのsendメソッドを提供します。

public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

プロデューサーには、 SyncAsync の2つのタイプがあります。

同じAPI設定が「 Sync」プロデューサーにも適用されます。 それらの違いは、同期プロデューサーはメッセージを直接送信しますが、メッセージをバックグラウンドで送信することです。 より高いスループットが必要な場合は、非同期プロデューサが優先されます。 0.8などの以前のリリースでは、非同期プロデューサーには、エラーハンドラーを登録するためのsend()のコールバックがありません。 これは、0.9の現在のリリースでのみ使用可能です。

public void close()

プロデューサークラスは、すべてのKafkaブローカーへのプロデューサープール接続を閉じる close メソッドを提供します。

構成設定

Producer APIの主要な構成設定は、理解を深めるために次の表にリストされています-

S.No Configuration Settings and Description
1

client.id

プロデューサーアプリケーションを識別します

2

producer.type

同期または非同期

3

acks

acks configは、プロデューサーリクエストが完全であると見なされる条件を制御します。

4

retries

プロデューサーのリクエストが失敗した場合、特定の値で自動的に再試行します。

5

bootstrap.servers

ブローカーのブートストラップリスト。

6

linger.ms

リクエストの数を減らしたい場合は、linger.msをある値より大きい値に設定できます。

7

key.serializer

シリアライザーインターフェイスのキー。

8

value.serializer

シリアライザーインターフェイスの値。

9

batch.size

バッファサイズ。

10

buffer.memory

プロデューサーがバフリングに使用できるメモリの総量を制御します。

ProducerRecord API

ProducerRecordは、Kafka cluster.ProducerRecordクラスコンストラクターに送信されるキー/値のペアで、次の署名を使用してパーティション、キー、および値のペアを含むレコードを作成します。

public ProducerRecord (string topic, int partition, k key, v value)
  • トピック-レコードに追加されるユーザー定義のトピック名。
  • パーティション-パーティション数
  • Key -レコードに含まれるキー。
  • -記録内容
public ProducerRecord (string topic, k key, v value)

ProducerRecordクラスコンストラクターは、キー、値のペア、およびパーティションなしでレコードを作成するために使用されます。

  • トピック-レコードを割り当てるトピックを作成します。
  • Key -レコードのキー。
  • -内容を記録します。
public ProducerRecord (string topic, v value)

ProducerRecordクラスは、パーティションとキーなしでレコードを作成します。

  • トピック-トピックを作成します。
  • -内容を記録します。

ProducerRecordクラスのメソッドは、次の表に記載されています-

S.No Class Methods and Description
1

public string topic()

トピックがレコードに追加されます。

2

public K key()

レコードに含まれるキー。 そのようなキーがない場合、ここでnullが返されます。

3

public V value()

記録内容。

4

partition()

レコードのパーティション数

SimpleProducerアプリケーション

アプリケーションを作成する前に、まずZooKeeperとKafkaブローカーを起動してから、create topicコマンドを使用してKafkaブローカーに独自のトピックを作成します。 その後、「 Sim-pleProducer.java」という名前のJavaクラスを作成し、次のコーディングを入力します。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {

   public static void main(String[] args) throws Exception{

     //Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }

     //Assign topicName to string variable
      String topicName = args[0].toString();

     //create instance for properties to access producer configs
      Properties props = new Properties();

     //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");

     //Set acknowledgements for producer requests.
      props.put("acks", “all");

     //If the request fails, the producer can automatically retry,
      props.put("retries", 0);

     //Specify buffer size in config
      props.put("batch.size", 16384);

     //Reduce the no of requests less than 0
      props.put("linger.ms", 1);

     //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

コンパイル-アプリケーションは、次のコマンドを使用してコンパイルできます。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

実行-アプリケーションは、次のコマンドを使用して実行できます。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

出力

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

単純な消費者の例

現在のところ、Kafkaクラスターにメッセージを送信するプロデューサーを作成しています。 次に、Kafkaクラスターからのメッセージを消費するコンシューマーを作成します。 KafkaConsumer APIは、Kafkaクラスターからのメッセージを消費するために使用されます。 KafkaConsumerクラスのコンストラクターを以下に定義します。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
*configs* -コンシューマー構成のマップを返します。

KafkaConsumerクラスには、以下の表にリストされている以下の重要なメソッドがあります。

S.No Method and Description
1

public java.util.Set<TopicPar-tition> assignment()

消費者によって現在割り当てられているパーティションのセットを取得します。

2

public string subscription()

指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。

4

public void unsubscribe()

指定されたパーティションのリストからトピックをサブスクライブ解除します。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。 指定されたトピックのリストが空の場合、unsubscribe()と同じように扱われます。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

引数パターンは、正規表現の形式でサブスクライブパターンを参照し、リスナー引数はサブスクライブパターンから通知を取得します。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

パーティションのリストを顧客に手動で割り当てます。

8

poll()

サブスクライブ/割り当てAPIのいずれかを使用して指定されたトピックまたはパーティションのデータを取得します。 データのポーリング前にトピックがサブスクライブされていない場合、これはエラーを返します。

9

public void commitSync()

トピックおよびパーティションのすべてのサブスクライブ済みリストの最後のpoll()で返されたコミットオフセット。 同じ操作がcommitAsyn()に適用されます。

10

public void seek(TopicPartition partition, long offset)

コンシューマが次のpoll()メソッドで使用する現在のオフセット値を取得します。

11

public void resume()

一時停止したパーティションを再開します。

12

public void wakeup()

消費者を起こします。

ConsumerRecord API

ConsumerRecord APIは、Kafkaクラスターからレコードを受信するために使用されます。 このAPIは、レコードの受信元であるトピック名、パーティション番号、およびKafkaパーティション内のレコードを指すオフセットで構成されます。 ConsumerRecordクラスは、特定のトピック名、パーティションカウント、および<key、value>のペアを持つコンシューマレコードを作成するために使用されます。 次のシグネチャがあります。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • トピック-Kafkaクラスターから受信したコンシューマーレコードのトピック名。
  • Partition -トピックのパーティション。
  • Key -キーが存在しない場合はレコードのキーが返されますnull。
  • -内容を記録します。

ConsumerRecords API

ConsumerRecords APIは、ConsumerRecordのコンテナーとして機能します。 このAPIは、特定のトピックのパーティションごとにConsumerRecordのリストを保持するために使用されます。 そのコンストラクタは以下で定義されます。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition -特定のトピックのパーティションのマップを返します。
  • レコード-ConsumerRecordのリストを返します。

ConsumerRecordsクラスには、次のメソッドが定義されています。

S.No Methods and Description
1

public int count()

すべてのトピックのレコード数。

2

public Set partitions()

このレコードセットにデータがあるパーティションのセット(データが返されなかった場合、セットは空です)。

3

public Iterator iterator()

イテレータを使用すると、コレクションを循環して、要素を取得または削除できます。

4

public List records()

指定されたパーティションのレコードのリストを取得します。

構成設定

コンシューマクライアントAPIのメイン構成設定の構成設定は以下のとおりです-

S.No Settings and Description
1

bootstrap.servers

ブローカーのブートストラップリスト。

2

group.id

個々の消費者をグループに割り当てます。

3

enable.auto.commit

値がtrueの場合はオフセットの自動コミットを有効にし、そうでない場合はコミットしません。

4

auto.commit.interval.ms

更新された消費オフセットがZooKeeperに書き込まれる頻度を返します。

5

session.timeout.ms

KafkaがZooKeeperがリクエストに応答する(読み取りまたは書き込み)のを待ってからメッセージを消費し続けるまでのミリ秒数を示します。

SimpleConsumerアプリケーション

プロデューサーアプリケーションの手順はここでも同じです。 まず、ZooKeeperとKafkaブローカーを起動します。 次に、「 SimpleCon-sumer.java」という名前のjavaクラスを使用して「` SimpleConsumer`」アプリケーションを作成し、次のコードを入力します。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
     //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);

     //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))

     //print the topic name
      System.out.println("Subscribed to topic " &plus; topicName);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)

        //print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n",
            record.offset(), record.key(), record.value());
      }
   }
}

コンパイル-アプリケーションは、次のコマンドを使用してコンパイルできます。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
  • 実行-*次のコマンドを使用してアプリケーションを実行できます
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

入力-プロデューサーCLIを開き、トピックにメッセージを送信します。 smple入力を「Hello Consumer」として入力できます。

出力-以下が出力になります。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka-消費者グループの例

消費者グループは、Kafkaトピックのマルチスレッドまたはマルチマシンの消費です。

消費者グループ

  • 消費者は同じ「 group.id.」を使用してグループに参加できます
  • グループの最大並列性は、グループ内のコンシューマの数←パーティションの数です。
  • Kafkaは、トピックのパーティションをグループ内のコンシューマーに割り当てます。これにより、各パーティションは、グループ内の1つのコンシューマーのみによって消費されます。
  • Kafkaは、グループ内の1人の消費者のみがメッセージを読むことを保証します。
  • 消費者は、ログに保存された順序でメッセージを見ることができます。

消費者のリバランス

さらにプロセス/スレッドを追加すると、Kafkaのバランスが再調整されます。 消費者またはブローカーがZooKeeperへのハートビートの送信に失敗した場合、Kafkaクラスターを介して再構成できます。 この再バランスの間に、Kafkaは使用可能なパーティションを使用可能なスレッドに割り当て、場合によってはパーティションを別のプロセスに移動します。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }

      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " &plus; topic);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}

編集

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

実行

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group

ここでは、2つのコンシューマを持つ「 my-group」としてサンプルグループ名を作成しました。 同様に、グループとグループ内の消費者の数を作成できます。

入力

プロデューサーCLIを開き、次のようなメッセージを送信します-

Test consumer group 01
Test consumer group 02

最初のプロセスの出力

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

2番目のプロセスの出力

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Javaクライアントデモを使用してSimpleConsumerとConsumeGroupを理解できたと思います。 これで、Javaクライアントを使用してメッセージを送受信する方法がわかりました。 次の章で、ビッグデータテクノロジーとのKafkaの統合を続けましょう。

Apache Kafka-Stormとの統合

この章では、KafkaをApache Stormと統合する方法を学びます。

ストームについて

Stormは元々Nathan MarzとBackTypeのチームによって作成されました。 短時間で、Apache Stormは膨大な量のデータを処理できる分散リアルタイム処理システムの標準になりました。 Stormは非常に高速であり、ベンチマークでは、ノードごとに1秒あたり100万を超えるタプルが処理されました。 Apache Stormは継続的に実行され、構成されたソース(Spouts)からデータを消費し、データを処理パイプライン(Bolts)に渡します。 組み合わせて、スパウトとボルトがトポロジを作成します。

Stormとの統合

KafkaとStormは自然にお互いを補完し、強力な協力により、高速で移動するビッグデータのリアルタイムストリーミング分析が可能になります。 KafkaとStormの統合は、開発者がStormトポロジからデータストリームを簡単に取り込み、公開できるようにすることです。

概念フロー

注ぎ口は、ストリームのソースです。 たとえば、スパウトは、Kafkaトピックからタプルを読み取り、ストリームとして放出する場合があります。 ボルトは入力ストリームを消費し、処理し、場合によっては新しいストリームを放出します。 Boltsは、関数の実行、タプルのフィルタリング、ストリーミング集約、ストリーミング結合、データベースとの対話など、あらゆることを実行できます。 Stormトポロジの各ノードは並行して実行されます。 トポロジは、終了するまで無期限に実行されます。 Stormは、失敗したタスクを自動的に再割り当てします。 さらに、Stormは、マシンがダウンしてメッセージがドロップされた場合でも、データの損失がないことを保証します。

Kafka-Storm統合APIの詳細を見ていきましょう。 KafkaをStormに統合するには、3つの主要なクラスがあります。 彼らは次のとおりです-

BrokerHosts-ZkHostsおよびStaticHosts

BrokerHostsはインターフェースであり、ZkHostsとStaticHostsはその2つの主要な実装です。 ZkHostsは、ZooKeeperで詳細を維持することでKafkaブローカーを動的に追跡するために使用され、StaticHostsは、Kafkaブローカーとその詳細を手動/静的に設定するために使用されます。 ZkHostsは、Kafkaブローカーにアクセスするためのシンプルで高速な方法です。

ZkHostsの署名は次のとおりです-

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

brokerZkStrはZooKeeperホストであり、brokerZkPathはKafkaブローカーの詳細を保持するZooKeeperパスです。

KafkaConfig API

このAPIは、Kafkaクラスターの構成設定を定義するために使用されます。 Kafka Con-figの署名は次のように定義されます

public KafkaConfig(BrokerHosts hosts, string topic)

SpoutConfig API

Spoutconfigは、追加のZooKeeper情報をサポートするKafkaConfigの拡張機能です。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts -BrokerHostsは、BrokerHostsインターフェイスの任意の実装にすることができます
  • トピック-トピック名。
  • zkRoot -ZooKeeperのルートパス。
  • * id-*注ぎ口は、Zookeeperで消費されたオフセットの状態を保存します。 idは、スパウトを一意に識別する必要があります。

SchemeAsMultiScheme

SchemeAsMultiSchemeは、Kafkaから消費されたByteBufferがストームタプルに変換される方法を指示するインターフェイスです。 MultiSchemeから派生し、Schemeクラスの実装を受け入れます。 Schemeクラスには多くの実装があり、そのような実装の1つにStringSchemeがあります。これは、バイトを単純な文字列として解析します。 また、出力フィールドの命名も制御します。 署名は次のように定義されます。

public SchemeAsMultiScheme(Scheme scheme)
  • スキーム-kafkaから消費されたバイトバッファ。

KafkaSpout API

KafkaSpoutは、Stormと統合するスパウト実装です。 kafkaトピックからメッセージを取得し、タプルとしてStormエコシステムに送信します。 KafkaSpoutは、SpoutConfigから構成の詳細を取得します。

以下は、単純なKafkaスパウトを作成するサンプルコードです。

//ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

ボルト作成

Boltは、タプルを入力として受け取り、タプルを処理し、出力として新しいタプルを生成するコンポーネントです。 ボルトはIRichBoltインターフェイスを実装します。 このプログラムでは、2つのボルトクラスWordSplitter-BoltとWordCounterBoltを使用して操作を実行します。

IRichBoltインターフェイスには次のメソッドがあります-

  • 準備-実行する環境をボルトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
  • 実行-入力の単一タプルを処理します。
  • クリーンアップ-ボルトがシャットダウンするときに呼び出されます。
  • declareOutputFields -タプルの出力スキーマを宣言します。

文を単語に分割するロジックを実装するSplitBolt.javaと、一意の単語を分離してその出現回数をカウントするロジックを実装するCountBolt.javaを作成しましょう。

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");

      for(String word: words) {
         word = word.trim();

         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }

      }

      collector.ack(input);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);

      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }

      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()&plus;" : " &plus; entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {

   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

トポロジへの送信

Stormトポロジは、基本的にThrift構造です。 TopologyBuilderクラスは、複雑なトポロジを作成するためのシンプルで簡単なメソッドを提供します。 TopologyBuilderクラスには、スパウトを設定する(setSpout)メソッドとボルトを設定する(setBolt)メソッドがあります。 最後に、TopologyBuilderには、トポロジを作成するcreateTopologyがあります。 shuffleGroupingおよびfieldsGroupingメソッドは、スパウトとボルトのストリームグループを設定するのに役立ちます。

ローカルクラスター-開発の目的で、「 LocalCluster」オブジェクトを使用してローカルクラスターを作成し、「` LocalCluster`」クラスの「 submitTopology」メソッドを使用してトポロジを送信できます。

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);

      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 *1024* 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 *1024* 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);

      cluster.shutdown();
   }
}

コンパイルを移動する前に、Kakfa-Storm統合にはキュレーターであるZooKeeperクライアントJavaライブラリが必要です。 キュレーターバージョン2.9.1は、Apache Stormバージョン0.9.5(このチュートリアルで使用)をサポートしています。 以下に指定されたjarファイルをダウンロードし、javaクラスパスに配置します。

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

依存関係ファイルを含めた後、次のコマンドを使用してプログラムをコンパイルします。

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

実行

Kafka Producer CLI(前の章で説明)を起動し、「 my-first-topic」という新しいトピックを作成し、以下に示すようにサンプルメッセージを提供します-

hello
kafka
storm
spark
test message
another test message

今、次のコマンドを使用してアプリケーションを実行します-

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

このアプリケーションのサンプル出力は以下に指定されています-

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

Apache Kafka-Sparkとの統合

この章では、Apache KafkaをSpark Streaming APIと統合する方法について説明します。

Sparkについて

Spark Streaming APIは、ライブデータストリームのスケーラブルで高スループットのフォールトトレラントストリーム処理を可能にします。 データは、Kafka、Flume、Twitterなどの多くのソースから取得でき、map、reduce、join、windowなどの高レベル関数などの複雑なアルゴリズムを使用して処理できます。 最後に、処理されたデータをファイルシステム、データベース、ライブダッシュボードにプッシュできます。 弾力性のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。 オブジェクトの不変の分散コレクションです。 RDDの各データセットは論理パーティションに分割され、クラスターの異なるノードで計算できます。

Sparkとの統合

Kafkaは、Sparkストリーミング用の潜在的なメッセージングおよび統合プラットフォームです。 Kafkaは、リアルタイムのデータストリームの中央ハブとして機能し、Spark Streamingの複雑なアルゴリズムを使用して処理されます。 データが処理されると、Spark Streamingはさらに別のKafkaトピックに結果を公開したり、HDFS、データベース、ダッシュボードに保存したりできます。 次の図は、概念的なフローを示しています。

Sparkとの統合

それでは、Kafka-Spark APIの詳細を見ていきましょう。

SparkConf API

Sparkアプリケーションの構成を表します。 さまざまなSparkパラメーターをキーと値のペアとして設定するために使用されます。

SparkConf」クラスには次のメソッドがあります-

  • * set(string key、string value)*-設定変数を設定します。
  • * remove(string key)*-設定からキーを削除します。
  • * setAppName(string name)*-アプリケーションのアプリケーション名を設定します。
  • * get(string key)*-キーを取得

StreamingContext API

これは、Spark機能の主要なエントリポイントです。 SparkContextは、Sparkクラスターへの接続を表し、クラスター上でRDD、アキュムレーター、およびブロードキャスト変数を作成するために使用できます。 署名は次のように定義されます。

public StreamingContext(String master, String appName, Duration batchDuration,
   String sparkHome, scala.collection.Seq<String> jars,
   scala.collection.Map<String,String> environment)
  • master -接続先のクラスターURL(例: mesos://host:port、spark://host:port、local [4])。
  • appName -クラスターWeb UIに表示するジョブの名前
  • batchDuration -ストリーミングデータがバッチに分割される時間間隔
public StreamingContext(SparkConf conf, Duration batchDuration)

新しいSparkContextに必要な構成を提供して、StreamingContextを作成します。

  • conf -Sparkパラメーター
  • batchDuration -ストリーミングデータがバッチに分割される時間間隔

KafkaUtils API

KafkaUtils APIは、KafkaクラスターをSparkストリーミングに接続するために使用されます。 このAPIには、以下のように定義されたsignifi-cantメソッド「 createStream」署名があります。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上記のメソッドは、Kafka Brokersからメッセージをプルする入力ストリームを作成するために使用されます。

  • ssc -StreamingContextオブジェクト。
  • zkQuorum -Zookeeperクォーラム。
  • groupId -このコンシューマのグループID。
  • * topics-消費するトピックのマップを返します。
  • storageLevel -受信したオブジェクトの保存に使用するストレージレベル。

KafkaUtils APIには別のメソッドcreateDirectStreamがあります。これは、レシーバーを使用せずにKafka Brokersからメッセージを直接プルする入力ストリームを作成するために使用されます。 このストリームは、Kafkaからの各メッセージが1回だけ変換に含まれることを保証できます。

サンプルアプリケーションはScalaで実行されます。 アプリケーションをコンパイルするには、 "` sbt` "、scalaビルドツール(mavenに類似)をダウンロードしてインストールしてください。 メインアプリケーションコードを以下に示します。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ &plus; _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

構築スクリプト

spark-kafka統合は、spark、sparkストリーミング、およびspark kafka統合jarに依存しています。 新しいファイル「 build.sbt」を作成し、アプリケーションの詳細とその依存関係を指定します。 「 sbt」は、アプリケーションをコンパイルおよびパックするときに必要なjarをダウンロードします。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

コンパイル/パッケージング

次のコマンドを実行して、アプリケーションのjarファイルをコンパイルおよびパッケージ化します。 アプリケーションを実行するには、jarファイルをスパークコンソールに送信する必要があります。

sbt package

Sparkに送信する

Kafka Producer CLI(前の章で説明)を起動し、「 my-first-topic」という新しいトピックを作成し、以下に示すサンプルメッセージを提供します。

Another spark test message

次のコマンドを実行して、アプリケーションをスパークコンソールに送信します。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

このアプリケーションのサンプル出力を以下に示します。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

リアルタイムアプリケーション(Twitter)

リアルタイムアプリケーションを分析して、最新のtwitterフィードとそのハッシュタグを取得しましょう。 以前、StormおよびSparkとKafkaの統合を見てきました。 どちらのシナリオでも、Kafkaエコシステムにメッセージを送信するために(cliを使用して)Kafkaプロデューサーを作成しました。 次に、ストームとスパークの統合は、Kafkaコンシューマーを使用してメッセージを読み取り、ストームとスパークのエコシステムにそれぞれ注入します。 したがって、実際には、カフカプロデューサーを作成する必要があります。

  • 「Twitter Streaming API」を使用してtwitterフィードを読んでください。
  • フィードを処理し、
  • HashTagsを抽出し、
  • カフカに送ってください。

Kafkaが「ハッシュタグ」を受信すると、Storm/Spark統合が情報を受信し、Storm/Sparkエコシステムに送信します。

Twitter Streaming API

「Twitter Streaming API」には、任意のプログラミング言語でアクセスできます。 「twitter4j」はオープンソースの非公式Javaライブラリであり、「Twitter Streaming API」に簡単にアクセスするためのJavaベースのモジュールを提供します。 「twitter4j」は、ツイートにアクセスするためのリスナーベースのフレームワークを提供します。 「Twitter Streaming API」にアクセスするには、Twitter開発者アカウントにサインインする必要があり、次の OAuth 認証の詳細を取得する必要があります。

  • 顧客キー
  • CustomerSecret
  • アクセストークン
  • AccessTookenSecret

開発者アカウントが作成されたら、「twitter4j」jarファイルをダウンロードして、javaクラスパスに配置します。

完全なTwitter Kafkaプロデューサーコーディング(KafkaTwitterProducer.java)を以下に示します-

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);

      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }

      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {

         @Override
         public void onStatus(Status status) {
            queue.offer(status);

           //System.out.println("@" &plus; status.getUser().getScreenName()
               &plus; " - " &plus; status.getText());
           //System.out.println("@" &plus; status.getUser().getScreen-Name());

           /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

           /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
           //System.out.println("Got a status deletion notice id:"
               &plus; statusDeletionNotice.getStatusId());
         }

         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
           //System.out.println("Got track limitation notice:" &plus;
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
           //System.out.println("Got scrub_geo event userId:" &plus; userId &plus;
            "upToStatusId:" &plus; upToStatusId);
         }

         @Override
         public void onStallWarning(StallWarning warning) {
           //System.out.println("Got stall warning:" &plus; warning);
         }

         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);

      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);

     //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;

      while(i < 10) {
         Status ret = queue.poll();

         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " &plus; hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

編集

次のコマンドを使用してアプリケーションをコンパイルします-

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

実行

2つのコンソールを開きます。 上記のコンパイル済みアプリケーションを、以下に示すように1つのコンソールで実行します。

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

前の章で説明したSpark/Stormアプリケーションのいずれかを別のwin-dowで実行します。 注意すべき主な点は、どちらの場合も同じトピックを使用する必要があるということです。 ここでは、トピック名として「my-first-topic」を使用しています。

出力

このアプリケーションの出力は、キーワードとTwitterの現在のフィードに依存します。 サンプル出力を以下に指定します(ストーム統合)。

. . .
food : 1
foodie : 2
burger : 1
. . .

Apache Kafka-ツール

「org.apache.kafka.tools。*」の下にパッケージ化されたKafkaツール。 ツールは、システムツールとレプリケーションツールに分類されます。

システムツール

システムツールは、run classスクリプトを使用してコマンドラインから実行できます。 構文は次のとおりです-

bin/kafka-run-class.sh package.class - - options

システムツールのいくつかは以下に記載されています-

  • * Kafka移行ツール*-このツールは、あるバージョンから別のバージョンにブローカーを移行するために使用されます。
  • ミラーメーカー-このツールは、1つのKafkaクラスターを別のクラスターにミラーリングするために使用されます。
  • Consumer Offset Checker -このツールは、指定された一連のトピックおよびコンシューマグループのコンシューマグループ、トピック、パーティション、オフセット、logSize、所有者を表示します。

複製ツール

Kafkaレプリケーションは、高レベルの設計ツールです。 レプリケーションツールを追加する目的は、耐久性と可用性を高めることです。 複製ツールのいくつかは以下に記載されています-

  • トピックツールの作成-これにより、デフォルトのパーティション数、レプリケーションファクターでトピックが作成され、Kafkaのデフォルトスキームを使用してレプリカの割り当てが行われます。
  • トピックトピックツール-このツールは、トピックの特定のリストの情報をリストします。 コマンドラインでトピックが提供されていない場合、ツールはZookeeperに照会してすべてのトピックを取得し、それらの情報をリストします。 ツールが表示するフィールドは、トピック名、パーティション、リーダー、レプリカ、isrです。
  • Add Partition Tool -トピックの作成、トピックのパーティション数を指定する必要があります。 トピックのボリュームが大きくなると、後でトピックにさらにパーティションが必要になる場合があります。 このツールは、特定のトピックにさらにパーティションを追加するのに役立ち、追加されたパーティションのレプリカの手動割り当ても可能にします。

Apache Kafka-アプリケーション

Kafkaは、今日の最高の産業用アプリケーションの多くをサポートしています。 この章では、Kafkaの最も注目すべきアプリケーションの概要を簡単に説明します。

Twitter

Twitterは、ユーザーのツイートを送受信するプラットフォームを提供するオンラインソーシャルネットワーキングサービスです。 登録ユーザーはツイートを読んだり投稿したりできますが、未登録ユーザーはツイートを読むことしかできません。 Twitterは、ストリーム処理インフラストラクチャの一部としてStorm-Kafkaを使用しています。

LinkedIn

Apache Kafkaは、LinkedInでアクティビティストリームデータと運用メトリックに使用されます。 Kafka mes-saging systemは、LinkedIn Newsfeed、LinkedIn Todayなどのさまざまな製品でLinkedInを支援し、オンラインメッセージ消費に加えて、Hadoopなどのオフライン分析システムにも対応しています。 Kafkaの強力な耐久性も、LinkedInに関連する重要な要素の1つです。

Netflix

Netflixは、オンデマンドインターネットストリーミングメディアのアメリカの多国籍プロバイダーです。 Netflixは、リアルタイムの監視とイベント処理にKafkaを使用しています。

Mozilla

Mozillaは、Netscapeのメンバーによって1998年に作成されたフリーソフトウェアコミュニティです。 Kafkaは間もなくMozillaの現在の運用システムの一部を置き換え、テレメトリ、テストパイロットなどのプロジェクトのエンドユーザーのブラウザからパフォーマンスと使用状況のデータを収集します。

オラクル

Oracleは、OSB(Oracle Service Bus)と呼ばれるEnterprise Service Bus製品からKafkaへのネイティブ接続を提供します。これにより、開発者は、OSBの組み込みメディエーション機能を活用して、段階的なデータパイプラインを実装できます。