Apache-storm-quick-guide

提供:Dev Guides
移動先:案内検索

Apache Storm-はじめに

Apache Stormとは何ですか?

Apache Stormは、分散型のリアルタイムビッグデータ処理システムです。 Stormは、耐障害性と水平スケーラブルな方法で大量のデータを処理するように設計されています。 これは、最高の取り込み率の機能を備えたストリーミングデータフレームワークです。 Stormはステートレスですが、Apache ZooKeeperを介して分散環境とクラスターの状態を管理します。 シンプルで、リアルタイムデータに対してあらゆる種類の操作を並行して実行できます。

Apache Stormは、リアルタイムデータ分析のリーダーであり続けています。 Stormはセットアップ、操作が簡単であり、すべてのメッセージが少なくとも1回はトポロジーを介して処理されることを保証します。

Apache StormとHadoop

基本的に、HadoopおよびStormフレームワークは、ビッグデータの分析に使用されます。 両方とも互いに補完し合い、いくつかの点で異なります。 Apache Stormは永続性を除くすべての操作を実行しますが、Hadoopはリアルタイム計算の遅れを除いてすべてに優れています。 次の表は、StormとHadoopの属性を比較しています。

Hadoop

リアルタイムのストリーム処理

バッチ処理

ステートレス

ステートフル

ZooKeeperベースの調整を備えたマスター/スレーブアーキテクチャ。 マスターノードは nimbus と呼ばれ、スレーブは supervisors です。

ZooKeeperベースの調整あり/なしのマスター/スレーブアーキテクチャ。 マスターノードは*ジョブトラッカー*であり、スレーブノードは*タスクトラッカー*です。

Stormストリーミングプロセスは、クラスターで毎秒数万のメッセージにアクセスできます。

Hadoop分散ファイルシステム(HDFS)はMapReduceフレームワークを使用して、数分または数時間かかる膨大な量のデータを処理します。

ストームトポロジは、ユーザーがシャットダウンするか、予期しない回復不能な障害が発生するまで実行されます。

MapReduceジョブは順番に実行され、最終的に完了します。

両方が分散され、フォールトトレラントです

nimbus/スーパバイザが停止した場合、再起動すると停止した場所から継続するため、何も影響を受けません。

JobTrackerが停止すると、実行中のジョブはすべて失われます。

Apache Stormのユースケース

Apache Stormは、リアルタイムのビッグデータストリーム処理で非常に有名です。 このため、ほとんどの企業はStormをシステムの不可欠な部分として使用しています。 いくつかの注目すべき例は次のとおりです-

*Twitter* -Twitterは、「Publisher Analytics製品」の範囲にApache Stormを使用しています。 「Publisher Analytics Products」は、Twitterプラットフォームでのすべてのツイートとクリックを処理します。 Apache StormはTwitterインフラストラクチャと緊密に統合されています。
*NaviSite* -NaviSiteは、イベントログの監視/監査システムにStormを使用しています。 システムで生成されたすべてのログはストームを通過します。 Stormは、構成された正規表現セットに対してメッセージをチェックし、一致する場合、その特定のメッセージはデータベースに保存されます。
*Wego* -Wegoは、シンガポールにある旅行メタ検索エンジンです。 旅行関連のデータは、世界中のさまざまなタイミングからさまざまなタイミングで収集されています。 Stormは、Wegoがリアルタイムデータを検索し、同時実行の問題を解決し、エンドユーザーに最適なものを見つけるのに役立ちます。

Apache Stormの利点

Apache Stormが提供する利点のリストは次のとおりです-

  • Stormはオープンソースで、堅牢で、ユーザーフレンドリーです。 大企業だけでなく中小企業でも利用できます。
  • Stormは耐障害性、柔軟性、信頼性があり、あらゆるプログラミング言語をサポートしています。
  • リアルタイムのストリーム処理を許可します。
  • Stormは、データを処理する能力が非常に大きいため、信じられないほど高速です。
  • Stormは、リソースを直線的に追加することで、負荷が増加してもパフォーマンスを維持できます。 非常にスケーラブルです。
  • Stormはデータの更新とエンドツーエンドの配信応答を問題に応じて数秒または数分で実行します。 レイテンシが非常に低いです。
  • Stormには運用上のインテリジェンスがあります。
  • Stormは、クラスター内の接続されたノードのいずれかが死んだり、メッセージが失われたりしても、保証されたデータ処理を提供します。

Apache Storm-コアコンセプト

Apache Stormは、リアルタイムデータの生ストリームを一方の端から読み取り、それを一連の小さな処理ユニットに渡し、もう一方の端で処理済み/有用な情報を出力します。

次の図は、Apache Stormのコアコンセプトを示しています。

コアコンセプト

Apache Stormのコンポーネントを詳しく見てみましょう-

Components Description
Tuple Tuple is the main data structure in Storm. It is a list of ordered elements. By default, a Tuple supports all data types. Generally, it is modelled as a set of comma separated values and passed to a Storm cluster.
Stream Stream is an unordered sequence of tuples.
Spouts Source of stream. Generally, Storm accepts input data from raw data sources like Twitter Streaming API, Apache Kafka queue, Kestrel queue, etc. Otherwise you can write spouts to read data from datasources. “ISpout" is the core interface for implementing spouts. Some of the specific interfaces are IRichSpout, BaseRichSpout, KafkaSpout, etc.
Bolts Bolts are logical processing units. Spouts pass data to bolts and bolts process and produce a new output stream. Bolts can perform the operations of filtering, aggregation, joining, interacting with data sources and databases. Bolt receives data and emits to one or more bolts. “IBolt” is the core interface for implementing bolts. Some of the common interfaces are IRichBolt, IBasicBolt, etc.

「Twitter分析」のリアルタイムの例を取り上げて、Apache Stormでどのようにモデル化できるかを見てみましょう。 次の図は、構造を示しています。

Twitter分析

「Twitter分析」への入力は、Twitter Streaming APIからのものです。 Spoutは、Twitter Streaming APIを使用してユーザーのツイートを読み取り、タプルのストリームとして出力します。 注ぎ口からの1つのタプルには、twitterユーザー名と、カンマ区切りの値としての1つのツイートがあります。 次に、このタプルのスチームがBoltに転送され、Boltはツイートを個々の単語に分割し、単語数を計算し、構成されたデータソースに情報を保持します。 これで、データソースをクエリすることで簡単に結果を取得できます。

