Hadoop-mapreduce
Hadoop-MapReduce
MapReduceは、市販のハードウェアの大規模なクラスターで、大量のデータを信頼性の高い方法で並列処理するアプリケーションを作成できるフレームワークです。
MapReduceとは何ですか?
MapReduceは、Javaに基づく分散コンピューティングの処理技術およびプログラムモデルです。 MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。 Mapはデータのセットを取得し、それを別のデータのセットに変換します。個々の要素はタプル(キー/値のペア)に分解されます。 次に、reduceタスクは、マップからの出力を入力として受け取り、それらのデータタプルを小さなタプルセットに結合します。 MapReduceという名前のシーケンスが示すように、reduceタスクは常にマップジョブの後に実行されます。
MapReduceの主な利点は、複数のコンピューティングノードでデータ処理を簡単にスケーリングできることです。 MapReduceモデルでは、データ処理プリミティブはマッパーおよびレデューサーと呼ばれます。 データ処理アプリケーションを_mappers_と_reducers_に分解することは、簡単ではない場合があります。 ただし、アプリケーションをMapReduceフォームで記述した後、クラスター内で数百、数千、または数万のマシンで実行するようにアプリケーションをスケーリングすることは、構成の変更にすぎません。 この単純なスケーラビリティは、多くのプログラマーがMapReduceモデルを使用するようになった理由です。
アルゴリズム
- 通常、MapReduceパラダイムは、データが存在する場所にコンピューターを送信することに基づいています!
- MapReduceプログラムは、マップステージ、シャッフルステージ、およびリデュースステージの3つのステージで実行されます。
- マップステージ-マップまたはマッパーの仕事は、入力データを処理することです。 通常、入力データはファイルまたはディレクトリの形式であり、Hadoopファイルシステム(HDFS)に保存されます。 入力ファイルは、1行ずつマッパー関数に渡されます。 マッパーはデータを処理し、いくつかの小さなデータチャンクを作成します。
- * Reduceステージ*-このステージは、 Shuffle ステージと Reduce ステージの組み合わせです。 Reducerの仕事は、マッパーからのデータを処理することです。 処理後、新しい出力セットが生成され、HDFSに保存されます。
- MapReduceジョブ中に、HadoopはMapおよびReduceタスクをクラスター内の適切なサーバーに送信します。
- フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター全体のデータのコピーなど、データの受け渡しに関するすべての詳細を管理します。
- ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。 *指定されたタスクの完了後、クラスターはデータを収集および削減して適切な結果を形成し、Hadoopサーバーに送り返します。
入力と出力(Javaパースペクティブ)
MapReduceフレームワークは<key、value>ペアで動作します。つまり、フレームワークはジョブへの入力を<key、value>ペアのセットとして表示し、ジョブの出力として<key、value>ペアのセットを生成します、おそらく異なるタイプの。
キーと値のクラスは、フレームワークによってシリアル化された方法である必要があるため、Writableインターフェイスを実装する必要があります。 さらに、キークラスは、フレームワークによるソートを容易にするために、Writable-Comparableインターフェイスを実装する必要があります。* MapReduceジョブの入力および出力タイプ*-(入力)<k1、v1>→マップ→<k2、v2>→削減→<k3、v3>(出力)。
Input | Output | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
用語
- PayLoad -アプリケーションはMapおよびReduce関数を実装し、ジョブのコアを形成します。
- マッパー-マッパーは、入力キー/値ペアを一連の中間キー/値ペアにマップします。
- NamedNode -Hadoop分散ファイルシステム(HDFS)を管理するノード。
- DataNode -処理が行われる前にデータが事前に提示されるノード。
- MasterNode -JobTrackerが実行され、クライアントからのジョブ要求を受け入れるノード。
- SlaveNode -Map and Reduceプログラムが実行されるノード。
- JobTracker -ジョブをスケジュールし、タスクトラッカーへの割り当てジョブを追跡します。
- タスクトラッカー-タスクを追跡し、ステータスをJobTrackerに報告します。
- ジョブ-プログラムとは、データセット全体でマッパーとリデューサーを実行することです。
- タスク-データのスライスに対するマッパーまたはリデューサーの実行。
- Task Attempt -SlaveNodeでタスクを実行しようとする特定のインスタンス。
シナリオ例
以下は、組織の電力消費に関するデータです。 さまざまな年の毎月の電気消費量と年間平均が含まれています。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
上記のデータが入力として与えられる場合、それを処理し、最大使用年、最小使用年などを見つけるなどの結果を生成するアプリケーションを作成する必要があります。 これは、有限数のレコードを持つプログラマーのためのウォークオーバーです。 単純に必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡します。
しかし、特定の州のすべての大規模産業の電気消費量は、その形成以来と考えてください。
このようなバルクデータを処理するアプリケーションを作成する場合、
- それらの実行には多くの時間がかかります。
- ソースからネットワークサーバーなどにデータを移動すると、大量のネットワークトラフィックが発生します。
これらの問題を解決するために、MapReduceフレームワークがあります。
入力データ
上記のデータは* sample.txt *として保存され、入力として提供されます。 入力ファイルは次のようになります。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
サンプルプログラム
以下に、MapReduceフレームワークを使用したサンプルデータへのプログラムを示します。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
上記のプログラムを* ProcessUnits.java。*として保存します。プログラムのコンパイルと実行について以下に説明します。
プロセスユニットプログラムのコンパイルと実行
Hadoopユーザーのホームディレクトリにいると仮定しましょう(例:/home/hadoop)。
上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。
ステップ1
次のコマンドは、コンパイル済みのJavaクラスを格納するディレクトリを作成します。
$ mkdir units
ステップ2
MapReduceプログラムのコンパイルと実行に使用される Hadoop-core-1.2.1.jar をダウンロードします。 次のリンクhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]にアクセスして、jarをダウンロードします。 ダウンロードしたフォルダが*/home/hadoop/.*であると仮定しましょう
ステップ3
次のコマンドは、 ProcessUnits.java プログラムをコンパイルし、プログラムのjarを作成するために使用されます。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/.
ステップ4
次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
ステップ5
次のコマンドを使用して、* sample.txt *という名前の入力ファイルをHDFSの入力ディレクトリにコピーします。
$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir
ステップ6
次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
ステップ7
次のコマンドは、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行するために使用されます。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
ファイルが実行されるまでしばらく待ちます。 実行後、以下に示すように、出力には入力分割の数、マップタスクの数、レデューサータスクの数などが含まれます。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
ステップ8
次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
ステップ9
*Part-00000* ファイルの出力を表示するには、次のコマンドを使用します。 このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下は、MapReduceプログラムによって生成された出力です。
1981 34
1984 40
1985 45
ステップ10
次のコマンドは、分析のためにHDFSからローカルファイルシステムに出力フォルダーをコピーするために使用されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir/home/hadoop
重要なコマンド
すべてのHadoopコマンドは、 $ HADOOP_HOME/bin/hadoop コマンドによって呼び出されます。 引数なしでHadoopスクリプトを実行すると、すべてのコマンドの説明が出力されます。
使用法-hadoop [--config confdir]コマンド
次の表に、使用可能なオプションとその説明を示します。
Sr.No. | Option & Description |
---|---|
1 |
namenode -format DFSファイルシステムをフォーマットします。 |
2 |
secondarynamenode DFSセカンダリネームノードを実行します。 |
3 |
namenode DFS名前ノードを実行します。 |
4 |
datanode DFSデータノードを実行します。 |
5 |
dfsadmin DFS管理クライアントを実行します。 |
6 |
mradmin Map-Reduce管理クライアントを実行します。 |
7 |
fsck DFSファイルシステムチェックユーティリティを実行します。 |
8 |
fs 汎用ファイルシステムユーザークライアントを実行します。 |
9 |
balancer クラスターバランシングユーティリティを実行します。 |
10 |
oiv オフラインfsimageビューアーをfsimageに適用します。 |
11 |
fetchdt NameNodeから委任トークンを取得します。 |
12 |
jobtracker MapReduceジョブトラッカーノードを実行します。 |
13 |
pipes パイプジョブを実行します。 |
14 |
tasktracker MapReduceタスクトラッカーノードを実行します。 |
15 |
historyserver ジョブ履歴サーバーをスタンドアロンデーモンとして実行します。 |
16 |
job MapReduceジョブを操作します。 |
17 |
queue JobQueuesに関する情報を取得します。 |
18 |
version バージョンを出力します。 |
19 |
jar <jar> jarファイルを実行します。 |
20 |
distcp <srcurl> <desturl> ファイルまたはディレクトリを再帰的にコピーします。 |
21 |
distcp2 <srcurl> <desturl> DistCpバージョン2。 |
22 |
archive -archiveName NAME -p <parent path> <src> <dest>* hadoopアーカイブを作成します。 |
23 |
classpath Hadoop jarおよび必要なライブラリを取得するために必要なクラスパスを出力します。 |
24 |
daemonlog 各デーモンのログレベルを取得/設定します |
MapReduceジョブと対話する方法
使用法-hadoopジョブ[GENERIC_OPTIONS]
以下は、Hadoopジョブで使用可能な汎用オプションです。
Sr.No. | GENERIC_OPTION & Description |
---|---|
1 |
-submit <job-file> ジョブを送信します。 |
2 |
-status <job-id> マップを印刷し、完了率とすべてのジョブカウンターを減らします。 |
3 |
-counter <job-id> <group-name> <countername> カウンター値を出力します。 |
4 |
-kill <job-id> ジョブを強制終了します。 |
5 |
-events <job-id> <fromevent-> <-of-events> 指定された範囲でジョブトラッカーが受信したイベントの詳細を印刷します。 |
6 |
-history [all] <jobOutputDir> - history < jobOutputDir> ジョブの詳細、失敗したチップ、終了したチップの詳細を出力します。 [all]オプションを指定すると、成功したタスクや各タスクのタスク試行など、ジョブに関する詳細を表示できます。 |
7 |
-list[all] すべてのジョブを表示します。 -listは、まだ完了していないジョブのみを表示します。 |
8 |
-kill-task <task-id> タスクを強制終了します。 終了したタスクは、失敗した試行に対してカウントされません。 |
9 |
-fail-task <task-id> タスクを失敗させます。 失敗したタスクは、失敗した試行に対してカウントされます。 |
10 |
-set-priority <job-id> <priority> ジョブの優先度を変更します。 許可される優先度の値は、VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOWです。 |
ジョブのステータスを表示するには
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
ジョブoutput-dirの履歴を表示するには
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history/user/expert/output
仕事を殺すために
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004