Apache-kafka-real-time-application
リアルタイムアプリケーション(Twitter)
リアルタイムアプリケーションを分析して、最新のtwitterフィードとそのハッシュタグを取得しましょう。 以前、StormおよびSparkとKafkaの統合を見てきました。 どちらのシナリオでも、Kafkaエコシステムにメッセージを送信するために(cliを使用して)Kafkaプロデューサーを作成しました。 次に、ストームとスパークの統合は、Kafkaコンシューマーを使用してメッセージを読み取り、ストームとスパークのエコシステムにそれぞれ注入します。 したがって、実際には、カフカプロデューサーを作成する必要があります。
- 「Twitter Streaming API」を使用してtwitterフィードを読んでください。
- フィードを処理し、
- HashTagsを抽出し、
- カフカに送ってください。
Kafkaが「ハッシュタグ」を受信すると、Storm/Spark統合が情報を受信し、Storm/Sparkエコシステムに送信します。
Twitter Streaming API
「Twitter Streaming API」には、任意のプログラミング言語でアクセスできます。 「twitter4j」はオープンソースの非公式Javaライブラリであり、「Twitter Streaming API」に簡単にアクセスするためのJavaベースのモジュールを提供します。 「twitter4j」は、ツイートにアクセスするためのリスナーベースのフレームワークを提供します。 「Twitter Streaming API」にアクセスするには、Twitter開発者アカウントにサインインする必要があり、次の OAuth 認証の詳細を取得する必要があります。
- 顧客キー
- CustomerSecret
- アクセストークン
- AccessTookenSecret
開発者アカウントが作成されたら、「twitter4j」jarファイルをダウンロードして、javaクラスパスに配置します。
完全なTwitter Kafkaプロデューサーコーディング(KafkaTwitterProducer.java)を以下に示します-
編集
次のコマンドを使用してアプリケーションをコンパイルします-
実行
2つのコンソールを開きます。 上記のコンパイル済みアプリケーションを、以下に示すように1つのコンソールで実行します。
前の章で説明したSpark/Stormアプリケーションのいずれかを別のwin-dowで実行します。 注意すべき主な点は、どちらの場合も同じトピックを使用する必要があるということです。 ここでは、トピック名として「my-first-topic」を使用しています。
出力
このアプリケーションの出力は、キーワードとTwitterの現在のフィードに依存します。 サンプル出力を以下に指定します(ストーム統合)。