トポロジー

注ぎ口とボルトは互いに接続され、トポロジーを形成します。 リアルタイムアプリケーションロジックは、Stormトポロジ内で指定されます。 簡単に言えば、トポロジは、頂点が計算であり、エッジがデータのストリームである有向グラフです。

単純なトポロジは、スパウトから始まります。 Spoutは、データを1つ以上のボルトに放出します。 ボルトは、最小の処理ロジックを持つトポロジ内のノードを表し、ボルトの出力は入力として別のボルトに出力できます。

Stormは、トポロジを強制終了するまで、常にトポロジを実行し続けます。 Apache Stormの主な仕事は、トポロジーを実行することであり、特定の時間に任意の数のトポロジーを実行します。

タスク

これで、スパウトとボルトの基本的な考え方ができました。 それらはトポロジーの最小の論理ユニットであり、トポロジーは単一のスパウトとボルトの配列を使用して構築されます。 トポロジを正常に実行するには、特定の順序で適切に実行する必要があります。 Stormによるすべてのスパウトとボルトの実行は、「タスク」と呼ばれます。 簡単に言えば、タスクはスパウトまたはボルトの実行です。 所定の時間に、各スパウトとボルトは、複数の個別のスレッドで実行される複数のインスタンスを持つことができます。

労働者

トポロジは、複数のワーカーノードで分散して実行されます。 Stormは、すべてのワーカーノードでタスクを均等に分散します。 ワーカーノードの役割は、ジョブをリッスンし、新しいジョブが到着するたびにプロセスを開始または停止することです。

ストリームのグループ化

データの流れは、スパウトからボルト、またはあるボルトから別のボルトに流れます。 ストリームのグループ化は、トポロジ内でのタプルのルーティング方法を制御し、トポロジ内のタプルフローを理解するのに役立ちます。 以下に説明するように、4つの組み込みグループがあります。

シャッフルグループ

シャッフルグループでは、ボルトを実行するすべてのワーカーに均等な数のタプルがランダムに分散されます。 次の図は、構造を示しています。

シャッフルグループ化

フィールドのグループ化

タプル内の同じ値を持つフィールドはグループ化され、残りのタプルは外部に保持されます。 次に、同じフィールド値を持つタプルが、ボルトを実行している同じワーカーに転送されます。 たとえば、ストリームがフィールド「word」でグループ化されている場合、同じ文字列「Hello」を持つタプルは同じワーカーに移動します。 次の図は、フィールドのグループ化の仕組みを示しています。

フィールドのグループ化

グローバルなグループ化

すべてのストリームをグループ化し、1つのボルトに転送できます。 このグループ化は、ソースのすべてのインスタンスによって生成されたタプルを単一のターゲットインスタンスに送信します(具体的には、IDが最も小さいワーカーを選択します)。

グローバルグループ

すべてのグループ化

すべてのグループ化は、各タプルの単一のコピーを受信ボルトのすべてのインスタンスに送信します。 この種類のグループ化は、ボルトに信号を送信するために使用されます。 すべてのグループ化は、結合操作に役立ちます。

すべてのグループ化

Apache Storm-クラスターアーキテクチャ

Apache Stormの主なハイライトの1つは、フォールトトレラントであり、「Single Point of Failure」(SPOF)分散アプリケーションがなく、高速であることです。 アプリケーションの容量を増やすために必要な数のシステムにApache Stormをインストールできます。

Apache Stormクラスターの設計方法とその内部アーキテクチャを見てみましょう。 次の図は、クラスター設計を示しています。

Zookeeper Framework

Apache Stormには、 Nimbus (マスターノード)と Supervisor (ワーカーノード)の2種類のノードがあります。 NimbusはApache Stormの中心的なコンポーネントです。 Nimbusの主な仕事は、Stormトポロジを実行することです。 Nimbusはトポロジを分析し、実行するタスクを収集します。 次に、使用可能なスーパーバイザーにタスクを配布します。

スーパーバイザには1つ以上のワーカープロセスがあります。 スーパーバイザーはタスクをワーカープロセスに委任します。 ワーカープロセスは、必要な数のエグゼキューターを生成し、タスクを実行します。 Apache Stormは、nimbusとスーパーバイザー間の通信に内部分散メッセージングシステムを使用します。

Components Description
Nimbus Nimbus is a master node of Storm cluster. All other nodes in the cluster are called as worker nodes. Master node is responsible for distributing data among all the worker nodes, assign tasks to worker nodes and monitoring failures.
Supervisor The nodes that follow instructions given by the nimbus are called as Supervisors. A *supervisor *has multiple worker processes and it governs worker processes to complete the tasks assigned by the nimbus.
Worker process A worker process will execute tasks related to a specific topology. A worker process will not run a task by itself, instead it creates* executors* and asks them to perform a particular task. A worker process will have multiple executors.
Executor An executor is nothing but a single thread spawn by a worker process. An executor runs one or more tasks but only for a specific spout or bolt.
Task A task performs actual data processing. So, it is either a spout or a bolt.
ZooKeeper framework

Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate between themselves and maintaining shared data with robust synchronization techniques. Nimbus is stateless, so it depends on ZooKeeper to monitor the working node status.

ZooKeeperは、スーパーバイザーがnimbusと対話するのを支援します。 Nimbusとスーパーバイザーの状態を維持する責任があります。

ストームは本質的に無国籍です。 ステートレスの性質には欠点がありますが、実際には、Stormがリアルタイムデータを可能な限り迅速に処理するのに役立ちます。

ストームは「完全ではない」状態ではありません。 Apache ZooKeeperに状態を保存します。 状態はApache ZooKeeperで利用できるため、失敗したnimbusを再起動して、元の場所から機能させることができます。 通常、 monit などのサービス監視ツールはNimbusを監視し、障害がある場合は再起動します。

Apache Stormには、状態を維持する Trident Topology と呼ばれる高度なトポロジもあり、Pigなどの高レベルAPIも提供します。 これらすべての機能については、今後の章で説明します。

Apache Storm-ワークフロー

稼働中のStormクラスターには、1つのnimbusと1つ以上のスーパーバイザーが必要です。 もう1つの重要なノードはApache ZooKeeperです。これは、nimbusとスーパーバイザー間の調整に使用されます。

