Apache-flume-fetching-twitter-data

提供:Dev Guides
移動先:案内検索

Apache Flume-Twitterデータの取得

Flumeを使用して、さまざまなサービスからデータを取得し、それを中央ストア(HDFSおよびHBase)に転送できます。 この章では、Apache Flumeを使用してTwitterサービスからデータを取得し、HDFSに保存する方法について説明します。

Flumeアーキテクチャで説明したように、Webサーバーはログデータを生成し、このデータはFlumeのエージェントによって収集されます。 チャネルはこのデータをシンクにバッファリングし、シンクは最終的に中央ストアにプッシュします。

この章で提供する例では、アプリケーションを作成し、Apache Flumeが提供する実験的なtwitterソースを使用してツイートを取得します。 メモリチャネルを使用してこれらのツイートをバッファリングし、HDFSシンクを使用してこれらのツイートをHDFSにプッシュします。

データの取得

Twitterデータを取得するには、以下の手順に従う必要があります-

  • twitterアプリケーションを作成する
  • HDFSのインストール/開始
  • Flumeを構成する

Twitterアプリケーションの作成

Twitterからツイートを取得するには、Twitterアプリケーションを作成する必要があります。 以下の手順に従って、Twitterアプリケーションを作成します。

ステップ1

Twitterアプリケーションを作成するには、次のリンクhttps://apps.twitter.com/をクリックします。 Twitterアカウントにサインインします。 Twitterアプリケーションを作成、削除、管理できるTwitterアプリケーション管理ウィンドウが表示されます。

アプリケーション管理ウィンドウ

ステップ2

