Hadoop-mapreduce

提供:Dev Guides
移動先:案内検索

Hadoop-MapReduce

MapReduceは、市販のハードウェアの大規模なクラスターで、大量のデータを信頼性の高い方法で並列処理するアプリケーションを作成できるフレームワークです。

MapReduceとは何ですか?

MapReduceは、Javaに基づく分散コンピューティングの処理技術およびプログラムモデルです。 MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。 Mapはデータのセットを取得し、それを別のデータのセットに変換します。個々の要素はタプル(キー/値のペア)に分解されます。 次に、reduceタスクは、マップからの出力を入力として受け取り、それらのデータタプルを小さなタプルセットに結合します。 MapReduceという名前のシーケンスが示すように、reduceタスクは常にマップジョブの後に実行されます。

MapReduceの主な利点は、複数のコンピューティングノードでデータ処理を簡単にスケーリングできることです。 MapReduceモデルでは、データ処理プリミティブはマッパーおよびレデューサーと呼ばれます。 データ処理アプリケーションを_mappers_と_reducers_に分解することは、簡単ではない場合があります。 ただし、アプリケーションをMapReduceフォームで記述した後、クラスター内で数百、数千、または数万のマシンで実行するようにアプリケーションをスケーリングすることは、構成の変更にすぎません。 この単純なスケーラビリティは、多くのプログラマーがMapReduceモデルを使用するようになった理由です。

アルゴリズム

  • 通常、MapReduceパラダイムは、データが存在する場所にコンピューターを送信することに基づいています!
  • MapReduceプログラムは、マップステージ、シャッフルステージ、およびリデュースステージの3つのステージで実行されます。
  • マップステージ-マップまたはマッパーの仕事は、入力データを処理することです。 通常、入力データはファイルまたはディレクトリの形式であり、Hadoopファイルシステム(HDFS)に保存されます。 入力ファイルは、1行ずつマッパー関数に渡されます。 マッパーはデータを処理し、いくつかの小さなデータチャンクを作成します。
  • * Reduceステージ*-このステージは、 Shuffle ステージと Reduce ステージの組み合わせです。 Reducerの仕事は、マッパーからのデータを処理することです。 処理後、新しい出力セットが生成され、HDFSに保存されます。
  • MapReduceジョブ中に、HadoopはMapおよびReduceタスクをクラスター内の適切なサーバーに送信します。
  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター全体のデータのコピーなど、データの受け渡しに関するすべての詳細を管理します。
  • ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。 *指定されたタスクの完了後、クラスターはデータを収集および削減して適切な結果を形成し、Hadoopサーバーに送り返します。

MapReduceアルゴリズム

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは<key、value>ペアで動作します。つまり、フレームワークはジョブへの入力を<key、value>ペアのセットとして表示し、ジョブの出力として<key、value>ペアのセットを生成します、おそらく異なるタイプの。

キーと値のクラスは、フレームワークによってシリアル化された方法である必要があるため、Writableインターフェイスを実装する必要があります。 さらに、キークラスは、フレームワークによるソートを容易にするために、Writable-Comparableインターフェイスを実装する必要があります。* MapReduceジョブの入力および出力タイプ*-(入力)<k1、v1>→マップ→<k2、v2>→削減→<k3、v3>(出力)。

Input Output
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

用語

  • PayLoad -アプリケーションはMapおよびReduce関数を実装し、ジョブのコアを形成します。
  • マッパー-マッパーは、入力キー/値ペアを一連の中間キー/値ペアにマップします。
  • NamedNode -Hadoop分散ファイルシステム(HDFS)を管理するノード。
  • DataNode -処理が行われる前にデータが事前に提示されるノード。
  • MasterNode -JobTrackerが実行され、クライアントからのジョブ要求を受け入れるノード。
  • SlaveNode -Map and Reduceプログラムが実行されるノード。
  • JobTracker -ジョブをスケジュールし、タスクトラッカーへの割り当てジョブを追跡します。
  • タスクトラッカー-タスクを追跡し、ステータスをJobTrackerに報告します。
  • ジョブ-プログラムとは、データセット全体でマッパーとリデューサーを実行することです。
  • タスク-データのスライスに対するマッパーまたはリデューサーの実行。
  • Task Attempt -SlaveNodeでタスクを実行しようとする特定のインスタンス。