Apache Stormのワークフローを詳しく見てみましょう-

  • 最初に、nimbusは「Storm Topology」が送信されるのを待ちます。
  • トポロジが送信されると、トポロジが処理され、実行されるすべてのタスクとタスクが実行される順序が収集されます。
  • その後、nimbusは、利用可能なすべてのスーパーバイザーにタスクを均等に分散します。
  • 特定の時間間隔で、すべてのスーパーバイザーはハートビートをニンバスに送信して、まだ生きていることを通知します。
  • スーパーバイザーが死亡し、ニンバスにハートビートを送信しない場合、ニンバスは別のスーパーバイザーにタスクを割り当てます。
  • nimbus自体が死ぬと、スーパーバイザーは割り当てられたタスクを問題なく処理します。
  • すべてのタスクが完了すると、スーパーバイザーは新しいタスクが入るのを待ちます。
  • それまでの間、死んだnimbusはサービス監視ツールによって自動的に再起動されます。
  • 再起動したnimbusは、停止した場所から続行します。 同様に、停止したスーパーバイザも自動的に再起動できます。 nimbusとスーパーバイザの両方が自動的に再起動され、両方が以前のように継続するため、Stormはすべてのタスクを少なくとも1回処理することが保証されています。
  • すべてのトポロジーが処理されると、nimbusは新しいトポロジーが到着するのを待ち、同様にスーパーバイザーは新しいタスクを待ちます。

デフォルトでは、Stormクラスタには2つのモードがあります-

  • ローカルモード-このモードは、すべてのトポロジコンポーネントが連携して動作するのを確認する最も簡単な方法であるため、開発、テスト、およびデバッグに使用されます。 このモードでは、さまざまなStorm構成環境でトポロジがどのように実行されるかを確認できるパラメーターを調整できます。 ローカルモードでは、ストームトポロジは単一のJVMのローカルマシンで実行されます。
  • 生産モード-このモードでは、通常は異なるマシンで実行される多くのプロセスで構成される作業ストームクラスターにトポロジを送信します。 stormのワークフローで説明したように、稼働中のクラスターはシャットダウンされるまで無期限に実行されます。

Storm-分散メッセージングシステム

Apache Stormはリアルタイムデータを処理し、通常はメッセージキューシステムから入力されます。 外部の分散メッセージングシステムは、リアルタイムの計算に必要な入力を提供します。 Spoutは、メッセージングシステムからデータを読み取り、それをタプルに変換し、Apache Stormに入力します。 興味深い事実は、Apache Stormはnimbusとスーパーバイザー間の通信に独自の分散メッセージングシステムを内部的に使用していることです。

分散メッセージングシステムとは

分散メッセージングは​​、信頼できるメッセージキューイングの概念に基づいています。 メッセージは、クライアントアプリケーションとメッセージングシステムの間で非同期的にキューに入れられます。 分散メッセージングシステムには、信頼性、スケーラビリティ、および永続性という利点があります。

ほとんどのメッセージングパターンは、 publish-subscribe モデル(単純に Pub-Sub )に従います。メッセージの送信者は publishers 、メッセージを受信したい人は subscribers と呼ばれます。

メッセージが送信者によって公開されると、サブスクライバーはフィルターオプションを使用して、選択したメッセージを受信できます。 通常、2種類のフィルタリングがあります。1つは*トピックベースのフィルタリング*で、もう1つは*コンテンツベースのフィルタリング*です。

pub-subモデルはメッセージを介してのみ通信できることに注意してください。 非常に疎結合のアーキテクチャです。送信者でさえ、サブスクライバーが誰であるかを知りません。 多くのメッセージパターンでは、メッセージブローカーを使用して、多くのサブスクライバーがタイムリーにアクセスできるようにパブリッシュメッセージを交換できます。 実際の例としては、Dish TVがあります。これは、スポーツ、映画、音楽などのさまざまなチャンネルを公開し、誰でも自分のチャンネルセットに登録して、登録済みのチャンネルが利用可能になるといつでも入手できます。

メッセージングシステム

次の表は、いくつかの一般的な高スループットメッセージングシステムについて説明しています-

Distributed messaging system Description
Apache Kafka Kafka was developed at LinkedIn corporation and later it became a sub-project of Apache. Apache Kafka is based on brokerenabled, persistent, distributed publish-subscribe model. Kafka is fast, scalable, and highly efficient.
RabbitMQ RabbitMQ is an open source distributed robust messaging application. It is easy to use and runs on all platforms.
JMS(Java Message Service) JMS is an open source API that supports creating, reading, and sending messages from one application to another. It provides guaranteed message delivery and follows publish-subscribe model.
ActiveMQ ActiveMQ messaging system is an open source API of JMS.
ZeroMQ ZeroMQ is broker-less peer-peer message processing. It provides push-pull, router-dealer message patterns.
Kestrel Kestrel is a fast, reliable, and simple distributed message queue.

スリフトプロトコル

Thriftは、クロスランゲージサービス開発とリモートプロシージャコール(RPC)のためにFacebookで構築されました。 その後、オープンソースのApacheプロジェクトになりました。 Apache Thriftは Interface Definition Language であり、定義されたデータ型の上に簡単に新しいデータ型とサービス実装を定義できます。

Apache Thriftは、組み込みシステム、モバイルアプリケーション、Webアプリケーション、および他の多くのプログラミング言語をサポートする通信フレームワークでもあります。 Apache Thriftに関連する重要な機能のいくつかは、そのモジュール性、柔軟性、および高いパフォーマンスです。 さらに、分散アプリケーションでストリーミング、メッセージング、およびRPCを実行できます。

Stormは、内部通信とデータ定義にThriftプロトコルを広く使用しています。 Stormトポロジは、単純に Thrift Structs です。 Apache Stormでトポロジを実行するStorm Nimbusは* Thriftサービス*です。

Apache Storm-インストール

Apache Stormフレームワークをマシンにインストールする方法を見てみましょう。 ここには3つのマジョーステップがあります-

  • システムにJavaをインストールします(まだインストールしていない場合)。
  • ZooKeeperフレームワークをインストールします。
  • Apache Stormフレームワークをインストールします。

ステップ1-Javaインストールの検証

次のコマンドを使用して、システムにJavaがすでにインストールされているかどうかを確認します。

