Map-reduce-partitioner

提供:Dev Guides
移動先:案内検索

MapReduce-パーティショナー

パーティショナーは、入力データセットを処理する条件のように機能します。 分割フェーズは、マップフェーズの後、削減フェーズの前に行われます。

パーティショナーの数は、レデューサーの数と同じです。 つまり、パーティショナーはレデューサーの数に応じてデータを分割します。 したがって、単一のパーティショナーから渡されたデータは、単一のReducerによって処理されます。

パーティショナー

パーティショナーは、中間のMap出力のキーと値のペアをパーティション分割します。 ハッシュ関数のように機能するユーザー定義の条件を使用してデータを分割します。 パーティションの合計数は、ジョブのReducerタスクの数と同じです。 パーティショナーがどのように機能するかを理解するために例を挙げましょう。

MapReduce Partitionerの実装

便宜上、次のデータを持つEmployeeという小さなテーブルがあると仮定します。 このサンプルデータを入力データセットとして使用して、パーティショナーの動作を示します。

Id Name Age Gender Salary
1201 gopal 45 Male 50,000
1202 manisha 40 Female 50,000
1203 khalil 34 Male 30,000
1204 prasanth 30 Male 30,000
1205 kiran 20 Male 40,000
1206 laxmi 25 Female 35,000
1207 bhavya 20 Female 15,000
1208 reshma 19 Female 15,000
1209 kranthi 22 Male 22,000
1210 Satish 24 Male 25,000
1211 Krishna 25 Male 25,000
1212 Arshad 28 Male 20,000
1213 lavanya 18 Female 8,000

入力データセットを処理して、さまざまな年齢層(20歳未満、21歳から30歳、30歳以上など)の性別ごとに最高給与の従業員を見つけるアプリケーションを作成する必要があります。

入力データ

上記のデータは、「/home/hadoop/hadoopPartitioner」ディレクトリに input.txt として保存され、入力として提供されます。

1201 gopal 45 Male 50000
1202 manisha 40 Female 51000
1203 khaleel 34 Male 30000
1204 prasanth 30 Male 31000
1205 kiran 20 Male 40000
1206 laxmi 25 Female 35000
1207 bhavya 20 Female 15000
1208 reshma 19 Female 14000
1209 kranthi 22 Male 22000
1210 Satish 24 Male 25000
1211 Krishna 25 Male 26000
1212 Arshad 28 Male 20000
1213 lavanya 18 Female 8000

指定された入力に基づいて、プログラムのアルゴリズムの説明を以下に示します。

マップタスク

マップタスクは、テキストファイルにテキストデータがある間は、キーと値のペアを入力として受け入れます。 このマップタスクの入力は次のとおりです-

入力-キーは、「任意の特別なキー+」などのパターンになります。ファイル名+行番号」(例:キー= @ input1)、値はその行のデータになります(例:値= 1201 \ t gopal \ t 45 \ t男性\ t 50000)。

方法-このマップタスクの操作は次のとおりです-

  • 文字列の引数リストから入力値として取得される*値*(レコードデータ)を読み取ります。
  • split関数を使用して、性別を分離し、文字列変数に保存します。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • マップタスクから*パーティションタスク*に出力キーと値のペアとして性別情報とレコードデータ*値*を送信します。
context.write(new Text(gender), new Text(value));
  • テキストファイル内のすべてのレコードに対して上記のすべての手順を繰り返します。

出力-性別データとレコードデータ値をキーと値のペアとして取得します。

パーティショナータスク

パーティショナータスクは、マップタスクからのキーと値のペアを入力として受け入れます。 パーティションとは、データをセグメントに分割することを意味します。 パーティションの指定された条件基準に従って、入力キーと値のペアのデータは、年齢基準に基づいて3つの部分に分割できます。

入力-キーと値のペアのコレクション内のデータ全体。

キー=レコード内の性別フィールド値。

値=その性別のレコードデータ値全体。

