Hadoop-streaming

提供:Dev Guides
移動先:案内検索

Hadoop-ストリーミング

Hadoopストリーミングは、Hadoopディストリビューションに付属しているユーティリティです。 このユーティリティを使用すると、マッパーまたはレデューサーとして実行可能ファイルまたはスクリプトを使用して、Map/Reduceジョブを作成および実行できます。

Pythonを使用した例

Hadoopストリーミングでは、単語数の問題を考慮しています。 Hadoopのジョブには、マッパーとリデューサーの2つのフェーズが必要です。 Hadoopで実行するマッパーとレデューサーのコードをPythonスクリプトで記述しました。 PerlとRubyでも同じことを書くことができます。

マッパーフェーズコード

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Break the line into words
   words = myline.split()

   # Iterate the words list
   for myword in words:
      # Write the results to standard output
      print '%s\t%s' % (myword, 1)

このファイルに実行許可があることを確認してください(chmod + x/home/expert/hadoop-1.2.1/mapper.py)。

減速機フェーズコード

#!/usr/bin/python

from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Split the input we got from mapper.py word,
   count = myline.split('\t', 1)

   # Convert count variable to integer
   try:
      count = int(count)

   except ValueError:
      # Count was not a number, so silently ignore this line continue

   if current_word == word:
   current_count += count
   else:
      if current_word:
         # Write result to standard output print '%s\t%s' % (current_word, current_count)

      current_count = count
      current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
   print '%s\t%s' % (current_word, current_count)

Hadoopホームディレクトリのmapper.pyおよびreducer.pyにマッパーコードとリデューサーコードを保存します。 これらのファイルに実行許可があることを確認してください(chmod + x mapper.pyおよびchmod + x reducer.py)。 pythonはインデントに敏感なので、同じコードを以下のリンクからダウンロードできます。

WordCountプログラムの実行

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper <path/mapper.py \
   -reducer <path/reducer.py

「\」は、読みやすくするために行の継続に使用されます。

例えば、

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper/home/expert/hadoop-1.2.1/mapper.py -reducer/home/expert/hadoop-1.2.1/reducer.py

ストリーミングの仕組み

上記の例では、マッパーとリデューサーはどちらも標準入力から入力を読み取り、出力を標準出力に出力するpythonスクリプトです。 ユーティリティは、Map/Reduceジョブを作成し、ジョブを適切なクラスターに送信し、完了するまでジョブの進行状況を監視します。

マッパーにスクリプトが指定されている場合、マッパーが初期化されると、各マッパータスクがスクリプトを個別のプロセスとして起動します。 マッパータスクが実行されると、入力を行に変換し、その行をプロセスの標準入力(STDIN)に送ります。 その間、マッパーはプロセスの標準出力(STDOUT)から行指向の出力を収集し、各行をキー/値のペアに変換します。これはマッパーの出力として収集されます。 デフォルトでは、最初のタブ文字までの行の接頭辞がキーであり、行の残り(タブ文字を除く)が値になります。 行にタブ文字がない場合、行全体がキーと見なされ、値はヌルになります。 ただし、これは必要に応じてカスタマイズできます。

レデューサーにスクリプトが指定されると、各レデューサータスクはスクリプトを個別のプロセスとして起動し、レデューサーが初期化されます。 レデューサータスクが実行されると、入力キー/値のペアを行に変換し、その行をプロセスの標準入力(STDIN)に送ります。 それまでの間、リデューサーはプロセスの標準出力(STDOUT)から行指向の出力を収集し、各行をキー/値のペアに変換します。これはリデューサーの出力として収集されます。 デフォルトでは、最初のタブ文字までの行の接頭辞がキーであり、行の残りの部分(タブ文字を除く)が値です。 ただし、これは特定の要件に従ってカスタマイズできます。

重要なコマンド

Parameters Options Description
-input directory/file-name Required Input location for mapper.
-output directory-name Required Output location for reducer.
-mapper executable or script or JavaClassName Required Mapper executable.
-reducer executable or script or JavaClassName Required Reducer executable.
-file file-name Optional Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
-inputformat JavaClassName Optional Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
-outputformat JavaClassName Optional Class you supply should take key/value pairs of Text class. If not specified, TextOutputformat is used as the default.
-partitioner JavaClassName Optional Class that determines which reduce a key is sent to.
-combiner streamingCommand or JavaClassName Optional Combiner executable for map output.
-cmdenv name=value Optional Passes the environment variable to streaming commands.
-inputreader Optional For backwards-compatibility: specifies a record reader class (instead of an input format class).
-verbose Optional Verbose output.
-lazyOutput Optional Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write).
-numReduceTasks Optional Specifies the number of reducers.
-mapdebug Optional Script to call when map task fails.
-reducedebug Optional Script to call when reduce task fails.