Apache-storm-working-example
Apache Storm-作業例
Apache Stormの主要な技術的詳細を確認したので、今度はいくつかの簡単なシナリオをコーディングします。
シナリオ–モバイルコールログアナライザー
モバイルコールとその継続時間は、Apache Stormへの入力として提供され、Stormは同じ発信者と受信者間のコールとそれらの合計コール数を処理およびグループ化します。
スパウト作成
注ぎ口は、データ生成に使用されるコンポーネントです。 基本的に、スパウトはIRichSpoutインターフェイスを実装します。 「IRichSpout」インターフェイスには、次の重要な方法があります-
- open -実行する環境をスパウトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
- nextTuple -コレクターを介して生成されたデータを発行します。
- close -このメソッドは、スパウトがシャットダウンするときに呼び出されます。
- declareOutputFields -タプルの出力スキーマを宣言します。
- ack -特定のタプルが処理されたことを認める
- fail -特定のタプルが処理されず、再処理されないことを指定します。
Open
- conf -この注ぎ口のストーム構成を提供します。
- context -トポロジ内の噴出場所、そのタスクID、入力および出力情報に関する完全な情報を提供します。
- collector -ボルトによって処理されるタプルを発行できるようにします。
nextTuple
nextTuple()は、ack()およびfail()メソッドと同じループから定期的に呼び出されます。 他のメソッドが呼び出される機会があるように、実行する作業がないときにスレッドの制御を解放する必要があります。 そのため、nextTupleの最初の行は、処理が終了したかどうかを確認します。 その場合、戻る前にプロセッサの負荷を減らすために少なくとも1ミリ秒間スリープする必要があります。
閉じる
declareOutputFields
このメソッドは、タプルの出力スキーマを指定するために使用されます。
ack
このメソッドは、特定のタプルが処理されたことを確認します。
fail
このメソッドは、特定のタプルが完全に処理されていないことを通知します。 Stormは特定のタプルを再処理します。
FakeCallLogReaderSpout
このシナリオでは、通話ログの詳細を収集する必要があります。 通話ログの情報が含まれます。
- 発信者番号
- 受信者番号
- 期間
コールログのリアルタイム情報がないため、偽のコールログを生成します。 偽情報は、Randomクラスを使用して作成されます。 完全なプログラムコードを以下に示します。
コーディング-FakeCallLogReaderSpout.java
ボルト作成
Boltは、タプルを入力として受け取り、タプルを処理し、出力として新しいタプルを生成するコンポーネントです。 ボルトは IRichBolt インターフェースを実装します。 このプログラムでは、2つのボルトクラス CallLogCreatorBolt および CallLogCounterBolt を使用して操作を実行します。
IRichBoltインターフェイスには次のメソッドがあります-
- prepare -実行する環境をボルトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
- 実行-入力の単一タプルを処理します。
- cleanup -ボルトがシャットダウンするときに呼び出されます。
- declareOutputFields -タプルの出力スキーマを宣言します。
準備する
- conf -このボルトのStorm構成を提供します。
- context -トポロジ内のボルト位置、タスクID、入力および出力情報などに関する完全な情報を提供します。
- collector -処理されたタプルを発行できるようにします。
実行する
- 実行*メソッドのシグネチャは次のとおりです-
ここで、 tuple は処理される入力タプルです。
掃除
- クリーンアップ*メソッドのシグネチャは次のとおりです-
declareOutputFields
ここでは、パラメータ declarer を使用して、出力ストリームID、出力フィールドなどを宣言します。
このメソッドは、タプルの出力スキーマを指定するために使用されます
コールログ作成者ボルト
コールログ作成者ボルトはコールログタプルを受け取ります。 通話ログタプルには、発信者番号、受信者番号、および通話時間が含まれます。 このボルトは、発信者番号と受信者番号を組み合わせて新しい値を作成するだけです。 新しい値の形式は「発信者番号-受信者番号」であり、新しいフィールド「call」と名付けられています。 完全なコードは以下の通りです。
コーディング-CallLogCreatorBolt.java
コールログカウンターボルト
コールログカウンターボルトは、コールとその継続時間をタプルとして受け取ります。 このボルトは、prepareメソッドでディクショナリ(マップ)オブジェクトを初期化します。 execute メソッドでは、タプルをチェックし、タプルの新しい「呼び出し」値ごとに辞書オブジェクトに新しいエントリを作成し、辞書オブジェクトに値1を設定します。 ディクショナリですでに利用可能なエントリの場合、値をインクリメントします。 簡単に言えば、このボルトは呼び出しとそのカウントを辞書オブジェクトに保存します。 呼び出しとそのカウントを辞書に保存する代わりに、データソースに保存することもできます。 完全なプログラムコードは次のとおりです-
コーディング-CallLogCounterBolt.java
トポロジーの作成
Stormトポロジは、基本的にThrift構造です。 TopologyBuilderクラスは、複雑なトポロジを作成するためのシンプルで簡単なメソッドを提供します。 TopologyBuilderクラスには、注ぎ口*(setSpout)を設定し、ボルト(setBolt)*を設定するメソッドがあります。 最後に、TopologyBuilderにはトポロジを作成するcreateTopologyがあります。 次のコードスニペットを使用して、トポロジを作成します-
ローカルクラスター
開発目的で、「LocalCluster」オブジェクトを使用してローカルクラスターを作成し、「LocalCluster」クラスの「submitTopology」メソッドを使用してトポロジを送信できます。 「submitTopology」の引数の1つは、「Config」クラスのインスタンスです。 「Config」クラスは、トポロジを送信する前に構成オプションを設定するために使用されます。 この構成オプションは、実行時にクラスター構成とマージされ、prepareメソッドを使用してすべてのタスク(注ぎ口とボルト)に送信されます。 トポロジがクラスターに送信されると、クラスターが送信されたトポロジを計算するまで10秒待機し、「LocalCluster」の「shutdown」メソッドを使用してクラスターをシャットダウンします。 完全なプログラムコードは次のとおりです-
コーディング-LogAnalyserStorm.java
アプリケーションの構築と実行
完全なアプリケーションには4つのJavaコードがあります。 彼らは-
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
アプリケーションは、次のコマンドを使用して構築することができます-
アプリケーションは、次のコマンドを使用して実行できます-
出力
アプリケーションが起動されると、クラスターの起動プロセス、注ぎ口とボルトの処理、最後にクラスターのシャットダウンプロセスに関する詳細が出力されます。 「CallLogCounterBolt」で、コールとそのカウントの詳細を出力しました。 この情報は、次のようにコンソールに表示されます-
非JVM言語
StormトポロジーはThriftインターフェースによって実装され、任意の言語でトポロジーを簡単に送信できます。 Stormは、Ruby、Python、および他の多くの言語をサポートしています。 Pythonバインディングを見てみましょう。
Pythonバインディング
Pythonは、汎用のインタプリタ型、インタラクティブ、オブジェクト指向、高レベルのプログラミング言語です。 Stormは、トポロジを実装するためにPythonをサポートしています。 Pythonは、放出、アンカー、確認、およびロギング操作をサポートしています。
ご存知のように、ボルトはどの言語でも定義できます。 別の言語で記述されたボルトはサブプロセスとして実行され、Stormはstdin/stdoutを介してJSONメッセージでそれらのサブプロセスと通信します。 最初に、PythonバインディングをサポートするサンプルボルトWordCountを取得します。
ここで、クラス WordCount は IRichBolt インターフェースを実装し、Python実装で指定されたスーパーメソッド引数 "splitword.py"で実行されます。 ここで、「splitword.py」という名前のpython実装を作成します。
これは、特定の文の単語をカウントするPythonのサンプル実装です。 同様に、他のサポート言語ともバインドできます。