Apache-flume-quick-guide
Apache Flume-はじめに
Flumeとは何ですか?
Apache Flumeは、ログファイル、イベントなどの大量のストリーミングデータを収集し、さまざまなソースから中央のデータストアに転送するためのツール/サービス/データ取り込みメカニズムです。
Flumeは、信頼性が高く、分散された、構成可能なツールです。 主に、さまざまなWebサーバーからHDFSにストリーミングデータ(ログデータ)をコピーするように設計されています。
Flumeのアプリケーション
eコマースWebアプリケーションが特定の地域の顧客行動を分析したいとします。 そのためには、利用可能なログデータを分析のためにHadoopに移動する必要があります。 ここで、Apache Flumeが助けになります。
Flumeは、アプリケーションサーバーによって生成されたログデータをより高速でHDFSに移動するために使用されます。
Flumeの利点
Flumeを使用する利点は次のとおりです-
- Apache Flumeを使用して、データを任意の中央ストア(HBase、HDFS)に保存できます。
- 着信データのレートがデータを宛先に書き込むことができるレートを超えると、Flumeはデータプロデューサーと中央ストア間のメディエーターとして機能し、ストア間でデータの安定したフローを提供します。
- Flumeは、*コンテキストルーティング*の機能を提供します。
- Flumeのトランザクションはチャネルベースであり、メッセージごとに2つのトランザクション(1つの送信者と1つの受信者)が維持されます。 信頼できるメッセージ配信を保証します。
- Flumeは、信頼性、耐障害性、拡張性、管理性、カスタマイズ性に優れています。
Flumeの機能
Flumeの注目すべき機能のいくつかは次のとおりです-
- Flumeは、複数のWebサーバーからログデータを中央ストア(HDFS、HBase)に効率的に取り込みます。
- Flumeを使用すると、複数のサーバーからHadoopにデータをすぐに取得できます。
- ログファイルに加えて、Flumeは、FacebookやTwitterなどのソーシャルネットワーキングサイトや、AmazonやFlipkartなどのeコマースWebサイトで生成された大量のイベントデータをインポートするためにも使用されます。
- Flumeは、多数のソースと宛先のタイプをサポートしています。
- Flumeは、マルチホップフロー、ファンインファンアウトフロー、コンテキストルーティングなどをサポートしています。
- Flumeは水平方向に拡大縮小できます。
Apache Flume-Hadoopでのデータ転送
ビッグデータ*は、私たちが知っているように、従来のコンピューティング技術では処理できない大きなデータセットのコレクションです。 ビッグデータを分析すると、貴重な結果が得られます。 *Hadoop は、シンプルなプログラミングモデルを使用してコンピューターのクラスター間で分散環境にビッグデータを格納および処理できるオープンソースフレームワークです。
ストリーミング/ログデータ
一般に、分析されるデータのほとんどは、アプリケーションサーバー、ソーシャルネットワーキングサイト、クラウドサーバー、エンタープライズサーバーなどのさまざまなデータソースによって生成されます。 このデータは、*ログファイル*および*イベント*の形式になります。
ログファイル-一般に、ログファイルはオペレーティングシステムで発生するイベント/アクションをリストする*ファイル*です。 たとえば、Webサーバーは、ログファイルにサーバーに対して行われたすべての要求をリストします。
そのようなログデータを収集すると、次の情報を取得できます-
- アプリケーションのパフォーマンスと、さまざまなソフトウェアおよびハードウェアの障害の特定。
- ユーザーの行動を把握し、より良いビジネス洞察を引き出します。
データをHDFSシステムに転送する従来の方法は、 put コマンドを使用することです。 put コマンドの使用方法を見てみましょう。
HDFS putコマンド
ログデータの処理における主な課題は、複数のサーバーで生成されたこれらのログをHadoop環境に移動することです。
Hadoop File System Shell は、Hadoopにデータを挿入し、そこからデータを読み取るコマンドを提供します。 以下に示すように、 put コマンドを使用して、Hadoopにデータを挿入できます。
$ Hadoop fs –put/path of the required file /path in HDFS where to save the file
putコマンドの問題
Hadoopの put コマンドを使用して、これらのソースからHDFSにデータを転送できます。 しかし、それは次の欠点に苦しんでいます-
- put コマンドを使用すると、一度に1つのファイルのみを転送できますが、データジェネレーターははるかに高いレートでデータを生成します。 古いデータで行われた分析の精度は低いため、リアルタイムでデータを転送するソリューションが必要です。
- put コマンドを使用する場合、データをパッケージ化する必要があり、アップロードの準備ができている必要があります。 Webサーバーは継続的にデータを生成するため、非常に難しいタスクです。
ここで必要なのは、 put コマンドの欠点を克服し、「ストリーミングデータ」をデータジェネレーターから集中ストア(特にHDFS)に少ない遅延で転送できるソリューションです。
HDFSの問題
HDFSでは、ファイルはディレクトリエントリとして存在し、ファイルが閉じられるまでファイルの長さはゼロと見なされます。 たとえば、ソースがHDFSにデータを書き込んでおり、ネットワークが操作の途中で(ファイルを閉じずに)中断された場合、ファイルに書き込まれたデータは失われます。
したがって、ログデータをHDFSに転送するには、信頼性が高く、構成可能で、保守可能なシステムが必要です。
注意-POSIXファイルシステムでは、ファイルにアクセスする(書き込み操作を実行するなど)ときはいつでも、他のプログラムはこのファイル(少なくともファイルの保存された部分)を読み取ることができます。 これは、ファイルが閉じる前にディスク上に存在するためです。
利用可能なソリューション
ストリーミングデータ(ログファイル、イベントなど)をさまざまなソースからHDFSに送信するには、次のツールを自由に使用できます-
Facebookのスクライブ
Scribeは、ログデータの集約とストリーミングに使用される非常に人気のあるツールです。 非常に多数のノードに対応し、ネットワークおよびノードの障害に強いように設計されています。
アパッチカフカ
Kafkaは、Apache Software Foundationによって開発されました。 オープンソースのメッセージブローカーです。 Kafkaを使用すると、フィードを高スループットで低遅延で処理できます。
Apache Flume
Apache Flumeは、ログデータ、イベントなどの大量のストリーミングデータを収集し、さまざまなWebserveから中央のデータストアに転送するためのツール/サービス/データ取り込みメカニズムです。
これは、主にさまざまなソースからHDFSにストリーミングデータを転送するように設計された、信頼性の高い分散型の構成可能なツールです。
このチュートリアルでは、Flumeの使用方法をいくつかの例とともに詳しく説明します。
Apache Flume-アーキテクチャ
次の図は、Flumeの基本アーキテクチャを示しています。 図に示すように、データジェネレータ(Facebook、Twitterなど)は、それらで実行されている個々のFlume エージェント*によって収集されるデータを生成します。 その後、*データコレクター(エージェントでもあります)がエージェントからデータを収集し、集約され、HDFSやHBaseなどの中央ストアにプッシュされます。
水路イベント
イベント*は *Flume 内で転送されるデータの基本単位です。 これには、オプションのヘッダーを伴ってソースから宛先に転送されるバイト配列のペイロードが含まれています。 典型的なFlumeイベントは次の構造を持つでしょう-
Flumeエージェント
エージェント*は、Flumeの独立したデーモンプロセス(JVM)です。 クライアントまたは他のエージェントからデータ(イベント)を受信し、次の宛先(シンクまたはエージェント)に転送します。 Flumeには複数のエージェントが含まれる場合があります。 次の図は *Flume Agent を表しています
図に示すように、Flume Agentには、 source 、 channel 、および sink という3つの主要コンポーネントが含まれています。
ソース
- ソース*は、データジェネレーターからデータを受信し、Flumeイベントの形式で1つ以上のチャネルに転送するエージェントのコンポーネントです。
Apache Flumeはいくつかのタイプのソースをサポートし、各ソースは指定されたデータジェネレーターからイベントを受け取ります。
例-Avroソース、Thriftソース、twitter 1%ソースなど
チャネル
- チャネル*は、ソースからイベントを受信し、シンクによって消費されるまでそれらをバッファリングする一時ストアです。 ソースとシンクの間のブリッジとして機能します。
これらのチャネルは完全にトランザクショナルであり、任意の数のソースおよびシンクと連携できます。
例-JDBCチャネル、ファイルシステムチャネル、メモリチャネルなど
Sink
- シンク*は、HBaseやHDFSなどの中央ストアにデータを保存します。 チャネルからのデータ(イベント)を消費し、宛先に配信します。 シンクの宛先は、別のエージェントまたは中央ストアである可能性があります。
例-HDFSシンク
注-flumeエージェントは、複数のソース、シンク、およびチャネルを持つことができます。 このチュートリアルのFlume構成の章に、サポートされているすべてのソース、シンク、チャネルをリストしました。
Flumeエージェントの追加コンポーネント
上記で説明したのは、エージェントの基本的なコンポーネントです。 これに加えて、イベントをデータジェネレーターから中央ストアに転送する際に重要な役割を果たすコンポーネントがいくつかあります。
インターセプター
インターセプターは、ソースとチャネル間で転送される水路イベントを変更/検査するために使用されます。
チャンネルセレクター
これらは、複数のチャネルの場合にデータを転送するために選択されるチャネルを決定するために使用されます。 チャネルセレクタには2種類あります-
- デフォルトチャネルセレクタ-これらは、各チャネルのすべてのイベントを複製するチャネルセレクタの複製とも呼ばれます。
- 多重化チャネルセレクタ-これらは、イベントのヘッダー内のアドレスに基づいてイベントを送信するチャネルを決定します。
シンクプロセッサー
これらは、選択したシンクグループから特定のシンクを呼び出すために使用されます。 これらは、シンクのフェールオーバーパスを作成したり、チャネルから複数のシンクにわたってイベントをロードバランスしたりするために使用されます。
Apache Flume-データフロー
Flumeは、ログデータをHDFSに移動するために使用されるフレームワークです。 通常、イベントとログデータはログサーバーによって生成され、これらのサーバーにはFlumeエージェントが実行されています。 これらのエージェントは、データジェネレータからデータを受け取ります。
これらのエージェントのデータは、*コレクター*として知られる中間ノードによって収集されます。 エージェントと同様に、Flumeには複数のコレクターが存在する場合があります。
最後に、これらのすべてのコレクターからのデータが集約され、HBaseやHDFSなどの中央ストアにプッシュされます。 次の図は、Flumeのデータフローを説明しています。
マルチホップフロー
Flumeでは、複数のエージェントが存在する可能性があり、最終目的地に到達する前に、イベントが複数のエージェントを経由する場合があります。 これは*マルチホップフロー*と呼ばれます。
ファンアウトフロー
1つのソースから複数のチャネルへのデータフローは*ファンアウトフロー*と呼ばれます。 それは2つのタイプです-
- 複製-データが設定されたすべてのチャネルで複製されるデータフロー。
- Multiplexing -イベントのヘッダーに記載されている選択されたチャネルにデータが送信されるデータフロー。
ファンインフロー
データが多くのソースから1つのチャネルに転送されるデータフローは、「ファンインフロー」と呼ばれます。
障害処理
Flumeでは、イベントごとに、送信者と受信者の2つのトランザクションが発生します。 送信者はイベントを受信者に送信します。 データを受信するとすぐに、受信者は自身のトランザクションをコミットし、「受信した」信号を送信者に送信します。 シグナルを受信した後、送信者はトランザクションをコミットします。 (送信者は、受信者から信号を受信するまでトランザクションをコミットしません。)
Apache Flume-環境
前の章でFlumeのアーキテクチャについてはすでに説明しました。 この章では、Apache Flumeをダウンロードしてセットアップする方法を見てみましょう。
さらに先に進む前に、システムにJava環境が必要です。 まず最初に、システムにJavaがインストールされていることを確認してください。 このチュートリアルのいくつかの例では、Hadoop HDFS(シンクとして)を使用しました。 したがって、JavaとともにHadoopをインストールすることをお勧めします。 詳細情報を収集するには、リンクhttp://www.finddevguides.com/hadoop/hadoop_enviornment_setupにアクセスしてください。
Flumeのインストール
まず、Webサイトhttps://flume.apache.org/からApache Flumeソフトウェアの最新バージョンをダウンロードします。
ステップ1
ウェブサイトを開きます。 ホームページの左側にある[ダウンロード]リンクをクリックします。 Apache Flumeのダウンロードページに移動します。
ステップ2
ミラーのリストにリダイレクトされ、これらのミラーのいずれかをクリックしてダウンロードを開始できます。 同様に、http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-src.tarをクリックして、Apache Flumeのソースコードをダウンロードできます。 .gz [apache-flume-1.6.0-src.tar.gz]。
ステップ3
以下に示すように、 Hadoop 、 HBase 、およびその他のソフトウェアのインストールディレクトリ(インストール済みの場合)と同じディレクトリにFlumeという名前のディレクトリを作成します。
$ mkdir Flume
ステップ4
以下に示すように、ダウンロードしたtarファイルを解凍します。
$ cd Downloads/
$ tar zxvf apache-flume-1.6.0-bin.tar.gz
$ tar zxvf apache-flume-1.6.0-src.tar.gz
ステップ5
以下に示すように、apache- flume-1.6.0-bin.tar ファイルのコンテンツを先ほど作成した Flume ディレクトリに移動します。 (Hadoopという名前のローカルユーザーにFlumeディレクトリを作成したと仮定します。)
$ mv apache-flume-1.6.0-bin.tar/*/home/Hadoop/Flume/
Flumeの構成
Flumeを構成するには、3つのファイル、つまり* flume-env.sh、flumeconf.properties、、および *bash.rc を変更する必要があります。
パス/クラスパスの設定
*.bashrc* ファイルで、以下に示すように、Flumeのホームフォルダー、パス、およびクラスパスを設定します。
confフォルダー
Apache Flumeの conf フォルダーを開くと、次の4つのファイルがあります-
- flume-conf.properties.template、
- flume-env.sh.template、
- flume-env.ps1.template、および
- log4j.properties。
名前を変更
- flume-conf.properties としての flume-conf.properties.template ファイル
- flume-env.sh.template as flume-env.sh
flume-env.sh
*flume-env.sh* ファイルを開き、 *JAVA_Home* をシステムでJavaがインストールされたフォルダーに設定します。
インストールの検証
*bin* フォルダーを参照し、次のコマンドを入力して、Apache Flumeのインストールを確認します。
$ ./flume-ng
Flumeを正常にインストールすると、以下に示すようにFlumeのヘルププロンプトが表示されます。
Apache Flume-構成
Flumeをインストールしたら、* key-valueペア*を持つJavaプロパティファイルである構成ファイルを使用して構成する必要があります。 ファイルのキーに値を渡す必要があります。
Flume構成ファイルでは、次のことが必要です-
- 現在のエージェントのコンポーネントに名前を付けます。
- ソースを説明/構成します。
- シンクを説明/構成します。
- チャネルを説明/構成します。
- ソースとシンクをチャネルにバインドします。
通常、Flumeには複数のエージェントを配置できます。 一意の名前を使用して、各エージェントを区別できます。 そして、この名前を使用して、各エージェントを構成する必要があります。
コンポーネントの命名
まず最初に、以下に示すように、エージェントのソース、シンク、チャネルなどのコンポーネントに名前を付けてリストする必要があります。
agent_name.sources = source_name
agent_name.sinks = sink_name
agent_name.channels = channel_name
Flumeは、さまざまなソース、シンク、およびチャネルをサポートしています。 それらは以下の表にリストされています。
Sources | Channels | Sinks |
---|---|---|
|
|
|
どれでも使用できます。 たとえば、Twitterソースを使用してメモリチャネルを介してTwitterデータをHDFSシンクに転送し、エージェント名idが TwitterAgent の場合、
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
エージェントのコンポーネントを一覧表示した後、プロパティに値を指定して、ソース、シンク、およびチャネルを説明する必要があります。
ソースの説明
各ソースには、プロパティの個別のリストがあります。 「type」という名前のプロパティはすべてのソースに共通であり、使用しているソースのタイプを指定するために使用されます。
以下に示すように、プロパティ「type」とともに、特定のソースのすべての*必須*プロパティの値を提供して設定する必要があります。
agent_name.sources. source_name.type = value
agent_name.sources. source_name.property2 = value
agent_name.sources. source_name.property3 = value
たとえば、* twitterソース*を検討する場合、設定する値を提供する必要があるプロパティは次のとおりです。
TwitterAgent.sources.Twitter.type = Twitter (type name)
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
シンクの説明
ソースと同様に、各シンクにはプロパティの個別のリストがあります。 「type」という名前のプロパティはすべてのシンクに共通であり、使用しているシンクのタイプを指定するために使用されます。 以下に示すように、プロパティ "type"と共に、特定のシンクのすべての*必須*プロパティに値を提供して構成する必要があります。
agent_name.sinks. sink_name.type = value
agent_name.sinks. sink_name.property2 = value
agent_name.sinks. sink_name.property3 = value
たとえば、* HDFSシンク*を検討する場合、設定する値を提供する必要があるプロパティは次のとおりです。
TwitterAgent.sinks.HDFS.type = hdfs (type name)
TwitterAgent.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data
チャンネルの説明
Flumeは、ソースとシンク間でデータを転送するためのさまざまなチャネルを提供します。 したがって、ソースおよびチャネルとともに、エージェントで使用されるチャネルを記述する必要があります。
各チャネルを説明するには、以下に示すように、必要なプロパティを設定する必要があります。
agent_name.channels.channel_name.type = value
agent_name.channels.channel_name. property2 = value
agent_name.channels.channel_name. property3 = value
たとえば、*メモリチャネル*を考慮する場合、設定する値を提供する必要があるプロパティは次のとおりです。
TwitterAgent.channels.MemChannel.type = memory (type name)
ソースとシンクをチャネルにバインドする
チャネルはソースとシンクを接続するため、以下に示すように、両方をチャネルにバインドする必要があります。
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channels = channel_name
次の例は、ソースとシンクをチャネルにバインドする方法を示しています。 ここでは、* twitterソース、メモリチャネル*、および* HDFSシンク*を検討します。
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channels = MemChannel
Flumeエージェントの開始
構成後、Flumeエージェントを開始する必要があります。 それは次のように行われます-
$ bin/flume-ng agent --conf ./conf/-f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent
ここで-
- agent -Flumeエージェントを開始するコマンド
- *-conf、-c <conf> *-confディレクトリの構成ファイルを使用
- -f <file> -欠落している場合、構成ファイルのパスを指定します
- *-name、-n <name> *-twitterエージェントの名前
- -D property = value -Javaシステムプロパティ値を設定します。
Apache Flume-Twitterデータの取得
Flumeを使用して、さまざまなサービスからデータを取得し、それを中央ストア(HDFSおよびHBase)に転送できます。 この章では、Apache Flumeを使用してTwitterサービスからデータを取得し、HDFSに保存する方法について説明します。
Flumeアーキテクチャで説明したように、Webサーバーはログデータを生成し、このデータはFlumeのエージェントによって収集されます。 チャネルはこのデータをシンクにバッファリングし、シンクは最終的に中央ストアにプッシュします。
この章で提供する例では、アプリケーションを作成し、Apache Flumeが提供する実験的なtwitterソースを使用してツイートを取得します。 メモリチャネルを使用してこれらのツイートをバッファリングし、HDFSシンクを使用してこれらのツイートをHDFSにプッシュします。
Twitterデータを取得するには、以下の手順に従う必要があります-
- twitterアプリケーションを作成する
- HDFSのインストール/開始
- Flumeを構成する
Twitterアプリケーションの作成
Twitterからツイートを取得するには、Twitterアプリケーションを作成する必要があります。 以下の手順に従って、Twitterアプリケーションを作成します。
ステップ1
Twitterアプリケーションを作成するには、次のリンクhttps://apps.twitter.com/をクリックします。 Twitterアカウントにサインインします。 Twitterアプリケーションを作成、削除、管理できるTwitterアプリケーション管理ウィンドウが表示されます。
ステップ2
*Create New App* ボタンをクリックします。 ウィンドウにリダイレクトされ、アプリケーションを作成するために詳細を入力する必要があるアプリケーションフォームが表示されます。 Webサイトのアドレスを入力するときに、完全なURLパターン(たとえば、http://example.com/[http://example.com。])を指定します
ステップ3
詳細を入力し、終了したら Developer Agreement に同意し、ページの下部にある* Create your Twitter applicationボタン*をクリックします。 すべてがうまくいくと、以下に示すように、指定された詳細でアプリが作成されます。
ステップ4
ページの下部にある keys and Access Tokens タブで、 Create my access token という名前のボタンを確認できます。 それをクリックして、アクセストークンを生成します。
ステップ5
最後に、ページの右上にある[OAuthのテスト]ボタンをクリックします。 これにより、コンシューマキー、コンシューマシークレット、アクセストークン、および*アクセストークンシークレット*を表示するページが表示されます。 これらの詳細をコピーします。 これらは、Flumeでエージェントを構成するのに役立ちます。
HDFSの開始
HDFSにデータを保存しているため、Hadoopをインストール/検証する必要があります。 Hadoopを起動し、Flumeデータを保存するフォルダーを作成します。 Flumeを設定する前に、以下の手順に従ってください。
ステップ1:Hadoopのインストール/検証
Hadoopをインストールします。 Hadoopがシステムにすでにインストールされている場合、以下に示すように、Hadoop versionコマンドを使用してインストールを確認します。
$ hadoop version
システムにHadoopが含まれていて、パス変数を設定している場合、次の出力が得られます-
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using/home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar
ステップ2:Hadoopを開始する
Hadoopの sbin ディレクトリを参照し、以下に示すようにyarnとHadoop dfs(分散ファイルシステム)を起動します。
cd/$Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
/home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
/home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
/home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
/home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
/home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out
ステップ3:HDFSでディレクトリを作成する
Hadoop DFSでは、 mkdir コマンドを使用してディレクトリを作成できます。 それを参照し、以下に示すように、必要なパスに twitter_data という名前のディレクトリを作成します。
$cd/$Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data
Flumeの構成
*conf* フォルダー内の構成ファイルを使用して、ソース、チャネル、およびシンクを構成する必要があります。 この章の例では、Apache Flumeが提供する *Twitter 1%Firehose* メモリチャネルとHDFSシンクという実験ソースを使用します。
Twitter 1%Firehoseソース
このソースは非常に実験的です。 ストリーミングAPIを使用して1%のサンプルTwitter Firehoseに接続し、継続的にツイートをダウンロードし、Avro形式に変換して、AvroイベントをダウンストリームFlumeシンクに送信します。
Flumeのインストールとともに、デフォルトでこのソースを取得します。 このソースに対応する jar ファイルは、以下に示すように lib フォルダーにあります。
クラスパスを設定する
以下に示すように、 classpath 変数を Flume-env.sh ファイルでFlumeの lib フォルダーに設定します。
export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*
このソースには、Twitterアプリケーションの Consumer key、Consumer secret、Access token 、 Access token secret などの詳細が必要です。 このソースを設定している間、次のプロパティに値を提供する必要があります-
- チャンネル
- *ソースタイプ:org.apache.flume.source.twitter.TwitterSource *
- consumerKey -OAuthコンシューマキー
- consumerSecret -OAuthコンシューマシークレット
- accessToken -OAuthアクセストークン
- accessTokenSecret -OAuthトークンシークレット
- maxBatchSize -twitterバッチに含まれるtwitterメッセージの最大数。 デフォルト値は1000(オプション)です。
- maxBatchDurationMillis -バッチを閉じる前に待機する最大ミリ秒数。 デフォルト値は1000(オプション)です。
チャネル
メモリチャネルを使用しています。 メモリチャネルを設定するには、チャネルのタイプに値を提供する必要があります。
- type -チャネルのタイプを保持します。 この例では、タイプは MemChannel です。
- 容量-チャネルに保存されるイベントの最大数です。 デフォルト値は100(オプション)です。
- TransactionCapacity -チャネルが受け入れるまたは送信するイベントの最大数です。 デフォルト値は100(オプション)です。
HDFSシンク
このシンクはデータをHDFSに書き込みます。 このシンクを設定するには、次の詳細を提供する必要があります。
- チャネル
- タイプ-hdfs
- hdfs.path -データが保存されるHDFSのディレクトリのパス。
そして、シナリオに基づいていくつかのオプションの値を提供できます。 以下に示すのは、アプリケーションで構成しているHDFSシンクのオプションのプロパティです。
- fileType -これは、HDFSファイルに必要なファイル形式です。 SequenceFile、DataStream 、および CompressedStream は、このストリームで使用できる3つのタイプです。 この例では、 DataStream を使用しています。
- writeFormat -テキストまたは書き込み可能。
- batchSize -HDFSにフラッシュされる前にファイルに書き込まれるイベントの数です。 デフォルト値は100です。
- rollsize -ロールをトリガーするファイルサイズです。 デフォルト値は100です。
- rollCount -ロールされる前にファイルに書き込まれたイベントの数です。 デフォルト値は10です。
例–構成ファイル
以下に、構成ファイルの例を示します。 このコンテンツをコピーして、Flumeのconfフォルダーに twitter.conf として保存します。
# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
実行
Flumeホームディレクトリを参照し、以下に示すようにアプリケーションを実行します。
$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/-f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent
すべてがうまくいけば、HDFSへのツイートのストリーミングが開始されます。 以下は、ツイートを取得する際のコマンドプロンプトウィンドウのスナップショットです。
HDFSの検証
以下に示すURLを使用して、Hadoop管理Web UIにアクセスできます。
http://localhost:50070/
ページの右側にある Utilities という名前のドロップダウンをクリックします。 以下のスナップショットに示すように、2つのオプションを見ることができます。
[ファイルシステムの参照]をクリックし、ツイートを保存したHDFSディレクトリのパスを入力します。 この例では、パスは */user/Hadoop/twitter_data/ になります。 次に、以下に示すように、HDFSに保存されているtwitterログファイルのリストを確認できます。
Apache Flume-シーケンスジェネレーターソース
前の章で、twitterソースからHDFSにデータを取得する方法を見てきました。 この章では、 Sequence generator からデータを取得する方法について説明します。
前提条件
この章で提供される例を実行するには、 HDFS と Flume をインストールする必要があります。 したがって、先に進む前に、Hadoopのインストールを確認し、HDFSを開始してください。 (HDFSの起動方法については、前の章を参照してください)。
Flumeの構成
*conf* フォルダー内の構成ファイルを使用して、ソース、チャネル、およびシンクを構成する必要があります。 この章の例では、 *sequence generator source* 、 *memory channel* 、および *HDFS sink* を使用します。
シーケンスジェネレーターソース
イベントを継続的に生成するのはソースです。 0から始まり1ずつ増加するカウンターを維持します。 テスト目的で使用されます。 このソースを設定している間、次のプロパティに値を提供する必要があります-
- チャンネル
- ソースタイプ-seq
チャネル
*memory* チャネルを使用しています。 メモリチャネルを設定するには、チャネルのタイプに値を提供する必要があります。 以下は、メモリチャネルの設定中に指定する必要があるプロパティのリストです-
- type -チャネルのタイプを保持します。 この例では、タイプはMemChannelです。
- 容量-チャネルに保存されるイベントの最大数です。 デフォルト値は100です。 (オプション)
- TransactionCapacity -チャネルが受け入れるまたは送信するイベントの最大数です。 デフォルトは100です。 (オプション)。
HDFSシンク
このシンクはデータをHDFSに書き込みます。 このシンクを設定するには、次の詳細を提供する必要があります。
- チャネル
- タイプ-hdfs
- hdfs.path -データが保存されるHDFSのディレクトリのパス。
そして、シナリオに基づいていくつかのオプションの値を提供できます。 以下に示すのは、アプリケーションで構成しているHDFSシンクのオプションのプロパティです。
- fileType -これは、HDFSファイルに必要なファイル形式です。 SequenceFile、DataStream 、および CompressedStream は、このストリームで使用できる3つのタイプです。 この例では、 DataStream を使用しています。
- writeFormat -テキストまたは書き込み可能。
- batchSize -HDFSにフラッシュされる前にファイルに書き込まれるイベントの数です。 デフォルト値は100です。
- rollsize -ロールをトリガーするファイルサイズです。 デフォルト値は100です。
- rollCount -ロールされる前にファイルに書き込まれたイベントの数です。 デフォルト値は10です。
例–構成ファイル
以下に、構成ファイルの例を示します。 このコンテンツをコピーして、Flumeのconfフォルダーに seq_gen .conf として保存します。
# Naming the components on the current agent
SeqGenAgent.sources = SeqSource
SeqGenAgent.channels = MemChannel
SeqGenAgent.sinks = HDFS
# Describing/Configuring the source
SeqGenAgent.sources.SeqSource.type = seq
# Describing/Configuring the sink
SeqGenAgent.sinks.HDFS.type = hdfs
SeqGenAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/seqgen_data/
SeqGenAgent.sinks.HDFS.hdfs.filePrefix = log
SeqGenAgent.sinks.HDFS.hdfs.rollInterval = 0
SeqGenAgent.sinks.HDFS.hdfs.rollCount = 10000
SeqGenAgent.sinks.HDFS.hdfs.fileType = DataStream
# Describing/Configuring the channel
SeqGenAgent.channels.MemChannel.type = memory
SeqGenAgent.channels.MemChannel.capacity = 1000
SeqGenAgent.channels.MemChannel.transactionCapacity = 100
# Binding the source and sink to the channel
SeqGenAgent.sources.SeqSource.channels = MemChannel
SeqGenAgent.sinks.HDFS.channel = MemChannel
実行
Flumeホームディレクトリを参照し、以下に示すようにアプリケーションを実行します。
$ cd $FLUME_HOME
$./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/seq_gen.conf
--name SeqGenAgent
すべてが順調に進むと、ソースはシーケンス番号の生成を開始し、ログファイルの形式でHDFSにプッシュされます。
以下は、シーケンスジェネレーターによって生成されたデータをHDFSにフェッチするコマンドプロンプトウィンドウのスナップショットです。
HDFSの検証
次のURLを使用して、Hadoop管理Web UIにアクセスできます-
http://localhost:50070/
ページの右側にある Utilities という名前のドロップダウンをクリックします。 以下の図に示すように、2つのオプションがあります。
[*ファイルシステムの参照]をクリックし、シーケンスジェネレーターによって生成されたデータを保存したHDFSディレクトリのパスを入力します。
この例では、パスは /user/Hadoop/seqgen_data/ になります。 次に、以下に示すように、HDFSに保存されているシーケンスジェネレーターによって生成されたログファイルのリストを確認できます。
ファイルの内容の検証
これらのすべてのログファイルには、順次形式の番号が含まれています。 以下に示す cat コマンドを使用して、ファイルシステム内のこれらのファイルの内容を確認できます。
Apache Flume-NetCatソース
この章では、イベントを生成し、その後コンソールにログインする方法を説明する例を取り上げます。 このために、 NetCat ソースと logger シンクを使用しています。
前提条件
この章で提供される例を実行するには、 Flume をインストールする必要があります。
Flumeの構成
*conf* フォルダー内の構成ファイルを使用して、ソース、チャネル、およびシンクを構成する必要があります。 この章の例では、* NetCat Source、Memoryチャネル*、および *logger sink* を使用しています。
NetCatソース
NetCatソースの設定中に、ソースの設定中にポートを指定する必要があります。 これで、ソース(NetCatソース)は指定されたポートをリッスンし、そのポートに入力された各行を個別のイベントとして受信し、指定されたチャネルを介してシンクに転送します。
このソースを設定している間、次のプロパティに値を提供する必要があります-
- チャンネル
- ソースタイプ-netcat
- bind -バインドするホスト名またはIPアドレス。
- port -ソースがリッスンするポート番号。
チャネル
*memory* チャネルを使用しています。 メモリチャネルを設定するには、チャネルのタイプに値を提供する必要があります。 以下は、メモリチャネルの設定中に指定する必要があるプロパティのリストです-
- type -チャネルのタイプを保持します。 この例では、タイプは MemChannel です。
- 容量-チャネルに保存されるイベントの最大数です。 デフォルト値は100です。 (オプション)
- TransactionCapacity -チャネルが受け入れるまたは送信するイベントの最大数です。 デフォルト値は100です。 (オプション)。
ロガーシンク
このシンクは、渡されたすべてのイベントを記録します。 通常、テストまたはデバッグの目的で使用されます。 このシンクを構成するには、次の詳細を指定する必要があります。
- チャネル
- type -ロガー
設定ファイルの例
以下に、構成ファイルの例を示します。 このコンテンツをコピーし、Flumeのconfフォルダーに netcat.conf として保存します。
# Naming the components on the current agent
NetcatAgent.sources = Netcat
NetcatAgent.channels = MemChannel
NetcatAgent.sinks = LoggerSink
# Describing/Configuring the source
NetcatAgent.sources.Netcat.type = netcat
NetcatAgent.sources.Netcat.bind = localhost
NetcatAgent.sources.Netcat.port = 56565
# Describing/Configuring the sink
NetcatAgent.sinks.LoggerSink.type = logger
# Describing/Configuring the channel
NetcatAgent.channels.MemChannel.type = memory
NetcatAgent.channels.MemChannel.capacity = 1000
NetcatAgent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
NetcatAgent.sources.Netcat.channels = MemChannel
NetcatAgent.sinks.LoggerSink.channel = MemChannel
実行
Flumeホームディレクトリを参照し、以下に示すようにアプリケーションを実行します。
$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/netcat.conf
--name NetcatAgent -Dflume.root.logger=INFO,console
すべてがうまくいくと、ソースは指定されたポートのリッスンを開始します。 この場合、 56565 です。 以下は、ポート56565を開始してリッスンしているNetCatソースのコマンドプロンプトウィンドウのスナップショットです。
データをソースに渡す
データをNetCatソースに渡すには、構成ファイルで指定されたポートを開く必要があります。 別のターミナルを開き、 curl コマンドを使用してソース(56565)に接続します。 接続が成功すると、次のように「 connected 」というメッセージが表示されます。
$ curl telnet://localhost:56565
connected
これで、行ごとにデータを入力できます(各行の後にEnterキーを押す必要があります)。 NetCatソースは各行を個別のイベントとして受信し、受信したメッセージ「 OK 」を取得します。
データの受け渡しが完了したら、( Ctrl&plus; C )を押してコンソールを終了できます。 以下に示すのは、 curl コマンドを使用してソースに接続したコンソールのスナップショットです。
上記のコンソールに入力された各行は、ソースによって個別のイベントとして受信されます。 Logger シンクを使用したため、これらのイベントは指定されたチャネル(この場合はメモリチャネル)を介してコンソール(ソースコンソール)にログオンされます。
次のスナップショットは、イベントが記録されるNetCatコンソールを示しています。