$ java -version

Javaがすでに存在する場合、そのバージョン番号が表示されます。 それ以外の場合は、JDKの最新バージョンをダウンロードします。

ステップ1.1-JDKのダウンロード

次のリンクを使用して、JDKの最新バージョンをダウンロードします-http://www.oracle.com/technetwork/java/javase/downloads/indexl[www.oracle.com]

最新バージョンはJDK 8u 60で、ファイルは*“ jdk-8u60-linux-x64.tar.gz” *です。 マシンにファイルをダウンロードします。

ステップ1.2-ファイルを抽出する

通常、ファイルは downloads フォルダーにダウンロードされます。 次のコマンドを使用してtarセットアップを抽出します。

$ cd/go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

ステップ1.3-optディレクトリに移動します

すべてのユーザーがJavaを使用できるようにするには、抽出されたJavaコンテンツを「/usr/local/java」フォルダーに移動します。

$ su
password: (type password of root user)
$ mkdir/opt/jdk
$ mv jdk-1.8.0_60/opt/jdk/

ステップ1.4-パスを設定する

パスとJAVA_HOME変数を設定するには、次のコマンドを〜/.bashrcファイルに追加します。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

ここで、現在実行中のシステムにすべての変更を適用します。

$ source ~/.bashrc

ステップ1.5-Javaの代替

Javaの代替を変更するには、次のコマンドを使用します。

update-alternatives --install/usr/bin/java java/opt/jdk/jdk1.8.0_60/bin/java 100

ステップ1.6

次に、ステップ1で説明した検証コマンド*(java -version)*を使用して、Javaインストールを検証します。

ステップ2-ZooKeeper Frameworkのインストール

ステップ2.1-ZooKeeperをダウンロードする

マシンにZooKeeperフレームワークをインストールするには、次のリンクにアクセスして、ZooKeeperの最新バージョンをダウンロードしてくださいhttp://zookeeper.apache.org/releasesl

現在、ZooKeeperの最新バージョンは3.4.6(ZooKeeper-3.4.6.tar.gz)です。

ステップ2.2-tarファイルを抽出する

次のコマンドを使用してtarファイルを抽出します-

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

ステップ2.3-構成ファイルを作成する

コマンド「vi conf/zoo.cfg」を使用して「conf/zoo.cfg」という名前の構成ファイルを開き、開始点として以下のすべてのパラメーターを設定します。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

構成ファイルが正常に保存されたら、ZooKeeperサーバーを起動できます。

ステップ2.4-ZooKeeper Serverを起動します

次のコマンドを使用して、ZooKeeperサーバーを起動します。

$ bin/zkServer.sh start

このコマンドを実行した後、次のように応答を取得します-

$ JMX enabled by default
$ Using config:/Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

ステップ2.5-CLIの開始

次のコマンドを使用して、CLIを起動します。

$ bin/zkCli.sh

上記のコマンドを実行すると、ZooKeeperサーバーに接続され、次の応答が返されます。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

ステップ2.6-ZooKeeper Serverを停止します

サーバーを接続してすべての操作を実行した後、次のコマンドを使用してZooKeeperサーバーを停止できます。

bin/zkServer.sh stop

JavaとZooKeeperがマシンに正常にインストールされました。 Apache Stormフレームワークをインストールする手順を見てみましょう。

ステップ3-Apache Storm Frameworkのインストール

ステップ3.1 Stormをダウンロードする

マシンにStormフレームワークをインストールするには、次のリンクにアクセスして、Stormの最新バージョンをダウンロードしてくださいhttp://storm.apache.org/downloadsl

現在、Stormの最新バージョンは「apache-storm-0.9.5.tar.gz」です。

ステップ3.2-tarファイルを抽出する

次のコマンドを使用してtarファイルを抽出します-

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

ステップ3.3-構成ファイルを開く

Stormの現在のリリースには、Stormデーモンを構成する「conf/storm.yaml」のファイルが含まれています。 そのファイルに次の情報を追加します。

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

すべての変更を適用した後、保存してターミナルに戻ります。

ステップ3.4-Nimbusを開始します

$ bin/storm nimbus

ステップ3.5-スーパーバイザーを起動する

$ bin/storm supervisor

ステップ3.6 UIを開始する

$ bin/storm ui

Stormユーザーインターフェイスアプリケーションを起動した後、お気に入りのブラウザーにURL http://localhost:8080 を入力すると、Stormクラスター情報とその実行トポロジが表示されます。 ページは次のスクリーンショットのようになります。

Strom UI

Apache Storm-作業例

Apache Stormの主要な技術的詳細を確認したので、今度はいくつかの簡単なシナリオをコーディングします。

シナリオ–モバイルコールログアナライザー

モバイルコールとその継続時間は、Apache Stormへの入力として提供され、Stormは同じ発信者と受信者間のコールとそれらの合計コール数を処理およびグループ化します。

スパウト作成

注ぎ口は、データ生成に使用されるコンポーネントです。 基本的に、スパウトはIRichSpoutインターフェイスを実装します。 「IRichSpout」インターフェイスには、次の重要な方法があります-

  • open -実行する環境をスパウトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
  • nextTuple -コレクターを介して生成されたデータを発行します。
  • close -このメソッドは、スパウトがシャットダウンするときに呼び出されます。
  • declareOutputFields -タプルの出力スキーマを宣言します。
  • ack -特定のタプルが処理されたことを認める
  • fail -特定のタプルが処理されず、再処理されないことを指定します。

Open

*open* メソッドのシグネチャは次のとおりです-
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf -この注ぎ口のストーム構成を提供します。
  • context -トポロジ内の噴出場所、そのタスクID、入力および出力情報に関する完全な情報を提供します。
  • collector -ボルトによって処理されるタプルを発行できるようにします。

nextTuple

*nextTuple* メソッドの署名は次のとおりです-
nextTuple()

nextTuple()は、ack()およびfail()メソッドと同じループから定期的に呼び出されます。 他のメソッドが呼び出される機会があるように、実行する作業がないときにスレッドの制御を解放する必要があります。 そのため、nextTupleの最初の行は、処理が終了したかどうかを確認します。 その場合、戻る前にプロセッサの負荷を減らすために少なくとも1ミリ秒間スリープする必要があります。

閉じる

