Apache-storm-trident

提供:Dev Guides
移動先:案内検索

Apache Storm-トライデント

TridentはStormの拡張です。 Stormと同様、TridentもTwitterで開発されました。 Tridentの開発の主な理由は、ステートフルストリーム処理と低遅延分散クエリとともに、Stormの上に高レベルの抽象化を提供することです。

Tridentはスパウトとボルトを使用しますが、これらの低レベルコンポーネントは実行前にTridentによって自動生成されます。 Tridentには、機能、フィルター、結合、グループ化、および集約があります。

Tridentは、トランザクションと呼ばれる一連のバッチとしてストリームを処理します。 一般的に、これらの小さなバッチのサイズは、入力ストリームに応じて、数千または数百万のタプルのオーダーになります。 このように、トライデントは、タプルごとの処理を実行するStormとは異なります。

バッチ処理の概念は、データベーストランザクションに非常に似ています。 すべてのトランザクションには、トランザクションIDが割り当てられます。 すべての処理が完了すると、トランザクションは成功したと見なされます。 ただし、トランザクションのタプルの1つを処理できないと、トランザクション全体が再送信されます。 Tridentは、バッチごとに、トランザクションの開始時にbeginCommitを呼び出し、トランザクションの終了時にコミットします。

トライデントトポロジ

Trident APIは、「TridentTopology」クラスを使用してTridentトポロジを作成する簡単なオプションを公開します。 基本的に、トライデントトポロジは、スパウトから入力ストリームを受信し、ストリームに対して順序付けられた操作シーケンス(フィルター、集約、グループ化など)を実行します。 Storm TupleはTrident Tupleに置き換えられ、Boltsは操作に置き換えられます。 シンプルなトライデントトポロジは、次のように作成できます-

TridentTopology topology = new TridentTopology();

トライデントタプル

トライデントタプルは、値の名前付きリストです。 TridentTupleインターフェイスは、Tridentトポロジのデータモデルです。 TridentTupleインターフェイスは、Tridentトポロジで処理できるデータの基本単位です。

トライデントスパウト

トライデントスパウトはストームスパウトに似ていますが、トライデントの機能を使用するための追加オプションがあります。 実際には、Stormトポロジで使用したIRichSpoutを引き続き使用できますが、本質的に非トランザクションであり、Tridentが提供する利点を使用することはできません。

Tridentの機能を使用するためのすべての機能を備えた基本的なスパウトは、「ITridentSpout」です。 トランザクションのセマンティクスと不透明なトランザクションのセマンティクスの両方をサポートします。 他のスパウトは、IBatchSpout、IPartitionedTridentSpout、およびIOpaquePartitionedTridentSpoutです。

これらの汎用スパウトに加えて、トライデントにはトライデントスパウトの多くのサンプル実装があります。 それらの1つはFeederBatchSpoutスパウトです。これを使用して、バッチ処理や並列処理などを心配することなく、トライデントタプルの名前付きリストを簡単に送信できます。

以下に示すようにFeederBatchSpoutの作成とデータフィードを行うことができます-

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

トライデント操作

トライデントは、「トライデント操作」に依存して、トライデントタプルの入力ストリームを処理します。 Trident APIには、単純から複雑なストリーム処理を処理するための多数の組み込み操作があります。 これらの操作は、単純な検証から複雑なグループ化とトライデントタプルの集約にまで及びます。 最も重要で頻繁に使用される操作について説明します。

フィルタ

フィルターは、入力検証のタスクを実行するために使用されるオブジェクトです。 Tridentフィルターは、トライデントタプルフィールドのサブセットを入力として取得し、特定の条件が満たされているかどうかに応じてtrueまたはfalseを返します。 trueが返された場合、タプルは出力ストリームに保持されます。それ以外の場合、タプルはストリームから削除されます。 フィルタは基本的に BaseFilter クラスを継承し、 isKeep メソッドを実装します。 ここにフィルター操作のサンプル実装があります-

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

フィルター関数は、「各」メソッドを使用してトポロジーで呼び出すことができます。 「フィールド」クラスを使用して、入力(トライデントタプルのサブセット)を指定できます。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

関数

  • 関数*は、単一のトライデントタプルで簡単な操作を実行するために使用されるオブジェクトです。 これは、トライデントタプルフィールドのサブセットを取り、0個以上の新しいトライデントタプルフィールドを生成します。
*Function* は基本的に *BaseFunction* クラスを継承し、 *execute* メソッドを実装します。 サンプルの実装を以下に示します-
public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Filter操作と同様に、Function操作は each メソッドを使用してトポロジで呼び出すことができます。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

集約

集約は、入力バッチ、パーティション、またはストリームで集約操作を実行するために使用されるオブジェクトです。 トライデントには3つのタイプの集約があります。 彼らは次のとおりです-

  • aggregate -トライデントタプルの各バッチを分離して集約します。 集約プロセス中、最初にグローバルグループ化を使用してタプルが再パーティション化され、同じバッチのすべてのパーティションが単一のパーティションに結合されます。
  • partitionAggregate -トライデントタプルのバッチ全体ではなく、各パーティションを集約します。 パーティション集合体の出力は、入力タプルを完全に置き換えます。 パーティション集合の出力には、単一のフィールドタプルが含まれます。
  • persistentaggregate -すべてのバッチにわたるすべてのトライデントタプルで集計し、結果をメモリまたはデータベースに保存します。
TridentTopology topology = new TridentTopology();

//aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))

//partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))

//persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

集約操作は、CombinerAggregator、ReducerAggregator、または汎用Aggregatorインターフェイスのいずれかを使用して作成できます。 上記の例で使用されている「カウント」アグリゲーターは、組み込みアグリゲーターの1つです。 「CombinerAggregator」を使用して実装されます。 実装は次のとおりです-

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }

   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }

   @Override
   public Long zero() {
      return 0L;
   }
}

グルーピング

グループ化操作は組み込みの操作であり、 groupBy メソッドによって呼び出すことができます。 groupByメソッドは、指定されたフィールドでpartitionByを実行してストリームを再パーティション化し、各パーティション内で、グループフィールドが等しいタプルをグループ化します。 通常、「groupBy」と「persistentAggregate」を使用して、グループ化された集計を取得します。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();

//persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

結合と結合

マージと結合は、それぞれ「マージ」と「結合」メソッドを使用して実行できます。 マージは、1つ以上のストリームを結合します。 結合はマージに似ていますが、結合は2つのストリームをチェックして結合するために両側からトライデントタプルフィールドを使用するという事実を除きます。 さらに、結合はバッチレベルでのみ機能します。 サンプルコードは次のとおりです-

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));

状態維持

トライデントは、状態を維持するメカニズムを提供します。 状態情報はトポロジ自体に保存できます。それ以外の場合は、別のデータベースに保存することもできます。 その理由は、処理中にタプルが失敗した場合、失敗したタプルが再試行されるという状態を維持するためです。 このタプルの状態が以前に更新されたかどうかわからないため、状態の更新中に問題が発生します。 状態を更新する前にタプルが失敗した場合、タプルを再試行すると状態が安定します。 ただし、状態の更新後にタプルが失敗した場合、同じタプルを再試行すると、データベース内のカウントが再び増加し、状態が不安定になります。 メッセージが一度だけ処理されるようにするには、次の手順を実行する必要があります-

  • タプルを小さなバッチで処理します。
  • 各バッチに一意​​のIDを割り当てます。 バッチが再試行される場合、同じ一意のIDが与えられます。
  • 状態の更新はバッチ間で順序付けられます。 たとえば、最初のバッチの状態更新が完了するまで、2番目のバッチの状態更新はできません。

分散RPC

分散RPCは、トライデントトポロジから結果を照会および取得するために使用されます。 Stormには、組み込みの分散RPCサーバーがあります。 分散RPCサーバーは、クライアントからRPC要求を受信し、それをトポロジに渡します。 トポロジは要求を処理し、結果を分散RPCサーバーに送信します。分散RPCサーバーは、分散RPCサーバーによってクライアントにリダイレクトされます。 Tridentの分散RPCクエリは、これらのクエリが並行して実行されるという事実を除いて、通常のRPCクエリと同様に実行されます。

トライデントを使用する場合

多くのユースケースのように、クエリを1回だけ処理することが要件である場合、Tridentでトポロジを記述することでそれを実現できます。 一方、Stormの場合、正確に1回処理することは困難です。 したがって、トライデントは、一度だけ処理する必要があるユースケースに役立ちます。 Tridentは、Stormに複雑さを追加し、状態を管理するため、すべてのユースケース、特に高性能のユースケースに適しているわけではありません。

トライデントの実例

前のセクションで解決した通話ログアナライザーアプリケーションをTridentフレームワークに変換します。 トライデントアプリケーションは、その高レベルAPIのおかげで、単純な嵐に比べて比較的簡単です。 Stormは、基本的にTridentでFunction、Filter、Aggregate、GroupBy、Join、Mergeのいずれかの操作を実行するために必要になります。 最後に、 LocalDRPC クラスを使用してDRPCサーバーを起動し、LocalDRPCクラスの execute メソッドを使用していくつかのキーワードを検索します。

通話情報のフォーマット

FormatCallクラスの目的は、「発信者番号」と「受信者番号」で構成される通話情報をフォーマットすることです。 完全なプログラムコードは次のとおりです-

コーディング:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplitクラスの目的は、「カンマ(、)」に基づいて入力文字列を分割し、文字列内のすべての単語を出力することです。 この関数は、分散クエリの入力引数を解析するために使用されます。 完全なコードは次のとおりです-

コーディング:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

ログアナライザー

これがメインアプリケーションです。 最初に、アプリケーションは FeederBatchSpout を使用してTridentTopologyを初期化し、呼び出し元情報をフィードします。 Trident Topologyクラスの newStream メソッドを使用して、Trident Topologyストリームを作成できます。 同様に、TridentTopologyクラスの newDRCPStream メソッドを使用して、TridentトポロジのDRPCストリームを作成できます。 LocalDRPCクラスを使用して、単純なDRCPサーバーを作成できます。 LocalDRPC には、キーワードを検索するためのexecuteメソッドがあります。 完全なコードは以下の通りです。

コーディング:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

     //DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

アプリケーションの構築と実行

完全なアプリケーションには3つのJavaコードがあります。 彼らは次のとおりです-

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

アプリケーションは、次のコマンドを使用して構築することができます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

アプリケーションは、次のコマンドを使用して実行できます-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

出力

アプリケーションが起動されると、アプリケーションはクラスターの起動プロセス、操作処理、DRPCサーバーとクライアントの情報、そして最後にクラスターのシャットダウンプロセスに関する完全な詳細を出力します。 この出力は、以下に示すようにコンソールに表示されます。

DRPC : Query starts
[[DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[DRPC : Query ends