Map-reduce-implementation-in-hadoop

提供:Dev Guides
移動先:案内検索

MapReduce-Hadoopの実装

MapReduceは、商品ハードウェアの大規模なクラスターで大量のデータを信頼性の高い方法で処理するアプリケーションを作成するために使用されるフレームワークです。 この章では、Javaを使用したHadoopフレームワークでのMapReduceの操作について説明します。

MapReduceアルゴリズム

通常、MapReduceパラダイムは、実際のデータが存在するコンピューターにmap-reduceプログラムを送信することに基づいています。

  • MapReduceジョブ中に、HadoopはMapおよびReduceタスクをクラスター内の適切なサーバーに送信します。
  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター全体のデータのコピーなど、データの受け渡しに関するすべての詳細を管理します。
  • ほとんどの計算は、ネットワークトラフィックを削減するローカルディスク上のデータを使用してノードで実行されます。
  • 所定のタスクを完了すると、クラスターはデータを収集および削減して適切な結果を生成し、Hadoopサーバーに送り返します。

MapReduceアルゴリズム

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは、キーと値のペアで動作します。つまり、フレームワークは、ジョブへの入力をキーと値のペアのセットと見なし、ジョブの出力として、おそらく異なるタイプのキーと値のペアのセットを生成します。

キーと値のクラスはフレームワークによってシリアル化できる必要があるため、Writableインターフェイスを実装する必要があります。 さらに、キークラスは、フレームワークによる並べ替えを容易にするためにWritableComparableインターフェイスを実装する必要があります。

MapReduceジョブの入力と出力の両方の形式は、キーと値のペアの形式です-

(入力)<k1、v1>→ map→ <k2、v2>→ reduce→ <k3、v3>(出力)。

Input Output
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

MapReduceの実装

次の表は、組織の電力消費に関するデータを示しています。 この表には、5年間連続の月間電力消費量と年間平均が含まれています。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

指定された表の入力データを処理するアプリケーションを作成して、最大使用年、最小使用年などを見つける必要があります。 このタスクは、必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡すだけなので、レコード数が限られているプログラマーにとっては簡単です。

入力データのスケールを上げましょう。 特定の州のすべての大規模産業の電力消費量を分析する必要があるとします。 このようなバルクデータを処理するアプリケーションを作成する場合、

  • それらの実行には多くの時間がかかります。
  • ソースからネットワークサーバーにデータを移動すると、大量のネットワークトラフィックが発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは sample.txt として保存され、入力として提供されます。 入力ファイルは次のようになります。

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

サンプルプログラム

サンプルデータ用の次のプログラムは、MapReduceフレームワークを使用しています。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
  //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable, /*Input key Type */
   Text,                  /*Input value Type*/
   Text,                  /*Output key Type*/
   IntWritable>           /*Output value Type*/
   {
     //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();

         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }

         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

  //Reducer class

   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
     //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

  //Main function

   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);

      conf.setJobName("max_eletricityunits");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

上記のプログラムを ProcessUnits.java に保存します。 プログラムのコンパイルと実行を以下に示します。

ProcessUnitsプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリにいると仮定しましょう(例:/home/hadoop)。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-次のコマンドを使用して、コンパイル済みのJavaクラスを保存するディレクトリを作成します。
$ mkdir units
  • ステップ2 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 mvnrepository.comからjarをダウンロードします。 ダウンロードフォルダーが/home/hadoop/であると仮定しましょう。

ステップ3 *-次のコマンドを使用して、 *ProcessUnits.java プログラムをコンパイルし、プログラムのjarを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/.
  • ステップ4 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *sample.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ7 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、Mapタスク、Reducerタスクなどが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters

   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2

   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120

   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120

   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework

   Map input records=5

   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67

   Input split bytes=208
   Combine input records=5
   Combine output records=5

   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5

   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2

   GC time elapsed (ms)=948
   CPU time spent (ms)=5160

   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504

   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40
  • ステップ8 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9 *- *Part-00000 ファイルの出力を確認するには、次のコマンドを使用します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下はMapReduceプログラムによって生成された出力です-

1981 34
1984 40
1985 45
  • ステップ10 *-次のコマンドを使用して、出力フォルダーをHDFSからローカルファイルシステムにコピーします。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir/home/hadoop