*close* メソッドのシグネチャは次のとおりです-
close()

declareOutputFields

*declareOutputFields* メソッドのシグネチャは次のとおりです-
declareOutputFields(OutputFieldsDeclarer declarer)
*declarer* -出力ストリームID、出力フィールドなどの宣言に使用されます。

このメソッドは、タプルの出力スキーマを指定するために使用されます。

ack

*ack* メソッドの署名は次のとおりです-
ack(Object msgId)

このメソッドは、特定のタプルが処理されたことを確認します。

fail

*nextTuple* メソッドの署名は次のとおりです-
ack(Object msgId)

このメソッドは、特定のタプルが完全に処理されていないことを通知します。 Stormは特定のタプルを再処理します。

FakeCallLogReaderSpout

このシナリオでは、通話ログの詳細を収集する必要があります。 通話ログの情報が含まれます。

  • 発信者番号
  • 受信者番号
  • 期間

コールログのリアルタイム情報がないため、偽のコールログを生成します。 偽情報は、Randomクラスを使用して作成されます。 完全なプログラムコードを以下に示します。

コーディング-FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface
   to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {
  //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;

  //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;

  //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

  //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

ボルト作成

Boltは、タプルを入力として受け取り、タプルを処理し、出力として新しいタプルを生成するコンポーネントです。 ボルトは IRichBolt インターフェースを実装します。 このプログラムでは、2つのボルトクラス CallLogCreatorBolt および CallLogCounterBolt を使用して操作を実行します。

IRichBoltインターフェイスには次のメソッドがあります-

  • prepare -実行する環境をボルトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
  • 実行-入力の単一タプルを処理します。
  • cleanup -ボルトがシャットダウンするときに呼び出されます。
  • declareOutputFields -タプルの出力スキーマを宣言します。

準備する

*prepare* メソッドの署名は次のとおりです-
prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -このボルトのStorm構成を提供します。
  • context -トポロジ内のボルト位置、タスクID、入力および出力情報などに関する完全な情報を提供します。
  • collector -処理されたタプルを発行できるようにします。

実行する

  • 実行*メソッドのシグネチャは次のとおりです-
execute(Tuple tuple)

ここで、 tuple は処理される入力タプルです。

*execute* メソッドは、一度に1つのタプルを処理します。 タプルデータには、TupleクラスのgetValueメソッドを使用してアクセスできます。 入力タプルをすぐに処理する必要はありません。 複数のタプルを処理して、単一の出力タプルとして出力できます。 処理されたタプルは、OutputCollectorクラスを使用して発行できます。

掃除

  • クリーンアップ*メソッドのシグネチャは次のとおりです-
cleanup()

declareOutputFields

*declareOutputFields* メソッドのシグネチャは次のとおりです-
declareOutputFields(OutputFieldsDeclarer declarer)

ここでは、パラメータ declarer を使用して、出力ストリームID、出力フィールドなどを宣言します。

このメソッドは、タプルの出力スキーマを指定するために使用されます

コールログ作成者ボルト

コールログ作成者ボルトはコールログタプルを受け取ります。 通話ログタプルには、発信者番号、受信者番号、および通話時間が含まれます。 このボルトは、発信者番号と受信者番号を組み合わせて新しい値を作成するだけです。 新しい値の形式は「発信者番号-受信者番号」であり、新しいフィールド「call」と名付けられています。 完全なコードは以下の通りです。

コーディング-CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
  //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

コールログカウンターボルト

コールログカウンターボルトは、コールとその継続時間をタプルとして受け取ります。 このボルトは、prepareメソッドでディクショナリ(マップ)オブジェクトを初期化します。 execute メソッドでは、タプルをチェックし、タプルの新しい「呼び出し」値ごとに辞書オブジェクトに新しいエントリを作成し、辞書オブジェクトに値1を設定します。 ディクショナリですでに利用可能なエントリの場合、値をインクリメントします。 簡単に言えば、このボルトは呼び出しとそのカウントを辞書オブジェクトに保存します。 呼び出しとそのカウントを辞書に保存する代わりに、データソースに保存することもできます。 完全なプログラムコードは次のとおりです-

コーディング-CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

トポロジーの作成

Stormトポロジは、基本的にThrift構造です。 TopologyBuilderクラスは、複雑なトポロジを作成するためのシンプルで簡単なメソッドを提供します。 TopologyBuilderクラスには、注ぎ口*(setSpout)を設定し、ボルト(setBolt)*を設定するメソッドがあります。 最後に、TopologyBuilderにはトポロジを作成するcreateTopologyがあります。 次のコードスニペットを使用して、トポロジを作成します-

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
*shuffleGrouping* および *fieldsGrouping* メソッドは、スパウトとボルトのストリームグループを設定するのに役立ちます。

ローカルクラスター

開発目的で、「LocalCluster」オブジェクトを使用してローカルクラスターを作成し、「LocalCluster」クラスの「submitTopology」メソッドを使用してトポロジを送信できます。 「submitTopology」の引数の1つは、「Config」クラスのインスタンスです。 「Config」クラスは、トポロジを送信する前に構成オプションを設定するために使用されます。 この構成オプションは、実行時にクラスター構成とマージされ、prepareメソッドを使用してすべてのタスク(注ぎ口とボルト)に送信されます。 トポロジがクラスターに送信されると、クラスターが送信されたトポロジを計算するまで10秒待機し、「LocalCluster」の「shutdown」メソッドを使用してクラスターをシャットダウンします。 完全なプログラムコードは次のとおりです-

コーディング-LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
     //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

     //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

     //Stop the topology

      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには4つのJavaコードがあります。 彼らは-

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

アプリケーションは、次のコマンドを使用して構築することができます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

アプリケーションは、次のコマンドを使用して実行できます-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

出力

アプリケーションが起動されると、クラスターの起動プロセス、注ぎ口とボルトの処理、最後にクラスターのシャットダウンプロセスに関する詳細が出力されます。 「CallLogCounterBolt」で、コールとそのカウントの詳細を出力しました。 この情報は、次のようにコンソールに表示されます-

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM言語

StormトポロジーはThriftインターフェースによって実装され、任意の言語でトポロジーを簡単に送信できます。 Stormは、Ruby、Python、および他の多くの言語をサポートしています。 Pythonバインディングを見てみましょう。

Pythonバインディング

Pythonは、汎用のインタプリタ型、インタラクティブ、オブジェクト指向、高レベルのプログラミング言語です。 Stormは、トポロジを実装するためにPythonをサポートしています。 Pythonは、放出、アンカー、確認、およびロギング操作をサポートしています。

ご存知のように、ボルトはどの言語でも定義できます。 別の言語で記述されたボルトはサブプロセスとして実行され、Stormはstdin/stdoutを介してJSONメッセージでそれらのサブプロセスと通信します。 最初に、PythonバインディングをサポートするサンプルボルトWordCountを取得します。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

ここで、クラス WordCountIRichBolt インターフェースを実装し、Python実装で指定されたスーパーメソッド引数 "splitword.py"で実行されます。 ここで、「splitword.py」という名前のpython実装を作成します。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

これは、特定の文の単語をカウントするPythonのサンプル実装です。 同様に、他のサポート言語ともバインドできます。

Apache Storm-トライデント

TridentはStormの拡張です。 Stormと同様、TridentもTwitterで開発されました。 Tridentの開発の主な理由は、ステートフルストリーム処理と低遅延分散クエリとともに、Stormの上に高レベルの抽象化を提供することです。

Tridentはスパウトとボルトを使用しますが、これらの低レベルコンポーネントは実行前にTridentによって自動生成されます。 Tridentには、機能、フィルター、結合、グループ化、および集約があります。

Tridentは、トランザクションと呼ばれる一連のバッチとしてストリームを処理します。 一般的に、これらの小さなバッチのサイズは、入力ストリームに応じて、数千または数百万のタプルのオーダーになります。 このように、トライデントは、タプルごとの処理を実行するStormとは異なります。

バッチ処理の概念は、データベーストランザクションに非常に似ています。 すべてのトランザクションには、トランザクションIDが割り当てられます。 すべての処理が完了すると、トランザクションは成功したと見なされます。 ただし、トランザクションのタプルの1つを処理できないと、トランザクション全体が再送信されます。 Tridentは、バッチごとに、トランザクションの開始時にbeginCommitを呼び出し、トランザクションの終了時にコミットします。

トライデントトポロジ

Trident APIは、「TridentTopology」クラスを使用してTridentトポロジを作成する簡単なオプションを公開します。 基本的に、トライデントトポロジは、スパウトから入力ストリームを受信し、ストリームに対して順序付けられた操作シーケンス(フィルター、集約、グループ化など)を実行します。 Storm TupleはTrident Tupleに置き換えられ、Boltsは操作に置き換えられます。 シンプルなトライデントトポロジは、次のように作成できます-

TridentTopology topology = new TridentTopology();

トライデントタプル

トライデントタプルは、値の名前付きリストです。 TridentTupleインターフェイスは、Tridentトポロジのデータモデルです。 TridentTupleインターフェイスは、Tridentトポロジで処理できるデータの基本単位です。

トライデントスパウト

トライデントスパウトはストームスパウトに似ていますが、トライデントの機能を使用するための追加オプションがあります。 実際には、Stormトポロジで使用したIRichSpoutを引き続き使用できますが、本質的に非トランザクションであり、Tridentが提供する利点を使用することはできません。

Tridentの機能を使用するためのすべての機能を備えた基本的なスパウトは、「ITridentSpout」です。 トランザクションのセマンティクスと不透明なトランザクションのセマンティクスの両方をサポートします。 他のスパウトは、IBatchSpout、IPartitionedTridentSpout、およびIOpaquePartitionedTridentSpoutです。

これらの汎用スパウトに加えて、トライデントにはトライデントスパウトの多くのサンプル実装があります。 それらの1つはFeederBatchSpoutスパウトです。これを使用して、バッチ処理や並列処理などを心配することなく、トライデントタプルの名前付きリストを簡単に送信できます。

以下に示すようにFeederBatchSpoutの作成とデータフィードを行うことができます-

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

トライデント操作

トライデントは、「トライデント操作」に依存して、トライデントタプルの入力ストリームを処理します。 Trident APIには、単純から複雑なストリーム処理を処理するための多数の組み込み操作があります。 これらの操作は、単純な検証から複雑なグループ化とトライデントタプルの集約にまで及びます。 最も重要で頻繁に使用される操作について説明します。

フィルタ

フィルターは、入力検証のタスクを実行するために使用されるオブジェクトです。 Tridentフィルターは、トライデントタプルフィールドのサブセットを入力として取得し、特定の条件が満たされているかどうかに応じてtrueまたはfalseを返します。 trueが返された場合、タプルは出力ストリームに保持されます。それ以外の場合、タプルはストリームから削除されます。 フィルタは基本的に BaseFilter クラスを継承し、 isKeep メソッドを実装します。 ここにフィルター操作のサンプル実装があります-

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

フィルター関数は、「各」メソッドを使用してトポロジーで呼び出すことができます。 「フィールド」クラスを使用して、入力(トライデントタプルのサブセット)を指定できます。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

関数

  • 関数*は、単一のトライデントタプルで簡単な操作を実行するために使用されるオブジェクトです。 これは、トライデントタプルフィールドのサブセットを取り、0個以上の新しいトライデントタプルフィールドを生成します。
*Function* は基本的に *BaseFunction* クラスを継承し、 *execute* メソッドを実装します。 サンプルの実装を以下に示します-
public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Filter操作と同様に、Function操作は each メソッドを使用してトポロジで呼び出すことができます。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

集約

集約は、入力バッチ、パーティション、またはストリームで集約操作を実行するために使用されるオブジェクトです。 トライデントには3つのタイプの集約があります。 彼らは次のとおりです-

  • aggregate -トライデントタプルの各バッチを分離して集約します。 集約プロセス中、最初にグローバルグループ化を使用してタプルが再パーティション化され、同じバッチのすべてのパーティションが単一のパーティションに結合されます。
  • partitionAggregate -トライデントタプルのバッチ全体ではなく、各パーティションを集約します。 パーティション集合体の出力は、入力タプルを完全に置き換えます。 パーティション集合の出力には、単一のフィールドタプルが含まれます。
  • persistentaggregate -すべてのバッチにわたるすべてのトライデントタプルで集計し、結果をメモリまたはデータベースに保存します。
TridentTopology topology = new TridentTopology();

//aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))

//partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))

//persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

集約操作は、CombinerAggregator、ReducerAggregator、または汎用Aggregatorインターフェイスのいずれかを使用して作成できます。 上記の例で使用されている「カウント」アグリゲーターは、組み込みアグリゲーターの1つです。 「CombinerAggregator」を使用して実装されます。 実装は次のとおりです-

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }

   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }

   @Override
   public Long zero() {
      return 0L;
   }
}