方法-パーティションロジックのプロセスは次のように実行されます。

  • 入力キーと値のペアから年齢フィールドの値を読み取ります。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 次の条件で年齢の値を確認します。
  • 20歳以下
  • 年齢20歳以上30歳以下。
  • 30歳以上。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

出力-キーと値のペアのデータ全体が、キーと値のペアの3つのコレクションに分割されます。 レデューサーは、各コレクションで個別に機能します。

タスクを減らす

パーティショナータスクの数は、レデューサータスクの数と同じです。 ここでは、3つのパーティショナータスクがあり、したがって3つのReducerタスクが実行されます。

入力-レデューサーは、キーと値のペアの異なるコレクションで3回実行されます。

キー=レコード内の性別フィールド値。

値=その性別のレコードデータ全体。

方法-次のロジックが各コレクションに適用されます。

  • 各レコードの給与フィールド値を読み取ります。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • max変数で給与を確認してください。 str [4]が最大給与の場合、str [4]をmaxに割り当てます。それ以外の場合は、ステップをスキップします。
if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • キーコレクションごとに手順1と2を繰り返します(キーコレクションは男性と女性です)。 これらの3つのステップを実行すると、男性キーコレクションから最大給与が1つ、女性キーコレクションから最大給与が1つ見つかります。
context.write(new Text(key), new IntWritable(max));

出力-最後に、異なる年齢層の3つのコレクションでキーと値のペアのデータのセットを取得します。 各年齢層の男性コレクションの最高給与と女性コレクションの最高給与がそれぞれ含まれています。

Map、Partitioner、およびReduceタスクを実行した後、キーと値のペアのデータの3つのコレクションは、出力として3つの異なるファイルに保存されます。

3つのタスクはすべて、MapReduceジョブとして扱われます。 これらのジョブの次の要件と仕様は、構成で指定する必要があります-

  • 職種名
  • キーと値の入力および出力形式
  • Map、Reduce、およびPartitionerタスクの個々のクラス
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

//File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

サンプルプログラム

次のプログラムは、MapReduceプログラムで特定の条件にパーティショナーを実装する方法を示しています。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
  //Map class

   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }

  //Reducer class

   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;

         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }

         context.write(new Text(key), new IntWritable(max));
      }
   }

  //Partitioner class

   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);

         if(numReduceTasks == 0)
         {
            return 0;
         }

         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }

   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();

      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);

      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));

      job.setMapperClass(MapClass.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

     //set partitioner statement

      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);

      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }

   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

上記のコードを「/home/hadoop/hadoopPartitioner」に PartitionerExample.java として保存します。 プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/home/hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 jarはhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]からダウンロードできます。

ダウンロードしたフォルダが「/home/hadoop/hadoopPartitioner」であると仮定しましょう

ステップ2 *-次のコマンドは、プログラム *PartitionerExample.java をコンパイルし、プログラムのjarを作成するために使用されます。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
  • ステップ3 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ4 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *input.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/hadoopPartitioner/input.txt input_dir
  • ステップ5 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してTop salaryアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、マップタスク、およびReducerタスクが含まれます。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6

Job Counters

   Launched map tasks=1
   Launched reduce tasks=3

   Data-local map tasks=1

   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858

   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858

   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592

Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467

   Input split bytes=119

   Combine input records=0
   Combine output records=0

   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6

   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690

   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688

   Total committed heap usage (bytes)=334102528

Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0

   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0

File Input Format Counters

   Bytes Read=361

File Output Format Counters

   Bytes Written=72
  • ステップ7 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

プログラムで3つのパーティショナーと3つのレデューサーを使用しているため、出力は3つのファイルにあります。

ステップ8 *-次のコマンドを使用して、 *Part-00000 ファイルの出力を確認します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
  • Part-00000で出力*
Female   15000
Male     40000

次のコマンドを使用して、 Part-00001 ファイルの出力を確認します。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
  • Part-00001での出力*
Female   35000
Male    31000

次のコマンドを使用して、 Part-00002 ファイルの出力を確認します。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
  • Part-00002の出力*
Female  51000
Male   50000