シナリオ例

以下は、組織の電力消費に関するデータです。 さまざまな年の毎月の電気消費量と年間平均が含まれています。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

上記のデータが入力として与えられる場合、それを処理し、最大使用年、最小使用年などを見つけるなどの結果を生成するアプリケーションを作成する必要があります。 これは、有限数のレコードを持つプログラマーのためのウォークオーバーです。 単純に必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡します。

しかし、特定の州のすべての大規模産業の電気消費量は、その形成以来と考えてください。

このようなバルクデータを処理するアプリケーションを作成する場合、

  • それらの実行には多くの時間がかかります。
  • ソースからネットワークサーバーなどにデータを移動すると、大量のネットワークトラフィックが発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは* sample.txt *として保存され、入力として提供されます。 入力ファイルは次のようになります。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25
1980   26   27   28  28   28   30   31   31   31   30   30   30  29
1981   31   32   32  32   33   34   35   36   36   34   34   34  34
1984   39   38   39  39   39   41   42   43   40   39   38   38  40
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

サンプルプログラム

以下に、MapReduceフレームワークを使用したサンプルデータへのプログラムを示します。

package hadoop;

import java.util.*;

import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits {
  //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable ,/*Input key Type */
   Text,               /*Input value Type*/
   Text,               /*Output key Type*/
   IntWritable>       /*Output value Type*/
   {
     //Map function
      public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output,

      Reporter reporter) throws IOException {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();

         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

  //Reducer class
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {

     //Reduce function
      public void reduce( Text key, Iterator <IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
         int maxavg = 30;
         int val = Integer.MIN_VALUE;

         while (values.hasNext()) {
            if((val = values.next().get())>maxavg) {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

  //Main function
   public static void main(String args[])throws Exception {
      JobConf conf = new JobConf(ProcessUnits.class);

      conf.setJobName("max_eletricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

上記のプログラムを* ProcessUnits.java。*として保存します。プログラムのコンパイルと実行について以下に説明します。

プロセスユニットプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリにいると仮定しましょう(例:/home/hadoop)。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

ステップ1

次のコマンドは、コンパイル済みのJavaクラスを格納するディレクトリを作成します。

$ mkdir units

ステップ2

MapReduceプログラムのコンパイルと実行に使用される Hadoop-core-1.2.1.jar をダウンロードします。 次のリンクhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]にアクセスして、jarをダウンロードします。 ダウンロードしたフォルダが*/home/hadoop/.*であると仮定しましょう

ステップ3

次のコマンドは、 ProcessUnits.java プログラムをコンパイルし、プログラムのjarを作成するために使用されます。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/.

ステップ4

次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5

次のコマンドを使用して、* sample.txt *という名前の入力ファイルをHDFSの入力ディレクトリにコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir

ステップ6

次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

ステップ7

次のコマンドは、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行するために使用されます。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、以下に示すように、出力には入力分割の数、マップタスクの数、レデューサータスクの数などが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
   File System Counters

FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters


   Launched map tasks = 2
   Launched reduce tasks = 1
   Data-local map tasks = 2
   Total time spent by all maps in occupied slots (ms) = 146137
   Total time spent by all reduces in occupied slots (ms) = 441
   Total time spent by all map tasks (ms) = 14613
   Total time spent by all reduce tasks (ms) = 44120
   Total vcore-seconds taken by all map tasks = 146137
   Total vcore-seconds taken by all reduce tasks = 44120
   Total megabyte-seconds taken by all map tasks = 149644288
   Total megabyte-seconds taken by all reduce tasks = 45178880

Map-Reduce Framework

   Map input records = 5
   Map output records = 5
   Map output bytes = 45
   Map output materialized bytes = 67
   Input split bytes = 208
   Combine input records = 5
   Combine output records = 5
   Reduce input groups = 5
   Reduce shuffle bytes = 6
   Reduce input records = 5
   Reduce output records = 5
   Spilled Records = 10
   Shuffled Maps  = 2
   Failed Shuffles = 0
   Merged Map outputs = 2
   GC time elapsed (ms) = 948
   CPU time spent (ms) = 5160
   Physical memory (bytes) snapshot = 47749120
   Virtual memory (bytes) snapshot = 2899349504
   Total committed heap usage (bytes) = 277684224

File Output Format Counters

   Bytes Written = 40

ステップ8

次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9

*Part-00000* ファイルの出力を表示するには、次のコマンドを使用します。 このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です。

1981    34
1984    40
1985    45

ステップ10

次のコマンドは、分析のためにHDFSからローカルファイルシステムに出力フォルダーをコピーするために使用されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir/home/hadoop

重要なコマンド

すべてのHadoopコマンドは、 $ HADOOP_HOME/bin/hadoop コマンドによって呼び出されます。 引数なしでHadoopスクリプトを実行すると、すべてのコマンドの説明が出力されます。

使用法-hadoop [--config confdir]コマンド

次の表に、使用可能なオプションとその説明を示します。

Sr.No. Option & Description
1

namenode -format

DFSファイルシステムをフォーマットします。

2

secondarynamenode

DFSセカンダリネームノードを実行します。

3

namenode

DFS名前ノードを実行します。

4

datanode

DFSデータノードを実行します。

5

dfsadmin

DFS管理クライアントを実行します。

6

mradmin

Map-Reduce管理クライアントを実行します。

7

fsck

DFSファイルシステムチェックユーティリティを実行します。

8

fs

汎用ファイルシステムユーザークライアントを実行します。

9

balancer

クラスターバランシングユーティリティを実行します。

10

oiv

オフラインfsimageビューアーをfsimageに適用します。

11

fetchdt

NameNodeから委任トークンを取得します。

12

jobtracker

MapReduceジョブトラッカーノードを実行します。

13

pipes

パイプジョブを実行します。

14

tasktracker

MapReduceタスクトラッカーノードを実行します。

15

historyserver

ジョブ履歴サーバーをスタンドアロンデーモンとして実行します。

16

job

MapReduceジョブを操作します。

17

queue

JobQueuesに関する情報を取得します。

18

version

バージョンを出力します。

19

jar <jar>

jarファイルを実行します。

20

distcp <srcurl> <desturl>

ファイルまたはディレクトリを再帰的にコピーします。

21

distcp2 <srcurl> <desturl>

DistCpバージョン2。

22

archive -archiveName NAME -p <parent path> <src> <dest>*

hadoopアーカイブを作成します。

23

classpath

Hadoop jarおよび必要なライブラリを取得するために必要なクラスパスを出力します。

24

daemonlog

各デーモンのログレベルを取得/設定します

MapReduceジョブと対話する方法

使用法-hadoopジョブ[GENERIC_OPTIONS]

以下は、Hadoopジョブで使用可能な汎用オプションです。

Sr.No. GENERIC_OPTION & Description
1

-submit <job-file>

ジョブを送信します。

2

-status <job-id>

マップを印刷し、完了率とすべてのジョブカウンターを減らします。

3

-counter <job-id> <group-name> <countername>

カウンター値を出力します。

4

-kill <job-id>

ジョブを強制終了します。

5

-events <job-id> <fromevent-> <-of-events>

指定された範囲でジョブトラッカーが受信したイベントの詳細を印刷します。

6

-history [all] <jobOutputDir> - history < jobOutputDir>

ジョブの詳細、失敗したチップ、終了したチップの詳細を出力します。 [all]オプションを指定すると、成功したタスクや各タスクのタスク試行など、ジョブに関する詳細を表示できます。

7

-list[all]

すべてのジョブを表示します。 -listは、まだ完了していないジョブのみを表示します。

8

-kill-task <task-id>

タスクを強制終了します。 終了したタスクは、失敗した試行に対してカウントされません。

9

-fail-task <task-id>

タスクを失敗させます。 失敗したタスクは、失敗した試行に対してカウントされます。

10

-set-priority <job-id> <priority>

ジョブの優先度を変更します。 許可される優先度の値は、VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOWです。

ジョブのステータスを表示するには

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

ジョブoutput-dirの履歴を表示するには

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history/user/expert/output

仕事を殺すために

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004