グルーピング

グループ化操作は組み込みの操作であり、 groupBy メソッドによって呼び出すことができます。 groupByメソッドは、指定されたフィールドでpartitionByを実行してストリームを再パーティション化し、各パーティション内で、グループフィールドが等しいタプルをグループ化します。 通常、「groupBy」と「persistentAggregate」を使用して、グループ化された集計を取得します。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();

//persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

結合と結合

マージと結合は、それぞれ「マージ」と「結合」メソッドを使用して実行できます。 マージは、1つ以上のストリームを結合します。 結合はマージに似ていますが、結合は2つのストリームをチェックして結合するために両側からトライデントタプルフィールドを使用するという事実を除きます。 さらに、結合はバッチレベルでのみ機能します。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));

状態維持

トライデントは、状態を維持するメカニズムを提供します。 状態情報はトポロジ自体に保存できます。それ以外の場合は、別のデータベースに保存することもできます。 その理由は、処理中にタプルが失敗した場合、失敗したタプルが再試行されるという状態を維持するためです。 このタプルの状態が以前に更新されたかどうかわからないため、状態の更新中に問題が発生します。 状態を更新する前にタプルが失敗した場合、タプルを再試行すると状態が安定します。 ただし、状態の更新後にタプルが失敗した場合、同じタプルを再試行すると、データベース内のカウントが再び増加し、状態が不安定になります。 メッセージが一度だけ処理されるようにするには、次の手順を実行する必要があります-

  • タプルを小さなバッチで処理します。
  • 各バッチに一意​​のIDを割り当てます。 バッチが再試行される場合、同じ一意のIDが与えられます。
  • 状態の更新はバッチ間で順序付けられます。 たとえば、最初のバッチの状態更新が完了するまで、2番目のバッチの状態更新はできません。

