Apache-storm-working-example
Apache Storm-作業例
Apache Stormの主要な技術的詳細を確認したので、今度はいくつかの簡単なシナリオをコーディングします。
シナリオ–モバイルコールログアナライザー
モバイルコールとその継続時間は、Apache Stormへの入力として提供され、Stormは同じ発信者と受信者間のコールとそれらの合計コール数を処理およびグループ化します。
スパウト作成
注ぎ口は、データ生成に使用されるコンポーネントです。 基本的に、スパウトはIRichSpoutインターフェイスを実装します。 「IRichSpout」インターフェイスには、次の重要な方法があります-
- open -実行する環境をスパウトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
- nextTuple -コレクターを介して生成されたデータを発行します。
- close -このメソッドは、スパウトがシャットダウンするときに呼び出されます。
- declareOutputFields -タプルの出力スキーマを宣言します。
- ack -特定のタプルが処理されたことを認める
- fail -特定のタプルが処理されず、再処理されないことを指定します。
Open
*open* メソッドのシグネチャは次のとおりです-
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
- conf -この注ぎ口のストーム構成を提供します。
- context -トポロジ内の噴出場所、そのタスクID、入力および出力情報に関する完全な情報を提供します。
- collector -ボルトによって処理されるタプルを発行できるようにします。
nextTuple
*nextTuple* メソッドの署名は次のとおりです-
nextTuple()
nextTuple()は、ack()およびfail()メソッドと同じループから定期的に呼び出されます。 他のメソッドが呼び出される機会があるように、実行する作業がないときにスレッドの制御を解放する必要があります。 そのため、nextTupleの最初の行は、処理が終了したかどうかを確認します。 その場合、戻る前にプロセッサの負荷を減らすために少なくとも1ミリ秒間スリープする必要があります。
閉じる
*close* メソッドのシグネチャは次のとおりです-
close()
declareOutputFields
*declareOutputFields* メソッドのシグネチャは次のとおりです-
declareOutputFields(OutputFieldsDeclarer declarer)
*declarer* -出力ストリームID、出力フィールドなどの宣言に使用されます。
このメソッドは、タプルの出力スキーマを指定するために使用されます。
ack
*ack* メソッドの署名は次のとおりです-
ack(Object msgId)
このメソッドは、特定のタプルが処理されたことを確認します。
fail
*nextTuple* メソッドの署名は次のとおりです-
ack(Object msgId)
このメソッドは、特定のタプルが完全に処理されていないことを通知します。 Stormは特定のタプルを再処理します。
FakeCallLogReaderSpout
このシナリオでは、通話ログの詳細を収集する必要があります。 通話ログの情報が含まれます。
- 発信者番号
- 受信者番号
- 期間
コールログのリアルタイム情報がないため、偽のコールログを生成します。 偽情報は、Randomクラスを使用して作成されます。 完全なプログラムコードを以下に示します。
コーディング-FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
ボルト作成
Boltは、タプルを入力として受け取り、タプルを処理し、出力として新しいタプルを生成するコンポーネントです。 ボルトは IRichBolt インターフェースを実装します。 このプログラムでは、2つのボルトクラス CallLogCreatorBolt および CallLogCounterBolt を使用して操作を実行します。
IRichBoltインターフェイスには次のメソッドがあります-
- prepare -実行する環境をボルトに提供します。 エグゼキュータはこのメソッドを実行して、スパウトを初期化します。
- 実行-入力の単一タプルを処理します。
- cleanup -ボルトがシャットダウンするときに呼び出されます。
- declareOutputFields -タプルの出力スキーマを宣言します。
準備する
*prepare* メソッドの署名は次のとおりです-
prepare(Map conf, TopologyContext context, OutputCollector collector)
- conf -このボルトのStorm構成を提供します。
- context -トポロジ内のボルト位置、タスクID、入力および出力情報などに関する完全な情報を提供します。
- collector -処理されたタプルを発行できるようにします。
実行する
- 実行*メソッドのシグネチャは次のとおりです-
execute(Tuple tuple)
ここで、 tuple は処理される入力タプルです。
*execute* メソッドは、一度に1つのタプルを処理します。 タプルデータには、TupleクラスのgetValueメソッドを使用してアクセスできます。 入力タプルをすぐに処理する必要はありません。 複数のタプルを処理して、単一の出力タプルとして出力できます。 処理されたタプルは、OutputCollectorクラスを使用して発行できます。
掃除
- クリーンアップ*メソッドのシグネチャは次のとおりです-
cleanup()
declareOutputFields
*declareOutputFields* メソッドのシグネチャは次のとおりです-
declareOutputFields(OutputFieldsDeclarer declarer)
ここでは、パラメータ declarer を使用して、出力ストリームID、出力フィールドなどを宣言します。
このメソッドは、タプルの出力スキーマを指定するために使用されます
コールログ作成者ボルト
コールログ作成者ボルトはコールログタプルを受け取ります。 通話ログタプルには、発信者番号、受信者番号、および通話時間が含まれます。 このボルトは、発信者番号と受信者番号を組み合わせて新しい値を作成するだけです。 新しい値の形式は「発信者番号-受信者番号」であり、新しいフィールド「call」と名付けられています。 完全なコードは以下の通りです。
コーディング-CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
コールログカウンターボルト
コールログカウンターボルトは、コールとその継続時間をタプルとして受け取ります。 このボルトは、prepareメソッドでディクショナリ(マップ)オブジェクトを初期化します。 execute メソッドでは、タプルをチェックし、タプルの新しい「呼び出し」値ごとに辞書オブジェクトに新しいエントリを作成し、辞書オブジェクトに値1を設定します。 ディクショナリですでに利用可能なエントリの場合、値をインクリメントします。 簡単に言えば、このボルトは呼び出しとそのカウントを辞書オブジェクトに保存します。 呼び出しとそのカウントを辞書に保存する代わりに、データソースに保存することもできます。 完全なプログラムコードは次のとおりです-
コーディング-CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
トポロジーの作成
Stormトポロジは、基本的にThrift構造です。 TopologyBuilderクラスは、複雑なトポロジを作成するためのシンプルで簡単なメソッドを提供します。 TopologyBuilderクラスには、注ぎ口*(setSpout)を設定し、ボルト(setBolt)*を設定するメソッドがあります。 最後に、TopologyBuilderにはトポロジを作成するcreateTopologyがあります。 次のコードスニペットを使用して、トポロジを作成します-
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
*shuffleGrouping* および *fieldsGrouping* メソッドは、スパウトとボルトのストリームグループを設定するのに役立ちます。
ローカルクラスター
開発目的で、「LocalCluster」オブジェクトを使用してローカルクラスターを作成し、「LocalCluster」クラスの「submitTopology」メソッドを使用してトポロジを送信できます。 「submitTopology」の引数の1つは、「Config」クラスのインスタンスです。 「Config」クラスは、トポロジを送信する前に構成オプションを設定するために使用されます。 この構成オプションは、実行時にクラスター構成とマージされ、prepareメソッドを使用してすべてのタスク(注ぎ口とボルト)に送信されます。 トポロジがクラスターに送信されると、クラスターが送信されたトポロジを計算するまで10秒待機し、「LocalCluster」の「shutdown」メソッドを使用してクラスターをシャットダウンします。 完全なプログラムコードは次のとおりです-
コーディング-LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
アプリケーションの構築と実行
完全なアプリケーションには4つのJavaコードがあります。 彼らは-
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
アプリケーションは、次のコマンドを使用して構築することができます-
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
アプリケーションは、次のコマンドを使用して実行できます-
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
出力
アプリケーションが起動されると、クラスターの起動プロセス、注ぎ口とボルトの処理、最後にクラスターのシャットダウンプロセスに関する詳細が出力されます。 「CallLogCounterBolt」で、コールとそのカウントの詳細を出力しました。 この情報は、次のようにコンソールに表示されます-
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
非JVM言語
StormトポロジーはThriftインターフェースによって実装され、任意の言語でトポロジーを簡単に送信できます。 Stormは、Ruby、Python、および他の多くの言語をサポートしています。 Pythonバインディングを見てみましょう。
Pythonバインディング
Pythonは、汎用のインタプリタ型、インタラクティブ、オブジェクト指向、高レベルのプログラミング言語です。 Stormは、トポロジを実装するためにPythonをサポートしています。 Pythonは、放出、アンカー、確認、およびロギング操作をサポートしています。
ご存知のように、ボルトはどの言語でも定義できます。 別の言語で記述されたボルトはサブプロセスとして実行され、Stormはstdin/stdoutを介してJSONメッセージでそれらのサブプロセスと通信します。 最初に、PythonバインディングをサポートするサンプルボルトWordCountを取得します。
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
ここで、クラス WordCount は IRichBolt インターフェースを実装し、Python実装で指定されたスーパーメソッド引数 "splitword.py"で実行されます。 ここで、「splitword.py」という名前のpython実装を作成します。
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
これは、特定の文の単語をカウントするPythonのサンプル実装です。 同様に、他のサポート言語ともバインドできます。