*Create New App* ボタンをクリックします。 ウィンドウにリダイレクトされ、アプリケーションを作成するために詳細を入力する必要があるアプリケーションフォームが表示されます。 Webサイトのアドレスを入力するときに、完全なURLパターン(たとえば、http://example.com/[http://example.com。])を指定します

アプリケーションの作成

ステップ3

詳細を入力し、終了したら Developer Agreement に同意し、ページの下部にある* Create your Twitter applicationボタン*をクリックします。 すべてがうまくいくと、以下に示すように、指定された詳細でアプリが作成されます。

作成されたアプリケーション

ステップ4

ページの下部にある keys and Access Tokens タブで、 Create my access token という名前のボタンを確認できます。 それをクリックして、アクセストークンを生成します。

キーアクセストークン

ステップ5

最後に、ページの右上にある[OAuthのテスト]ボタンをクリックします。 これにより、コンシューマキー、コンシューマシークレット、アクセストークン、および*アクセストークンシークレット*を表示するページが表示されます。 これらの詳細をコピーします。 これらは、Flumeでエージェントを構成するのに役立ちます。

OAuthツール

HDFSの開始

HDFSにデータを保存しているため、Hadoopをインストール/検証する必要があります。 Hadoopを起動し、Flumeデータを保存するフォルダーを作成します。 Flumeを設定する前に、以下の手順に従ってください。

ステップ1:Hadoopのインストール/検証

Hadoopをインストールします。 Hadoopがシステムにすでにインストールされている場合、以下に示すように、Hadoop versionコマンドを使用してインストールを確認します。

$ hadoop version

システムにHadoopが含まれていて、パス変数を設定している場合、次の出力が得られます-

Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using/home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

ステップ2:Hadoopを開始する

Hadoopの sbin ディレクトリを参照し、以下に示すようにyarnとHadoop dfs(分散ファイルシステム)を起動します。

cd/$Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
  /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
  /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
  /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out

$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
  /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
  /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

ステップ3:HDFSでディレクトリを作成する

Hadoop DFSでは、 mkdir コマンドを使用してディレクトリを作成できます。 それを参照し、以下に示すように、必要なパスに twitter_data という名前のディレクトリを作成します。

$cd/$Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

Flumeの構成

*conf* フォルダー内の構成ファイルを使用して、ソース、チャネル、およびシンクを構成する必要があります。 この章の例では、Apache Flumeが提供する *Twitter 1%Firehose* メモリチャネルとHDFSシンクという実験ソースを使用します。

Twitter 1%Firehoseソース

このソースは非常に実験的です。 ストリーミングAPIを使用して1%のサンプルTwitter Firehoseに接続し、継続的にツイートをダウンロードし、Avro形式に変換して、AvroイベントをダウンストリームFlumeシンクに送信します。

Flumeのインストールとともに、デフォルトでこのソースを取得します。 このソースに対応する jar ファイルは、以下に示すように lib フォルダーにあります。

Twitter Jarファイル

クラスパスを設定する

以下に示すように、 classpath 変数を Flume-env.sh ファイルでFlumeの lib フォルダーに設定します。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

このソースには、Twitterアプリケーションの Consumer key、Consumer secret、Access tokenAccess token secret などの詳細が必要です。 このソースを設定している間、次のプロパティに値を提供する必要があります-

  • チャンネル
  • *ソースタイプ:org.apache.flume.source.twitter.TwitterSource *
  • consumerKey -OAuthコンシューマキー
  • consumerSecret -OAuthコンシューマシークレット
  • accessToken -OAuthアクセストークン
  • accessTokenSecret -OAuthトークンシークレット
  • maxBatchSize -twitterバッチに含まれるtwitterメッセージの最大数。 デフォルト値は1000(オプション)です。
  • maxBatchDurationMillis -バッチを閉じる前に待機する最大ミリ秒数。 デフォルト値は1000(オプション)です。

チャネル

メモリチャネルを使用しています。 メモリチャネルを設定するには、チャネルのタイプに値を提供する必要があります。

  • type -チャネルのタイプを保持します。 この例では、タイプは MemChannel です。
  • 容量-チャネルに保存されるイベントの最大数です。 デフォルト値は100(オプション)です。
  • TransactionCapacity -チャネルが受け入れるまたは送信するイベントの最大数です。 デフォルト値は100(オプション)です。

HDFSシンク

このシンクはデータをHDFSに書き込みます。 このシンクを設定するには、次の詳細を提供する必要があります。

  • チャネル
  • タイプ-hdfs
  • hdfs.path -データが保存されるHDFSのディレクトリのパス。

そして、シナリオに基づいていくつかのオプションの値を提供できます。 以下に示すのは、アプリケーションで構成しているHDFSシンクのオプションのプロパティです。

  • fileType -これは、HDFSファイルに必要なファイル形式です。 SequenceFile、DataStream 、および CompressedStream は、このストリームで使用できる3つのタイプです。 この例では、 DataStream を使用しています。
  • writeFormat -テキストまたは書き込み可能。
  • batchSize -HDFSにフラッシュされる前にファイルに書き込まれるイベントの数です。 デフォルト値は100です。
  • rollsize -ロールをトリガーするファイルサイズです。 デフォルト値は100です。
  • rollCount -ロールされる前にファイルに書き込まれたイベントの数です。 デフォルト値は10です。

例–構成ファイル

以下に、構成ファイルの例を示します。 このコンテンツをコピーして、Flumeのconfフォルダーに twitter.conf として保存します。

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

# Describing/Configuring the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

実行

Flumeホームディレクトリを参照し、以下に示すようにアプリケーションを実行します。

$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/-f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent

すべてがうまくいけば、HDFSへのツイートのストリーミングが開始されます。 以下は、ツイートを取得する際のコマンドプロンプトウィンドウのスナップショットです。

ツイートの取得

HDFSの検証

以下に示すURLを使用して、Hadoop管理Web UIにアクセスできます。

http://localhost:50070/

ページの右側にある Utilities という名前のドロップダウンをクリックします。 以下のスナップショットに示すように、2つのオプションを見ることができます。

HDFSの検証

[ファイルシステムの参照]をクリックし、ツイートを保存したHDFSディレクトリのパスを入力します。 この例では、パスは */user/Hadoop/twitter_data/ になります。 次に、以下に示すように、HDFSに保存されているtwitterログファイルのリストを確認できます。

ファイルシステムの参照