分散RPC

分散RPCは、トライデントトポロジから結果を照会および取得するために使用されます。 Stormには、組み込みの分散RPCサーバーがあります。 分散RPCサーバーは、クライアントからRPC要求を受信し、それをトポロジに渡します。 トポロジは要求を処理し、結果を分散RPCサーバーに送信します。分散RPCサーバーは、分散RPCサーバーによってクライアントにリダイレクトされます。 Tridentの分散RPCクエリは、これらのクエリが並行して実行されるという事実を除いて、通常のRPCクエリと同様に実行されます。

トライデントを使用する場合

多くのユースケースのように、クエリを1回だけ処理することが要件である場合、Tridentでトポロジを記述することでそれを実現できます。 一方、Stormの場合、正確に1回処理することは困難です。 したがって、トライデントは、一度だけ処理する必要があるユースケースに役立ちます。 Tridentは、Stormに複雑さを追加し、状態を管理するため、すべてのユースケース、特に高性能のユースケースに適しているわけではありません。

トライデントの実例

前のセクションで解決した通話ログアナライザーアプリケーションをTridentフレームワークに変換します。 トライデントアプリケーションは、その高レベルAPIのおかげで、単純な嵐に比べて比較的簡単です。 Stormは、基本的にTridentでFunction、Filter、Aggregate、GroupBy、Join、Mergeのいずれかの操作を実行するために必要になります。 最後に、 LocalDRPC クラスを使用してDRPCサーバーを起動し、LocalDRPCクラスの execute メソッドを使用していくつかのキーワードを検索します。

通話情報のフォーマット

FormatCallクラスの目的は、「発信者番号」と「受信者番号」で構成される通話情報をフォーマットすることです。 完全なプログラムコードは次のとおりです-

コーディング:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplitクラスの目的は、「カンマ(、)」に基づいて入力文字列を分割し、文字列内のすべての単語を出力することです。 この関数は、分散クエリの入力引数を解析するために使用されます。 完全なコードは次のとおりです-

コーディング:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

ログアナライザー

これがメインアプリケーションです。 最初に、アプリケーションは FeederBatchSpout を使用してTridentTopologyを初期化し、呼び出し元情報をフィードします。 Trident Topologyクラスの newStream メソッドを使用して、Trident Topologyストリームを作成できます。 同様に、TridentTopologyクラスの newDRCPStream メソッドを使用して、TridentトポロジのDRPCストリームを作成できます。 LocalDRPCクラスを使用して、単純なDRCPサーバーを作成できます。 LocalDRPC には、キーワードを検索するためのexecuteメソッドがあります。 完全なコードは以下の通りです。

コーディング:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

     //DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

アプリケーションの構築と実行

完全なアプリケーションには3つのJavaコードがあります。 彼らは次のとおりです-

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

アプリケーションは、次のコマンドを使用して構築することができます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

アプリケーションは、次のコマンドを使用して実行できます-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

出力

アプリケーションが起動されると、アプリケーションはクラスターの起動プロセス、操作処理、DRPCサーバーとクライアントの情報、そして最後にクラスターのシャットダウンプロセスに関する完全な詳細を出力します。 この出力は、以下に示すようにコンソールに表示されます。

