Apache-storm-trident
Apache Storm-トライデント
TridentはStormの拡張です。 Stormと同様、TridentもTwitterで開発されました。 Tridentの開発の主な理由は、ステートフルストリーム処理と低遅延分散クエリとともに、Stormの上に高レベルの抽象化を提供することです。
Tridentはスパウトとボルトを使用しますが、これらの低レベルコンポーネントは実行前にTridentによって自動生成されます。 Tridentには、機能、フィルター、結合、グループ化、および集約があります。
Tridentは、トランザクションと呼ばれる一連のバッチとしてストリームを処理します。 一般的に、これらの小さなバッチのサイズは、入力ストリームに応じて、数千または数百万のタプルのオーダーになります。 このように、トライデントは、タプルごとの処理を実行するStormとは異なります。
バッチ処理の概念は、データベーストランザクションに非常に似ています。 すべてのトランザクションには、トランザクションIDが割り当てられます。 すべての処理が完了すると、トランザクションは成功したと見なされます。 ただし、トランザクションのタプルの1つを処理できないと、トランザクション全体が再送信されます。 Tridentは、バッチごとに、トランザクションの開始時にbeginCommitを呼び出し、トランザクションの終了時にコミットします。
トライデントトポロジ
Trident APIは、「TridentTopology」クラスを使用してTridentトポロジを作成する簡単なオプションを公開します。 基本的に、トライデントトポロジは、スパウトから入力ストリームを受信し、ストリームに対して順序付けられた操作シーケンス(フィルター、集約、グループ化など)を実行します。 Storm TupleはTrident Tupleに置き換えられ、Boltsは操作に置き換えられます。 シンプルなトライデントトポロジは、次のように作成できます-
トライデントタプル
トライデントタプルは、値の名前付きリストです。 TridentTupleインターフェイスは、Tridentトポロジのデータモデルです。 TridentTupleインターフェイスは、Tridentトポロジで処理できるデータの基本単位です。
トライデントスパウト
トライデントスパウトはストームスパウトに似ていますが、トライデントの機能を使用するための追加オプションがあります。 実際には、Stormトポロジで使用したIRichSpoutを引き続き使用できますが、本質的に非トランザクションであり、Tridentが提供する利点を使用することはできません。
Tridentの機能を使用するためのすべての機能を備えた基本的なスパウトは、「ITridentSpout」です。 トランザクションのセマンティクスと不透明なトランザクションのセマンティクスの両方をサポートします。 他のスパウトは、IBatchSpout、IPartitionedTridentSpout、およびIOpaquePartitionedTridentSpoutです。
これらの汎用スパウトに加えて、トライデントにはトライデントスパウトの多くのサンプル実装があります。 それらの1つはFeederBatchSpoutスパウトです。これを使用して、バッチ処理や並列処理などを心配することなく、トライデントタプルの名前付きリストを簡単に送信できます。
以下に示すようにFeederBatchSpoutの作成とデータフィードを行うことができます-
トライデント操作
トライデントは、「トライデント操作」に依存して、トライデントタプルの入力ストリームを処理します。 Trident APIには、単純から複雑なストリーム処理を処理するための多数の組み込み操作があります。 これらの操作は、単純な検証から複雑なグループ化とトライデントタプルの集約にまで及びます。 最も重要で頻繁に使用される操作について説明します。
フィルタ
フィルターは、入力検証のタスクを実行するために使用されるオブジェクトです。 Tridentフィルターは、トライデントタプルフィールドのサブセットを入力として取得し、特定の条件が満たされているかどうかに応じてtrueまたはfalseを返します。 trueが返された場合、タプルは出力ストリームに保持されます。それ以外の場合、タプルはストリームから削除されます。 フィルタは基本的に BaseFilter クラスを継承し、 isKeep メソッドを実装します。 ここにフィルター操作のサンプル実装があります-
フィルター関数は、「各」メソッドを使用してトポロジーで呼び出すことができます。 「フィールド」クラスを使用して、入力(トライデントタプルのサブセット)を指定できます。 サンプルコードは次のとおりです-
関数
- 関数*は、単一のトライデントタプルで簡単な操作を実行するために使用されるオブジェクトです。 これは、トライデントタプルフィールドのサブセットを取り、0個以上の新しいトライデントタプルフィールドを生成します。
Filter操作と同様に、Function操作は each メソッドを使用してトポロジで呼び出すことができます。 サンプルコードは次のとおりです-
集約
集約は、入力バッチ、パーティション、またはストリームで集約操作を実行するために使用されるオブジェクトです。 トライデントには3つのタイプの集約があります。 彼らは次のとおりです-
- aggregate -トライデントタプルの各バッチを分離して集約します。 集約プロセス中、最初にグローバルグループ化を使用してタプルが再パーティション化され、同じバッチのすべてのパーティションが単一のパーティションに結合されます。
- partitionAggregate -トライデントタプルのバッチ全体ではなく、各パーティションを集約します。 パーティション集合体の出力は、入力タプルを完全に置き換えます。 パーティション集合の出力には、単一のフィールドタプルが含まれます。
- persistentaggregate -すべてのバッチにわたるすべてのトライデントタプルで集計し、結果をメモリまたはデータベースに保存します。
集約操作は、CombinerAggregator、ReducerAggregator、または汎用Aggregatorインターフェイスのいずれかを使用して作成できます。 上記の例で使用されている「カウント」アグリゲーターは、組み込みアグリゲーターの1つです。 「CombinerAggregator」を使用して実装されます。 実装は次のとおりです-
グルーピング
グループ化操作は組み込みの操作であり、 groupBy メソッドによって呼び出すことができます。 groupByメソッドは、指定されたフィールドでpartitionByを実行してストリームを再パーティション化し、各パーティション内で、グループフィールドが等しいタプルをグループ化します。 通常、「groupBy」と「persistentAggregate」を使用して、グループ化された集計を取得します。 サンプルコードは次のとおりです-
結合と結合
マージと結合は、それぞれ「マージ」と「結合」メソッドを使用して実行できます。 マージは、1つ以上のストリームを結合します。 結合はマージに似ていますが、結合は2つのストリームをチェックして結合するために両側からトライデントタプルフィールドを使用するという事実を除きます。 さらに、結合はバッチレベルでのみ機能します。 サンプルコードは次のとおりです-
状態維持
トライデントは、状態を維持するメカニズムを提供します。 状態情報はトポロジ自体に保存できます。それ以外の場合は、別のデータベースに保存することもできます。 その理由は、処理中にタプルが失敗した場合、失敗したタプルが再試行されるという状態を維持するためです。 このタプルの状態が以前に更新されたかどうかわからないため、状態の更新中に問題が発生します。 状態を更新する前にタプルが失敗した場合、タプルを再試行すると状態が安定します。 ただし、状態の更新後にタプルが失敗した場合、同じタプルを再試行すると、データベース内のカウントが再び増加し、状態が不安定になります。 メッセージが一度だけ処理されるようにするには、次の手順を実行する必要があります-
- タプルを小さなバッチで処理します。
- 各バッチに一意のIDを割り当てます。 バッチが再試行される場合、同じ一意のIDが与えられます。
- 状態の更新はバッチ間で順序付けられます。 たとえば、最初のバッチの状態更新が完了するまで、2番目のバッチの状態更新はできません。
分散RPC
分散RPCは、トライデントトポロジから結果を照会および取得するために使用されます。 Stormには、組み込みの分散RPCサーバーがあります。 分散RPCサーバーは、クライアントからRPC要求を受信し、それをトポロジに渡します。 トポロジは要求を処理し、結果を分散RPCサーバーに送信します。分散RPCサーバーは、分散RPCサーバーによってクライアントにリダイレクトされます。 Tridentの分散RPCクエリは、これらのクエリが並行して実行されるという事実を除いて、通常のRPCクエリと同様に実行されます。
トライデントを使用する場合
多くのユースケースのように、クエリを1回だけ処理することが要件である場合、Tridentでトポロジを記述することでそれを実現できます。 一方、Stormの場合、正確に1回処理することは困難です。 したがって、トライデントは、一度だけ処理する必要があるユースケースに役立ちます。 Tridentは、Stormに複雑さを追加し、状態を管理するため、すべてのユースケース、特に高性能のユースケースに適しているわけではありません。
トライデントの実例
前のセクションで解決した通話ログアナライザーアプリケーションをTridentフレームワークに変換します。 トライデントアプリケーションは、その高レベルAPIのおかげで、単純な嵐に比べて比較的簡単です。 Stormは、基本的にTridentでFunction、Filter、Aggregate、GroupBy、Join、Mergeのいずれかの操作を実行するために必要になります。 最後に、 LocalDRPC クラスを使用してDRPCサーバーを起動し、LocalDRPCクラスの execute メソッドを使用していくつかのキーワードを検索します。
通話情報のフォーマット
FormatCallクラスの目的は、「発信者番号」と「受信者番号」で構成される通話情報をフォーマットすることです。 完全なプログラムコードは次のとおりです-
コーディング:FormatCall.java
CSVSplit
CSVSplitクラスの目的は、「カンマ(、)」に基づいて入力文字列を分割し、文字列内のすべての単語を出力することです。 この関数は、分散クエリの入力引数を解析するために使用されます。 完全なコードは次のとおりです-
コーディング:CSVSplit.java
ログアナライザー
これがメインアプリケーションです。 最初に、アプリケーションは FeederBatchSpout を使用してTridentTopologyを初期化し、呼び出し元情報をフィードします。 Trident Topologyクラスの newStream メソッドを使用して、Trident Topologyストリームを作成できます。 同様に、TridentTopologyクラスの newDRCPStream メソッドを使用して、TridentトポロジのDRPCストリームを作成できます。 LocalDRPCクラスを使用して、単純なDRCPサーバーを作成できます。 LocalDRPC には、キーワードを検索するためのexecuteメソッドがあります。 完全なコードは以下の通りです。
コーディング:LogAnalyserTrident.java
アプリケーションの構築と実行
完全なアプリケーションには3つのJavaコードがあります。 彼らは次のとおりです-
- FormatCall.java
- CSVSplit.java
- LogAnalyerTrident.java
アプリケーションは、次のコマンドを使用して構築することができます-
アプリケーションは、次のコマンドを使用して実行できます-
出力
アプリケーションが起動されると、アプリケーションはクラスターの起動プロセス、操作処理、DRPCサーバーとクライアントの情報、そして最後にクラスターのシャットダウンプロセスに関する完全な詳細を出力します。 この出力は、以下に示すようにコンソールに表示されます。