Hadoop-streaming
Hadoop-ストリーミング
Hadoopストリーミングは、Hadoopディストリビューションに付属しているユーティリティです。 このユーティリティを使用すると、マッパーまたはレデューサーとして実行可能ファイルまたはスクリプトを使用して、Map/Reduceジョブを作成および実行できます。
Pythonを使用した例
Hadoopストリーミングでは、単語数の問題を考慮しています。 Hadoopのジョブには、マッパーとリデューサーの2つのフェーズが必要です。 Hadoopで実行するマッパーとレデューサーのコードをPythonスクリプトで記述しました。 PerlとRubyでも同じことを書くことができます。
マッパーフェーズコード
!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Break the line into words
words = myline.split()
# Iterate the words list
for myword in words:
# Write the results to standard output
print '%s\t%s' % (myword, 1)
このファイルに実行許可があることを確認してください(chmod + x/home/expert/hadoop-1.2.1/mapper.py)。
減速機フェーズコード
#!/usr/bin/python
from operator import itemgetter
import sys
current_word = ""
current_count = 0
word = ""
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Split the input we got from mapper.py word,
count = myline.split('\t', 1)
# Convert count variable to integer
try:
count = int(count)
except ValueError:
# Count was not a number, so silently ignore this line continue
if current_word == word:
current_count += count
else:
if current_word:
# Write result to standard output print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# Do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Hadoopホームディレクトリのmapper.pyおよびreducer.pyにマッパーコードとリデューサーコードを保存します。 これらのファイルに実行許可があることを確認してください(chmod + x mapper.pyおよびchmod + x reducer.py)。 pythonはインデントに敏感なので、同じコードを以下のリンクからダウンロードできます。
WordCountプログラムの実行
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path/mapper.py \
-reducer <path/reducer.py
「\」は、読みやすくするために行の継続に使用されます。
例えば、
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper/home/expert/hadoop-1.2.1/mapper.py -reducer/home/expert/hadoop-1.2.1/reducer.py
ストリーミングの仕組み
上記の例では、マッパーとリデューサーはどちらも標準入力から入力を読み取り、出力を標準出力に出力するpythonスクリプトです。 ユーティリティは、Map/Reduceジョブを作成し、ジョブを適切なクラスターに送信し、完了するまでジョブの進行状況を監視します。
マッパーにスクリプトが指定されている場合、マッパーが初期化されると、各マッパータスクがスクリプトを個別のプロセスとして起動します。 マッパータスクが実行されると、入力を行に変換し、その行をプロセスの標準入力(STDIN)に送ります。 その間、マッパーはプロセスの標準出力(STDOUT)から行指向の出力を収集し、各行をキー/値のペアに変換します。これはマッパーの出力として収集されます。 デフォルトでは、最初のタブ文字までの行の接頭辞がキーであり、行の残り(タブ文字を除く)が値になります。 行にタブ文字がない場合、行全体がキーと見なされ、値はヌルになります。 ただし、これは必要に応じてカスタマイズできます。
レデューサーにスクリプトが指定されると、各レデューサータスクはスクリプトを個別のプロセスとして起動し、レデューサーが初期化されます。 レデューサータスクが実行されると、入力キー/値のペアを行に変換し、その行をプロセスの標準入力(STDIN)に送ります。 それまでの間、リデューサーはプロセスの標準出力(STDOUT)から行指向の出力を収集し、各行をキー/値のペアに変換します。これはリデューサーの出力として収集されます。 デフォルトでは、最初のタブ文字までの行の接頭辞がキーであり、行の残りの部分(タブ文字を除く)が値です。 ただし、これは特定の要件に従ってカスタマイズできます。
重要なコマンド
Parameters | Options | Description |
---|---|---|
-input directory/file-name | Required | Input location for mapper. |
-output directory-name | Required | Output location for reducer. |
-mapper executable or script or JavaClassName | Required | Mapper executable. |
-reducer executable or script or JavaClassName | Required | Reducer executable. |
-file file-name | Optional | Makes the mapper, reducer, or combiner executable available locally on the compute nodes. |
-inputformat JavaClassName | Optional | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default. |
-outputformat JavaClassName | Optional | Class you supply should take key/value pairs of Text class. If not specified, TextOutputformat is used as the default. |
-partitioner JavaClassName | Optional | Class that determines which reduce a key is sent to. |
-combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output. |
-cmdenv name=value | Optional | Passes the environment variable to streaming commands. |
-inputreader | Optional | For backwards-compatibility: specifies a record reader class (instead of an input format class). |
-verbose | Optional | Verbose output. |
-lazyOutput | Optional | Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write). |
-numReduceTasks | Optional | Specifies the number of reducers. |
-mapdebug | Optional | Script to call when map task fails. |
-reducedebug | Optional | Script to call when reduce task fails. |