DRPC : Query starts
[[DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[DRPC : Query ends

TwitterのApache Storm

この章では、Apache Stormのリアルタイムアプリケーションについて説明します。 StormがTwitterでどのように使用されるかを確認します。

Twitter

Twitterは、ユーザーのツイートを送受信するプラットフォームを提供するオンラインソーシャルネットワーキングサービスです。 登録ユーザーはツイートを読んだり投稿したりできますが、未登録ユーザーはツイートを読むことしかできません。 ハッシュタグは、関連するキーワードの前に#を追加することにより、ツイートをキーワード別に分類するために使用されます。 次に、トピックごとに最も使用されるハッシュタグを見つけるリアルタイムのシナリオを見てみましょう。

スパウト作成

スパウトの目的は、できるだけ早く人々から投稿されたツイートを取得することです。 Twitterは、リアルタイムで人々が送信したツイートを取得するためのWebサービスベースのツールである「Twitter Streaming API」を提供します。 Twitter Streaming APIは、あらゆるプログラミング言語でアクセスできます。

*twitter4j* はオープンソースの非公式Javaライブラリであり、Twitter Streaming APIに簡単にアクセスするためのJavaベースのモジュールを提供します。 *twitter4j* は、ツイートにアクセスするためのリスナーベースのフレームワークを提供します。 Twitter Streaming APIにアクセスするには、Twitter開発者アカウントにサインインする必要があり、次のOAuth認証の詳細を取得する必要があります。
  • 顧客キー
  • CustomerSecret
  • アクセストークン
  • AccessTookenSecret

Stormは、スターターキットでtwitterスパウト TwitterSampleSpout を提供しています。 これを使用してツイートを取得します。 注ぎ口には、OAuth認証の詳細と少なくともキーワードが必要です。 注ぎ口は、キーワードに基づいてリアルタイムのツイートを送信します。 完全なプログラムコードを以下に示します。

コーディング:TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
     //TODO Auto-generated constructor stub
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}

            @Override
            public void onTrackLimitationNotice(int i) {}

            @Override
            public void onScrubGeo(long l, long l1) {}

            @Override
            public void onException(Exception ex) {}

            @Override
            public void onStallWarning(StallWarning arg0) {
              //TODO Auto-generated method stub
            }
         };

         ConfigurationBuilder cb = new ConfigurationBuilder();

         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);

         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);

         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }

   @Override
   public void nextTuple() {
      Status ret = queue.poll();

      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

ハッシュタグリーダーボルト

スパウトから送信されたツイートは HashtagReaderBolt に転送され、ツイートを処理して利用可能なすべてのハッシュタグを送信します。 HashtagReaderBoltは、twitter4jが提供する getHashTagEntities メソッドを使用します。 getHashTagEntitiesはツイートを読み取り、ハッシュタグのリストを返します。 完全なプログラムコードは次のとおりです-

コーディング:HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

ハッシュタグカウンターボルト

発行されたハッシュタグは HashtagCounterBolt に転送されます。 このボルトはすべてのハッシュタグを処理し、Java Mapオブジェクトを使用してすべてのハッシュタグとそのカウントをメモリに保存します。 完全なプログラムコードを以下に示します。

コーディング:HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

トポロジの送信

トポロジの送信がメインアプリケーションです。 Twitterトポロジは、 TwitterSampleSpoutHashtagReaderBolt 、および HashtagCounterBolt で構成されます。 次のプログラムコードは、トポロジを送信する方法を示しています。

コーディング:TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];

      String accessToken = args[2];
      String accessTokenSecret = args[3];

      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには4つのJavaコードがあります。 彼らは次のとおりです-

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

次のコマンドを使用してアプリケーションをコンパイルできます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

次のコマンドを使用してアプリケーションを実行します-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

出力

アプリケーションは、現在利用可能なハッシュタグとそのカウントを印刷します。 出力は次のようになります-

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Yahoo!のApache Storm ファイナンス

Yahoo! 金融は、インターネットを代表するビジネスニュースおよび金融データWebサイトです。 Yahoo!の一部です。 金融ニュース、市場統計、国際市場データ、および誰でもアクセスできる金融リソースに関するその他の情報に関する情報を提供します。

登録済みのYahoo!の場合 ユーザー、Yahoo!をカスタマイズできます。 特定のサービスを活用するためのファイナンス。 Yahoo! Finance APIは、Yahoo!からの財務データのクエリに使用されます

このAPIは、リアルタイムから15分遅れたデータを表示し、データベースを1分ごとに更新して、現在の株式関連情報にアクセスします。 次に、会社のリアルタイムシナリオを取り上げて、株価が100を下回ったときにアラートを発生させる方法を見てみましょう。

スパウト作成

注ぎ口の目的は、会社の詳細を取得し、ボルトに価格を発行することです。 次のプログラムコードを使用して、注ぎ口を作成できます。

コーディング:YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

ボルト作成

ここで、ボルトの目的は、価格が100を下回ったときに特定の会社の価格を処理することです。 Java Mapオブジェクトを使用して、株価が100を下回ると、カットオフ価格制限アラートを true に設定します。そうでない場合はfalse。 完全なプログラムコードは次のとおりです-

コーディング:PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;

   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

トポロジの送信

これは、YahooFinanceSpout.javaとPriceCutOffBolt.javaが接続されてトポロジを生成するメインアプリケーションです。 次のプログラムコードは、トポロジを送信する方法を示しています。

コーディング:YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには3つのJavaコードがあります。 彼らは次のとおりです-

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

アプリケーションは、次のコマンドを使用して構築することができます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

アプリケーションは、次のコマンドを使用して実行できます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

出力

出力は次のようになります-

GOOGL : false
AAPL : false
INTC : true

Apache Storm-アプリケーション

Apache Stormフレームワークは、今日の最高の産業用アプリケーションの多くをサポートしています。 この章では、Stormの最も注目すべきアプリケーションの概要を簡単に説明します。

クラウト

Kloutは、ソーシャルメディア分析を使用して、* Kloutスコア*(1〜100の数値)を使用して、オンラインの社会的影響に基づいてユーザーをランク付けするアプリケーションです。 Kloutは、Apache Stormの組み込みのTrident抽象化を使用して、データをストリーミングする複雑なトポロジを作成します。

天気予報チャンネル

Weather Channelは、Stormトポロジーを使用して気象データを取り込みます。 Twitterと提携して、天気情報に基づいたTwitterおよびモバイルアプリケーションでの広告を可能にしました。 OpenSignal は、ワイヤレスカバレッジマッピングを専門とする会社です。 StormTag および WeatherSignal は、OpenSignalによって作成された気象ベースのプロジェクトです。 StormTagは、キーチェーンに接続するBluetooth気象ステーションです。 デバイスによって収集された気象データは、WeatherSignalアプリとOpenSignalサーバーに送信されます。

テレコム産業

通信プロバイダーは、1秒あたり数百万の電話を処理します。 彼らは、ドロップされたコールと低音質でフォレンジックを実行します。 コール詳細レコードは毎秒数百万の割合で流入し、Apache Stormはそれらをリアルタイムで処理し、問題のあるパターンを識別します。 ストーム分析を使用して、通話品質を継続的に改善できます。