Map-reduce-combiners

提供:Dev Guides
移動先:案内検索

MapReduce-コンバイナー

Combinerは、 semi-reducer とも呼ばれ、Mapクラスからの入力を受け入れてから、出力のキーと値のペアをReducerクラスに渡すことで動作するオプションのクラスです。

Combinerの主な機能は、マップ出力レコードを同じキーで要約することです。 コンバイナの出力(キーと値のコレクション)は、ネットワークを介して入力として実際のReducerタスクに送信されます。

コンバイナー

MapクラスとReduceクラスの間でCombinerクラスを使用して、MapとReduce間のデータ転送量を削減します。 通常、mapタスクの出力は大きく、reduceタスクに転送されるデータは大きくなります。

次のMapReduceタスク図は、COMBINER PHASEを示しています。

Combiner

Combinerの仕組み

MapReduce Combinerの仕組みに関する簡単な概要を次に示します-

  • コンバイナには事前定義されたインターフェースがなく、Reducerインターフェースのreduce()メソッドを実装する必要があります。
  • コンバイナは、各マップ出力キーで動作します。 Reducerクラスと同じ出力Key-Valueタイプが必要です。
  • コンバイナは、元のマップ出力を置き換えるため、大規模なデータセットから要約情報を生成できます。

ただし、Combinerはオプションですが、データを複数のグループに分けてリデュースフェーズを支援し、処理を容易にします。

MapReduce Combinerの実装

次の例は、コンバイナに関する理論的なアイデアを提供します。 MapReduceの input.txt という名前の次の入力テキストファイルがあるとします。

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Combinerを使用したMapReduceプログラムの重要なフェーズを以下で説明します。

レコードリーダー

これはMapReduceの最初のフェーズであり、レコードリーダーは入力テキストファイルからすべての行をテキストとして読み取り、キーと値のペアとして出力を生成します。

入力-入力ファイルの行ごとのテキスト。

出力-キーと値のペアを形成します。 以下は、予想されるキーと値のペアのセットです。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

マップフェーズ

マップフェーズでは、レコードリーダーから入力を取得して処理し、キーと値のペアの別のセットとして出力を生成します。

入力-次のキーと値のペアは、レコードリーダーから取得した入力です。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Mapフェーズでは、各キーと値のペアを読み取り、StringTokenizerを使用して各単語を値から除算し、各単語をキーとして扱い、その単語のカウントを値として扱います。 次のコードスニペットは、Mapperクラスとmap関数を示しています。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context) throws IOException, InterruptedException
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens())
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

出力-予想される出力は次のとおりです-

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナーフェーズ

Combinerフェーズは、Mapフェーズから各キーと値のペアを取得して処理し、*キーと値のコレクション*のペアとして出力を生成します。

入力-次のキーと値のペアは、マップフェーズから取得した入力です。

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナフェーズでは、各キーと値のペアを読み取り、キーとして一般的な単語を組み合わせ、コレクションとして値を組み合わせます。 通常、Combinerのコードと操作はReducerのコードと操作に似ています。 以下は、Mapper、Combiner、およびReducerのクラス宣言のコードスニペットです。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

出力-予想される出力は次のとおりです-

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

減速機フェーズ

Reducerフェーズでは、Combinerフェーズから各キーと値のコレクションペアを取得して処理し、出力をキーと値のペアとして渡します。 Combiner機能はReducerと同じであることに注意してください。

入力-次のキーと値のペアは、コンバイナフェーズから取得した入力です。

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レデューサーフェーズは、各キーと値のペアを読み取ります。 以下は、Combinerのコードスニペットです。

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
   {
      int sum = 0;
      for (IntWritable val : values)
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

出力-レデューサーフェーズから予想される出力は次のとおりです-

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レコードライター

これはMapReduceの最後のフェーズで、Record WriterはReducerフェーズからのすべてのキーと値のペアを書き込み、出力をテキストとして送信します。

入力-出力形式とともに、Reducerフェーズからの各キーと値のペア。

出力-キーと値のペアをテキスト形式で提供します。 予想される出力は次のとおりです。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

サンプルプログラム

次のコードブロックは、プログラム内の単語の数をカウントします。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens())
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }

   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
         int sum = 0;
         for (IntWritable val : values)
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }

   public static void main(String[] args) throws Exception
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");

      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

上記のプログラムを WordCount.java として保存します。 プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/home/hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-次のコマンドを使用して、コンパイル済みのJavaクラスを保存するディレクトリを作成します。
$ mkdir units
  • ステップ2 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 jarはhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]からダウンロードできます。

ダウンロードしたフォルダーが/home/hadoop/であると仮定します。

ステップ3 *-次のコマンドを使用して、 *WordCount.java プログラムをコンパイルし、プログラムのjarを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/.
  • ステップ4 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *input.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/input.txt input_dir
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ7 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してワードカウントアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。

  • ステップ8 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9 *-次のコマンドを使用して、 *Part-00000 ファイルの出力を確認します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1