Map-reduce-quick-guide

提供:Dev Guides
移動先:案内検索

MapReduce-はじめに

MapReduceは、複数のノードでビッグデータを並列処理できるアプリケーションを作成するためのプログラミングモデルです。 MapReduceは、膨大な量の複雑なデータを分析するための分析機能を提供します。

ビッグデータとは?

ビッグデータは、従来のコンピューティング技術では処理できない大規模なデータセットのコレクションです。 たとえば、FacebookまたはYoutubeが毎日収集および管理する必要があるデータの量は、ビッグデータのカテゴリに分類されます。 ただし、ビッグデータはスケールとボリュームだけでなく、速度、多様性、ボリューム、複雑さの1つ以上の側面も含みます。

MapReduceを選ぶ理由

従来のエンタープライズシステムには、通常、データを保存および処理するための集中サーバーがあります。 次の図は、従来のエンタープライズシステムの概略図を示しています。 従来のモデルは、確かに膨大な量のスケーラブルなデータの処理には適さず、標準のデータベースサーバーでは対応できません。 さらに、集中システムでは、複数のファイルを同時に処理する際にボトルネックが多くなりすぎます。

従来のエンタープライズシステムビュー

GoogleはMapReduceと呼ばれるアルゴリズムを使用してこのボトルネックの問題を解決しました。 MapReduceは、タスクを小さな部分に分割し、それらを多くのコンピューターに割り当てます。 その後、結果は1か所で収集され、統合されて結果データセットが形成されます。

集中システム

MapReduceの仕組み

MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。

  • Mapタスクはデータのセットを受け取り、それを別のデータのセットに変換します。個々の要素はタプル(キーと値のペア)に分解されます。
  • Reduceタスクは、Mapからの出力を入力として受け取り、それらのデータタプル(キーと値のペア)を小さなタプルセットに結合します。

削減タスクは、マップジョブの後に常に実行されます。

次に、各フェーズを詳しく見て、それらの重要性を理解してみましょう。

フェーズ

  • 入力フェーズ-入力ファイル内の各レコードを変換し、キーと値のペアの形式でマッパーに解析済みデータを送信するレコードリーダーがあります。
  • Map -Mapはユーザー定義関数であり、一連のキーと値のペアを受け取り、それらの各ペアを処理して0個以上のキーと値のペアを生成します。
  • 中間キー-マッパーによって生成されたキーと値のペアは、中間キーとして知られています。
  • コンバイナー-コンバイナーは、マップフェーズからの類似データを識別可能なセットにグループ化するローカルレデューサーの一種です。 マッパーから中間キーを入力として受け取り、ユーザー定義のコードを適用して、1つのマッパーの小さなスコープで値を集計します。 これは、メインのMapReduceアルゴリズムの一部ではありません。オプションです。
  • シャッフルとソート-レデューサータスクはシャッフルとソートのステップから始まります。 グループ化されたキーと値のペアを、Reducerが実行されているローカルマシンにダウンロードします。 個々のキーと値のペアは、キーごとにソートされてより大きなデータリストになります。 データリストは同等のキーをグループ化して、Reducerタスクで値を簡単に反復できるようにします。
  • Reducer -Reducerは、グループ化されたキーと値のペアのデータを入力として受け取り、それらのそれぞれに対してReducer関数を実行します。 ここでは、データをさまざまな方法で集約、フィルタリング、結合することができ、幅広い処理が必要です。 実行が終了すると、最終ステップに0個以上のキーと値のペアが与えられます。
  • 出力フェーズ-出力フェーズには、Reducer関数からの最終的なキーと値のペアを変換し、レコードライターを使用してファイルに書き込む出力フォーマッターがあります。

小さな図の助けを借りて2つのタスクMap&f Reduceを理解してみましょう-

MapReduce Work

MapReduce-Example

MapReduceのパワーを理解するために、実際の例を見てみましょう。 Twitterは1日に約5億のツイートを受信します。これは1秒あたり約3000のツイートです。 次の図は、MapReduceを使用してTweeterがツイートを管理する方法を示しています。

MapReduceの例

図に示すように、MapReduceアルゴリズムは次のアクションを実行します-

  • Tokenize -ツイートをトークンのマップにトークン化し、キーと値のペアとして書き込みます。
  • Filter -トークンのマップから不要な単語をフィルタリングし、フィルタリングされたマップをキーと値のペアとして書き込みます。
  • Count -単語ごとにトークンカウンターを生成します。
  • 集計カウンタ-同様のカウンタ値の集計を小さな管理可能な単位に準備します。

MapReduce-アルゴリズム

MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。

  • マップタスクは、マッパークラスによって実行されます。
  • 削減タスクは、Reducerクラスによって実行されます。

マッパークラスは入力を受け取り、トークン化し、マッピングし、並べ替えます。 Mapperクラスの出力は、Reducerクラスの入力として使用されます。Reducerクラスは、一致するペアを順番に検索してそれらを削減します。

マッパーリデューサークラス

MapReduceはさまざまな数学アルゴリズムを実装して、タスクを小さな部分に分割し、それらを複数のシステムに割り当てます。 技術的には、MapReduceアルゴリズムは、Map&Reduceタスクをクラスター内の適切なサーバーに送信するのに役立ちます。

これらの数学的アルゴリズムには次のものが含まれます-

  • ソート
  • 検索中
  • 索引付け
  • TF-IDF

ソート

ソートは、データを処理および分析するための基本的なMapReduceアルゴリズムの1つです。 MapReduceは、キーによってマッパーからの出力キーと値のペアを自動的にソートするソートアルゴリズムを実装しています。

  • 並べ替えメソッドは、マッパークラス自体に実装されます。
  • シャッフルおよびソートフェーズでは、マッパークラスの値をトークン化した後、 Context クラス(ユーザー定義クラス)は、一致する値のキーをコレクションとして収集します。
  • 同様のキーと値のペア(中間キー)を収集するために、Mapperクラスは RawComparator クラスの助けを借りてキーと値のペアをソートします。
  • 特定のレデューサーの中間キーと値のペアのセットは、Reduceに提示される前に、Hadoopによってキー値(K2、\ {V2、V2、…})を自動的にソートされます。

検索中

検索は、MapReduceアルゴリズムで重要な役割を果たします。 これは、コンバイナーフェーズ(オプション)およびレデューサーフェーズで役立ちます。 例の助けを借りて、検索がどのように機能するかを理解してみましょう。

次の例は、MapReduceが検索アルゴリズムを使用して、特定の従業員データセットで最高の給与を引き出している従業員の詳細を見つける方法を示しています。

  • A、B、C、Dの4つの異なるファイルに従業員データがあると仮定します。 また、すべてのデータベーステーブルから従業員データを繰り返しインポートするため、4つのファイルすべてに従業員レコードが重複していると仮定します。 次の図を参照してください。

Map Reduce Illustration

  • *マップフェーズ*では、各入力ファイルを処理し、従業員データをキーと値のペア(<k、v>:<emp name、salary>)で提供します。 次の図を参照してください。

Map Reduce Illustration

  • コンバイナーフェーズ(検索手法)は、マップフェーズからの入力を従業員名と給与のキーと値のペアとして受け入れます。 検索手法を使用して、コンバイナはすべての従業員の給与をチェックして、各ファイルで最高の給与を受け取る従業員を見つけます。 次のスニペットを参照してください。
<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

期待される結果は次のとおりです-

a|

<satish, 26000>

| a|

<gopal, 50000>

| a|

<kiran, 45000>

| a|

<manisha, 45000>

|

  • リデューサーフェーズ-各ファイルを作成すると、最高給の従業員が見つかります。 冗長性を回避するには、すべての<k、v>ペアを確認し、重複するエントリがあれば削除します。 4つの入力ファイルからの4つの<k、v>ペアの間でも同じアルゴリズムが使用されます。 最終的な出力は次のようになります-
<gopal, 50000>

索引付け

通常、インデックスは特定のデータとそのアドレスを指すために使用されます。 特定のマッパーの入力ファイルに対してバッチインデックス作成を実行します。

MapReduceで通常使用されるインデックス作成手法は、*反転インデックス*と呼ばれます。GoogleやBingなどの検索エンジンは、反転インデックス作成手法を使用します。 簡単な例を使用して、インデックス作成がどのように機能するかを理解してみましょう。

次のテキストは、逆索引付けの入力です。 ここで、T [0]、T [1]、およびt [2]はファイル名であり、その内容は二重引用符で囲まれています。

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

インデックスアルゴリズムを適用した後、次の出力が得られます-

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

ここで「a」:\ {2}は、「a」という用語がT [2]ファイルに現れることを意味します。 同様に、「is」:\ {0、1、2}は、ファイルT [0]、T [1]、およびT [2]に「is」という用語が含まれることを意味します。

TF-IDF

TF-IDFはテキスト処理アルゴリズムで、Term Frequency-Inverse Document Frequencyの略です。 これは、一般的なWeb分析アルゴリズムの1つです。 ここで、「頻度」という用語は、ドキュメント内で用語が出現する回数を指します。

期間頻度(TF)

ドキュメント内で特定の用語が出現する頻度を測定します。 これは、ドキュメント内の単語の出現回数をそのドキュメント内の単語の総数で割って計算されます。

TF(the) = (Number of times term the ‘the’ appears in a document)/(Total number of terms in the document)

逆ドキュメント頻度(IDF)

用語の重要性を測定します。 テキストデータベース内のドキュメントの数を特定の用語が出現するドキュメントの数で割って計算されます。

TFの計算中、すべての用語は等しく重要であると見なされます。 つまり、TFは、「is」、「a」、「what」などの通常の単語の用語頻度をカウントします。 したがって、以下を計算することにより、まれな用語をスケールアップしながら頻繁な用語を知る必要があります-

IDF(the) = log_e(Total number of documents/Number of documents with term ‘the’ in it).

このアルゴリズムは、小さな例を使用して以下に説明されています。

単語 hive が50回出現する、1000個の単語を含むドキュメントを考えます。 hive のTFは(50/1000)= 0.05です。

ここで、1,000万のドキュメントがあり、これらの1000個に hive という単語が表示されているとします。 次に、IDFはlog(10,000,000/1,000)= 4として計算されます。

TF-IDF重量は、これらの量の積-0.05×4 = 0.20です。

MapReduce-インストール

MapReduceはLinuxフレーバーオペレーティングシステムでのみ動作し、Hadoopフレームワークが組み込まれています。 Hadoopフレームワークをインストールするには、次の手順を実行する必要があります。

JAVAインストールの検証

Hadoopをインストールする前に、Javaをシステムにインストールする必要があります。 次のコマンドを使用して、システムにJavaがインストールされているかどうかを確認します。

$ java –version

Javaがすでにシステムにインストールされている場合、次の応答が表示されます-

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

システムにJavaがインストールされていない場合は、以下の手順に従ってください。

Javaのインストール

ステップ1

次のリンクから最新バージョンのJavaをダウンロードします-http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260l [このリンク]。

ダウンロード後、Downloadsフォルダーで jdk-7u71-linux-x64.tar.gz ファイルを見つけることができます。

ステップ2

次のコマンドを使用して、jdk-7u71-linux-x64.gzの内容を抽出します。

$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz
$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

ステップ3

Javaをすべてのユーザーが利用できるようにするには、Javaを「/usr/local/」の場所に移動する必要があります。 ルートに移動し、次のコマンドを入力します-

$ su
password:
# mv jdk1.7.0_71/usr/local/java
# exit

ステップ4

PATHおよびJAVA_HOME変数を設定するには、次のコマンドを〜/.bashrcファイルに追加します。

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

現在実行中のシステムにすべての変更を適用します。

$ source ~/.bashrc

ステップ5

次のコマンドを使用して、Javaの代替を構成します-

# alternatives --install/usr/bin/java java usr/local/java/bin/java 2

# alternatives --install/usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install/usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

次に、ターミナルからコマンド java -version を使用してインストールを確認します。

Hadoopインストールの検証

MapReduceをインストールする前に、Hadoopをシステムにインストールする必要があります。 次のコマンドを使用してHadoopのインストールを確認しましょう-

$ hadoop version

Hadoopが既にシステムにインストールされている場合、次の応答が表示されます-

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

Hadoopがシステムにインストールされていない場合は、次の手順に進みます。

Hadoopをダウンロードする

Apache Software FoundationからHadoop 2.4.1をダウンロードし、次のコマンドを使用してその内容を抽出します。

$ su
password:
# cd/usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

擬似分散モードでのHadoopのインストール

以下の手順を使用して、Hadoop 2.4.1を擬似分散モードでインストールします。

ステップ1-Hadoopのセットアップ

〜/.bashrcファイルに次のコマンドを追加することにより、Hadoop環境変数を設定できます。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

現在実行中のシステムにすべての変更を適用します。

$ source ~/.bashrc

ステップ2-Hadoopの構成

すべてのHadoop設定ファイルは、「$ HADOOP_HOME/etc/hadoop」の場所にあります。 Hadoopインフラストラクチャに応じて、これらの構成ファイルに適切な変更を加える必要があります。

$ cd $HADOOP_HOME/etc/hadoop

Javaを使用してHadoopプログラムを開発するには、JAVA_HOME値をシステム内のJavaの場所に置き換えて、 hadoop-env.sh ファイルのJava環境変数をリセットする必要があります。

export JAVA_HOME=/usr/local/java

Hadoopを設定するには、次のファイルを編集する必要があります-

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • mapred-site.xml

core-site.xml

core-site.xmlには次の情報が含まれています

  • Hadoopインスタンスに使用されるポート番号
  • ファイルシステムに割り当てられたメモリ
  • データを保存するためのメモリ制限
  • 読み取り/書き込みバッファのサイズ

core-site.xmlを開き、<configuration>タグと</configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xmlには次の情報が含まれています-

  • 複製データの価値
  • namenodeパス
  • ローカルファイルシステムのデータノードパス(Hadoopインフラを保存する場所)

次のデータを想定します。

dfs.replication (data replication value) = 1

(In the following path/hadoop/is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path =//home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path =//home/hadoop/hadoopinfra/hdfs/datanode

このファイルを開き、<configuration>タグと</configuration>タグの間に次のプロパティを追加します。

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>

   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>

   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>

</configuration>

-上記のファイルでは、すべてのプロパティ値はユーザー定義であり、Hadoopインフラストラクチャに応じて変更できます。

yarn-site.xml

このファイルは、Hadoopに糸を設定するために使用されます。 yarn-site.xmlファイルを開き、<configuration>タグと</configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

このファイルは、使用しているMapReduceフレームワークを指定するために使用されます。 デフォルトでは、Hadoopにはyarn-site.xmlのテンプレートが含まれています。 まず、次のコマンドを使用して、mapred-site.xml.templateからmapred-site.xmlファイルにファイルをコピーする必要があります。

$ cp mapred-site.xml.template mapred-site.xml

mapred-site.xmlファイルを開き、<configuration>タグと</configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Hadoopインストールの検証

次の手順を使用して、Hadoopのインストールを確認します。

ステップ1-ノードのセットアップに名前を付ける

次のようにコマンド「hdfs namenode -format」を使用してネームノードを設定します-

$ cd ~
$ hdfs namenode -format

期待される結果は次のとおりです-

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

ステップ2-Hadoop dfの検証

次のコマンドを実行して、Hadoopファイルシステムを起動します。

$ start-dfs.sh

予想される出力は次のとおりです-

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to/home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to/home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

ステップ3-糸スクリプトの検証

次のコマンドを使用して、糸スクリプトを開始します。 このコマンドを実行すると、糸デーモンが起動します。

$ start-yarn.sh

予想される出力は次のとおりです-

starting yarn daemons
starting resourcemanager, logging to/home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to/home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

ステップ4-ブラウザーでHadoopにアクセスする

Hadoopにアクセスするためのデフォルトのポート番号は50070です。 次のURLを使用して、ブラウザーでHadoopサービスを取得します。

http://localhost:50070/

次のスクリーンショットは、Hadoopブラウザーを示しています。

Hadoopブラウザ

ステップ5-クラスターのすべてのアプリケーションを確認する

クラスターのすべてのアプリケーションにアクセスするためのデフォルトのポート番号は8088です。 このサービスを使用するには、次のURLを使用してください。

http://localhost:8088/

次のスクリーンショットは、Hadoopクラスターブラウザーを示しています。

Hadoopクラスターブラウザー

MapReduce-API

この章では、MapReduceプログラミングの操作に関係するクラスとそのメソッドを詳しく見ていきます。 私たちは主に次のものに焦点を当て続けます-

  • JobContextインターフェイス
  • 職種
  • マッパークラス *減速機クラス

JobContextインターフェイス

JobContextインターフェイスは、すべてのクラスのスーパーインターフェイスであり、MapReduceのさまざまなジョブを定義します。 タスクの実行中にタスクに提供されるジョブの読み取り専用ビューを提供します。

以下は、JobContextインターフェースのサブインターフェースです。

S.No. Subinterface Description
1.
  • MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>*

マッパーに与えられるコンテキストを定義します。

2.

ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

レデューサーに渡されるコンテキストを定義します。

ジョブクラスは、JobContextインターフェイスを実装するメインクラスです。

職種

Jobクラスは、MapReduce APIで最も重要なクラスです。 これにより、ユーザーはジョブの構成、送信、実行の制御、状態の照会を行うことができます。 setメソッドは、ジョブが送信されるまでのみ機能し、その後IllegalStateExceptionをスローします。

通常、ユーザーはアプリケーションを作成し、ジョブのさまざまな側面を説明してから、ジョブを送信してその進行状況を監視します。

ジョブを送信する方法の例を次に示します-

//Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

//Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

//Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

コンストラクタ

Jobクラスのコンストラクターの概要を以下に示します。

S.No Constructor Summary
1 Job()
2 Job(Configuration conf)
3 Job(Configuration conf, String jobName)

方法

ジョブクラスの重要なメソッドのいくつかは次のとおりです-

S.No Method Description
1

getJobName()

ユーザー指定のジョブ名。

2

getJobState()

ジョブの現在の状態を返します。

3

isComplete()

ジョブが終了したかどうかを確認します。

4

setInputFormatClass()

ジョブのInputFormatを設定します。

5

setJobName(String name)

ユーザー指定のジョブ名を設定します。

6

setOutputFormatClass()

ジョブの出力形式を設定します。

7

setMapperClass(Class)

ジョブのマッパーを設定します。

8

setReducerClass(Class)

ジョブのレデューサーを設定します。

9

setPartitionerClass(Class)

ジョブのパーティショナーを設定します。

10

setCombinerClass(Class)

ジョブの結合器を設定します。

マッパークラス

Mapperクラスは、Mapジョブを定義します。 入力キーと値のペアを一連の中間キーと値のペアにマップします。 マップは、入力レコードを中間レコードに変換する個々のタスクです。 変換された中間レコードは、入力レコードと同じタイプである必要はありません。 特定の入力ペアは、ゼロまたは多数の出力ペアにマッピングできます。

方法

*map* は、Mapperクラスの最も顕著なメソッドです。 構文は以下に定義されています-
map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

このメソッドは、入力スプリットのキーと値のペアごとに1回呼び出されます。

減速機クラス

Reducerクラスは、MapReduceでReduceジョブを定義します。 キーを共有する中間値のセットをより小さな値のセットに減らします。 レデューサーの実装は、JobContext.getConfiguration()メソッドを介してジョブの構成にアクセスできます。 レデューサーには、シャッフル、ソート、リデュースの3つの主要なフェーズがあります。

  • シャッフル-レデューサーは、ネットワーク全体でHTTPを使用して各マッパーからソートされた出力をコピーします。
  • ソート-フレームワークは、Reducerの入力をキーでマージソートします(異なるマッパーが同じキーを出力する可能性があるため)。 シャッフルフェーズとソートフェーズは同時に発生します。つまり、出力のフェッチ中にマージされます。
  • Reduce -このフェーズでは、ソートされた入力の各<key、(values of collection)>に対してreduce(Object、Iterable、Context)メソッドが呼び出されます。

方法

*reduce* は、Reducerクラスの最も顕著なメソッドです。 構文は以下に定義されています-
reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

このメソッドは、キーと値のペアのコレクションのキーごとに1回呼び出されます。

MapReduce-Hadoopの実装

MapReduceは、商品ハードウェアの大規模なクラスターで大量のデータを信頼性の高い方法で処理するアプリケーションを作成するために使用されるフレームワークです。 この章では、Javaを使用したHadoopフレームワークでのMapReduceの操作について説明します。

MapReduceアルゴリズム

通常、MapReduceパラダイムは、実際のデータが存在するコンピューターにmap-reduceプログラムを送信することに基づいています。

  • MapReduceジョブ中に、HadoopはMapおよびReduceタスクをクラスター内の適切なサーバーに送信します。
  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター全体のデータのコピーなど、データの受け渡しに関するすべての詳細を管理します。
  • ほとんどの計算は、ネットワークトラフィックを削減するローカルディスク上のデータを使用してノードで実行されます。
  • 所定のタスクを完了すると、クラスターはデータを収集および削減して適切な結果を生成し、Hadoopサーバーに送り返します。

MapReduceアルゴリズム

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは、キーと値のペアで動作します。つまり、フレームワークは、ジョブへの入力をキーと値のペアのセットと見なし、ジョブの出力として、おそらく異なるタイプのキーと値のペアのセットを生成します。

キーと値のクラスはフレームワークによってシリアル化できる必要があるため、Writableインターフェイスを実装する必要があります。 さらに、キークラスは、フレームワークによる並べ替えを容易にするためにWritableComparableインターフェイスを実装する必要があります。

MapReduceジョブの入力と出力の両方の形式は、キーと値のペアの形式です-

(入力)<k1、v1>→ map→ <k2、v2>→ reduce→ <k3、v3>(出力)。

Input Output
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

MapReduceの実装

次の表は、組織の電力消費に関するデータを示しています。 この表には、5年間連続の月間電力消費量と年間平均が含まれています。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

指定された表の入力データを処理するアプリケーションを作成して、最大使用年、最小使用年などを見つける必要があります。 このタスクは、必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡すだけなので、レコード数が限られているプログラマーにとっては簡単です。

入力データのスケールを上げましょう。 特定の州のすべての大規模産業の電力消費量を分析する必要があるとします。 このようなバルクデータを処理するアプリケーションを作成する場合、

  • それらの実行には多くの時間がかかります。
  • ソースからネットワークサーバーにデータを移動すると、大量のネットワークトラフィックが発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは sample.txt として保存され、入力として提供されます。 入力ファイルは次のようになります。

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

サンプルプログラム

サンプルデータ用の次のプログラムは、MapReduceフレームワークを使用しています。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
  //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable, /*Input key Type */
   Text,                  /*Input value Type*/
   Text,                  /*Output key Type*/
   IntWritable>           /*Output value Type*/
   {
     //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();

         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }

         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

  //Reducer class

   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
     //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

  //Main function

   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);

      conf.setJobName("max_eletricityunits");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

上記のプログラムを ProcessUnits.java に保存します。 プログラムのコンパイルと実行を以下に示します。

ProcessUnitsプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリにいると仮定しましょう(例:/home/hadoop)。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-次のコマンドを使用して、コンパイル済みのJavaクラスを保存するディレクトリを作成します。
$ mkdir units
  • ステップ2 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 mvnrepository.comからjarをダウンロードします。 ダウンロードフォルダーが/home/hadoop/であると仮定しましょう。

ステップ3 *-次のコマンドを使用して、 *ProcessUnits.java プログラムをコンパイルし、プログラムのjarを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/.
  • ステップ4 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *sample.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ7 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、Mapタスク、Reducerタスクなどが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters

   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2

   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120

   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120

   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework

   Map input records=5

   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67

   Input split bytes=208
   Combine input records=5
   Combine output records=5

   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5

   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2

   GC time elapsed (ms)=948
   CPU time spent (ms)=5160

   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504

   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40
  • ステップ8 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9 *- *Part-00000 ファイルの出力を確認するには、次のコマンドを使用します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下はMapReduceプログラムによって生成された出力です-

1981 34
1984 40
1985 45
  • ステップ10 *-次のコマンドを使用して、出力フォルダーをHDFSからローカルファイルシステムにコピーします。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir/home/hadoop

MapReduce-パーティショナー

パーティショナーは、入力データセットを処理する条件のように機能します。 分割フェーズは、マップフェーズの後、削減フェーズの前に行われます。

パーティショナーの数は、レデューサーの数と同じです。 つまり、パーティショナーはレデューサーの数に応じてデータを分割します。 したがって、単一のパーティショナーから渡されたデータは、単一のReducerによって処理されます。

パーティショナー

パーティショナーは、中間のMap出力のキーと値のペアをパーティション分割します。 ハッシュ関数のように機能するユーザー定義の条件を使用してデータを分割します。 パーティションの合計数は、ジョブのReducerタスクの数と同じです。 パーティショナーがどのように機能するかを理解するために例を挙げましょう。

MapReduce Partitionerの実装

便宜上、次のデータを持つEmployeeという小さなテーブルがあると仮定します。 このサンプルデータを入力データセットとして使用して、パーティショナーの動作を示します。

Id Name Age Gender Salary
1201 gopal 45 Male 50,000
1202 manisha 40 Female 50,000
1203 khalil 34 Male 30,000
1204 prasanth 30 Male 30,000
1205 kiran 20 Male 40,000
1206 laxmi 25 Female 35,000
1207 bhavya 20 Female 15,000
1208 reshma 19 Female 15,000
1209 kranthi 22 Male 22,000
1210 Satish 24 Male 25,000
1211 Krishna 25 Male 25,000
1212 Arshad 28 Male 20,000
1213 lavanya 18 Female 8,000

入力データセットを処理して、さまざまな年齢層(20歳未満、21歳から30歳、30歳以上など)の性別ごとに最高給与の従業員を見つけるアプリケーションを作成する必要があります。

入力データ

上記のデータは、「/home/hadoop/hadoopPartitioner」ディレクトリに input.txt として保存され、入力として提供されます。

1201 gopal 45 Male 50000
1202 manisha 40 Female 51000
1203 khaleel 34 Male 30000
1204 prasanth 30 Male 31000
1205 kiran 20 Male 40000
1206 laxmi 25 Female 35000
1207 bhavya 20 Female 15000
1208 reshma 19 Female 14000
1209 kranthi 22 Male 22000
1210 Satish 24 Male 25000
1211 Krishna 25 Male 26000
1212 Arshad 28 Male 20000
1213 lavanya 18 Female 8000

指定された入力に基づいて、プログラムのアルゴリズムの説明を以下に示します。

マップタスク

マップタスクは、テキストファイルにテキストデータがある間は、キーと値のペアを入力として受け入れます。 このマップタスクの入力は次のとおりです-

入力-キーは、「任意の特別なキー&plus;」などのパターンになります。ファイル名&plus;行番号」(例:キー= @ input1)、値はその行のデータになります(例:値= 1201 \ t gopal \ t 45 \ t男性\ t 50000)。

方法-このマップタスクの操作は次のとおりです-

  • 文字列の引数リストから入力値として取得される*値*(レコードデータ)を読み取ります。
  • split関数を使用して、性別を分離し、文字列変数に保存します。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • マップタスクから*パーティションタスク*に出力キーと値のペアとして性別情報とレコードデータ*値*を送信します。
context.write(new Text(gender), new Text(value));
  • テキストファイル内のすべてのレコードに対して上記のすべての手順を繰り返します。

出力-性別データとレコードデータ値をキーと値のペアとして取得します。

パーティショナータスク

パーティショナータスクは、マップタスクからのキーと値のペアを入力として受け入れます。 パーティションとは、データをセグメントに分割することを意味します。 パーティションの指定された条件基準に従って、入力キーと値のペアのデータは、年齢基準に基づいて3つの部分に分割できます。

入力-キーと値のペアのコレクション内のデータ全体。

キー=レコード内の性別フィールド値。

値=その性別のレコードデータ値全体。

方法-パーティションロジックのプロセスは次のように実行されます。

  • 入力キーと値のペアから年齢フィールドの値を読み取ります。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 次の条件で年齢の値を確認します。
  • 20歳以下
  • 年齢20歳以上30歳以下。
  • 30歳以上。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

出力-キーと値のペアのデータ全体が、キーと値のペアの3つのコレクションに分割されます。 レデューサーは、各コレクションで個別に機能します。

タスクを減らす

パーティショナータスクの数は、レデューサータスクの数と同じです。 ここでは、3つのパーティショナータスクがあり、したがって3つのReducerタスクが実行されます。

入力-レデューサーは、キーと値のペアの異なるコレクションで3回実行されます。

キー=レコード内の性別フィールド値。

値=その性別のレコードデータ全体。

方法-次のロジックが各コレクションに適用されます。

  • 各レコードの給与フィールド値を読み取ります。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • max変数で給与を確認してください。 str [4]が最大給与の場合、str [4]をmaxに割り当てます。それ以外の場合は、ステップをスキップします。
if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • キーコレクションごとに手順1と2を繰り返します(キーコレクションは男性と女性です)。 これらの3つのステップを実行すると、男性キーコレクションから最大給与が1つ、女性キーコレクションから最大給与が1つ見つかります。
context.write(new Text(key), new IntWritable(max));

出力-最後に、異なる年齢層の3つのコレクションでキーと値のペアのデータのセットを取得します。 各年齢層の男性コレクションの最高給与と女性コレクションの最高給与がそれぞれ含まれています。

Map、Partitioner、およびReduceタスクを実行した後、キーと値のペアのデータの3つのコレクションは、出力として3つの異なるファイルに保存されます。

3つのタスクはすべて、MapReduceジョブとして扱われます。 これらのジョブの次の要件と仕様は、構成で指定する必要があります-

  • 職種名
  • キーと値の入力および出力形式
  • Map、Reduce、およびPartitionerタスクの個々のクラス
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

//File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

サンプルプログラム

次のプログラムは、MapReduceプログラムで特定の条件にパーティショナーを実装する方法を示しています。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
  //Map class

   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }

  //Reducer class

   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;

         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }

         context.write(new Text(key), new IntWritable(max));
      }
   }

  //Partitioner class

   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);

         if(numReduceTasks == 0)
         {
            return 0;
         }

         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }

   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();

      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);

      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));

      job.setMapperClass(MapClass.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

     //set partitioner statement

      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);

      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }

   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

上記のコードを「/home/hadoop/hadoopPartitioner」に PartitionerExample.java として保存します。 プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/home/hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 jarはhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]からダウンロードできます。

ダウンロードしたフォルダが「/home/hadoop/hadoopPartitioner」であると仮定しましょう

ステップ2 *-次のコマンドは、プログラム *PartitionerExample.java をコンパイルし、プログラムのjarを作成するために使用されます。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
  • ステップ3 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ4 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *input.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/hadoopPartitioner/input.txt input_dir
  • ステップ5 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してTop salaryアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、マップタスク、およびReducerタスクが含まれます。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6

Job Counters

   Launched map tasks=1
   Launched reduce tasks=3

   Data-local map tasks=1

   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858

   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858

   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592

Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467

   Input split bytes=119

   Combine input records=0
   Combine output records=0

   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6

   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690

   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688

   Total committed heap usage (bytes)=334102528

Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0

   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0

File Input Format Counters

   Bytes Read=361

File Output Format Counters

   Bytes Written=72
  • ステップ7 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

プログラムで3つのパーティショナーと3つのレデューサーを使用しているため、出力は3つのファイルにあります。

ステップ8 *-次のコマンドを使用して、 *Part-00000 ファイルの出力を確認します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
  • Part-00000で出力*
Female   15000
Male     40000

次のコマンドを使用して、 Part-00001 ファイルの出力を確認します。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
  • Part-00001での出力*
Female   35000
Male    31000

次のコマンドを使用して、 Part-00002 ファイルの出力を確認します。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
  • Part-00002の出力*
Female  51000
Male   50000

MapReduce-コンバイナー

Combinerは、 semi-reducer とも呼ばれ、Mapクラスからの入力を受け入れてから、出力のキーと値のペアをReducerクラスに渡すことで動作するオプションのクラスです。

Combinerの主な機能は、マップ出力レコードを同じキーで要約することです。 コンバイナの出力(キーと値のコレクション)は、ネットワークを介して入力として実際のReducerタスクに送信されます。

コンバイナー

MapクラスとReduceクラスの間でCombinerクラスを使用して、MapとReduce間のデータ転送量を削減します。 通常、mapタスクの出力は大きく、reduceタスクに転送されるデータは大きくなります。

次のMapReduceタスク図は、COMBINER PHASEを示しています。

Combiner

Combinerの仕組み

MapReduce Combinerの仕組みに関する簡単な概要を次に示します-

  • コンバイナには事前定義されたインターフェースがなく、Reducerインターフェースのreduce()メソッドを実装する必要があります。
  • コンバイナは、各マップ出力キーで動作します。 Reducerクラスと同じ出力Key-Valueタイプが必要です。
  • コンバイナは、元のマップ出力を置き換えるため、大規模なデータセットから要約情報を生成できます。

ただし、Combinerはオプションですが、データを複数のグループに分けてリデュースフェーズを支援し、処理を容易にします。

MapReduce Combinerの実装

次の例は、コンバイナに関する理論的なアイデアを提供します。 MapReduceの input.txt という名前の次の入力テキストファイルがあるとします。

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Combinerを使用したMapReduceプログラムの重要なフェーズを以下で説明します。

レコードリーダー

これはMapReduceの最初のフェーズであり、レコードリーダーは入力テキストファイルからすべての行をテキストとして読み取り、キーと値のペアとして出力を生成します。

入力-入力ファイルの行ごとのテキスト。

出力-キーと値のペアを形成します。 以下は、予想されるキーと値のペアのセットです。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

マップフェーズ

マップフェーズでは、レコードリーダーから入力を取得して処理し、キーと値のペアの別のセットとして出力を生成します。

入力-次のキーと値のペアは、レコードリーダーから取得した入力です。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Mapフェーズでは、各キーと値のペアを読み取り、StringTokenizerを使用して各単語を値から除算し、各単語をキーとして扱い、その単語のカウントを値として扱います。 次のコードスニペットは、Mapperクラスとmap関数を示しています。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context) throws IOException, InterruptedException
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens())
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

出力-予想される出力は次のとおりです-

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナーフェーズ

Combinerフェーズは、Mapフェーズから各キーと値のペアを取得して処理し、*キーと値のコレクション*のペアとして出力を生成します。

入力-次のキーと値のペアは、マップフェーズから取得した入力です。

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナフェーズでは、各キーと値のペアを読み取り、キーとして一般的な単語を組み合わせ、コレクションとして値を組み合わせます。 通常、Combinerのコードと操作はReducerのコードと操作に似ています。 以下は、Mapper、Combiner、およびReducerのクラス宣言のコードスニペットです。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

出力-予想される出力は次のとおりです-

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

減速機フェーズ

Reducerフェーズでは、Combinerフェーズから各キーと値のコレクションペアを取得して処理し、出力をキーと値のペアとして渡します。 Combiner機能はReducerと同じであることに注意してください。

入力-次のキーと値のペアは、コンバイナフェーズから取得した入力です。

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レデューサーフェーズは、各キーと値のペアを読み取ります。 以下は、Combinerのコードスニペットです。

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
   {
      int sum = 0;
      for (IntWritable val : values)
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

出力-レデューサーフェーズから予想される出力は次のとおりです-

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レコードライター

これはMapReduceの最後のフェーズで、Record WriterはReducerフェーズからのすべてのキーと値のペアを書き込み、出力をテキストとして送信します。

入力-出力形式とともに、Reducerフェーズからの各キーと値のペア。

出力-キーと値のペアをテキスト形式で提供します。 予想される出力は次のとおりです。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

サンプルプログラム

次のコードブロックは、プログラム内の単語の数をカウントします。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens())
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }

   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
         int sum = 0;
         for (IntWritable val : values)
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }

   public static void main(String[] args) throws Exception
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");

      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

上記のプログラムを WordCount.java として保存します。 プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/home/hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

  • ステップ1 *-次のコマンドを使用して、コンパイル済みのJavaクラスを保存するディレクトリを作成します。
$ mkdir units
  • ステップ2 *-MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。 jarはhttp://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1[mvnrepository.com]からダウンロードできます。

ダウンロードしたフォルダーが/home/hadoop/であると仮定します。

ステップ3 *-次のコマンドを使用して、 *WordCount.java プログラムをコンパイルし、プログラムのjarを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/.
  • ステップ4 *-次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5 *-次のコマンドを使用して、HDFSの入力ディレクトリにある *input.txt という名前の入力ファイルをコピーします。

$HADOOP_HOME/bin/hadoop fs -put/home/hadoop/input.txt input_dir
  • ステップ6 *-次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
  • ステップ7 *-次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してワードカウントアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。 実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。

  • ステップ8 *-次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9 *-次のコマンドを使用して、 *Part-00000 ファイルの出力を確認します。 このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

MapReduce-Hadoop管理

この章では、HDFS管理とMapReduce管理の両方を含むHadoop管理について説明します。

  • HDFS管理には、HDFSファイル構造、場所、および更新されたファイルの監視が含まれます。
  • MapReduceの管理には、アプリケーションのリスト、ノードの構成、アプリケーションのステータスなどの監視が含まれます。

HDFSモニタリング

HDFS(Hadoop Distributed File System)には、ユーザーディレクトリ、入力ファイル、および出力ファイルが含まれています。 保存および取得には、MapReduceコマンド put および get を使用します。

「/$ HADOOP_HOME/sbin」でコマンド「start-all.sh」を渡してHadoopフレームワーク(デーモン)を起動した後、次のURLをブラウザー「http://localhost:50070」に渡します。 ブラウザに次の画面が表示されます。

次のスクリーンショットは、ブラウズHDFSをブラウズする方法を示しています。

HDFSモニタリング

次のスクリーンショットは、HDFSのファイル構造を示しています。 「/user/hadoop」ディレクトリ内のファイルが表示されます。

HDFSファイル

次のスクリーンショットは、クラスター内のデータノード情報を示しています。 ここで、1つのノードとその構成と容量を見つけることができます。

データノダ情報

MapReduceジョブ監視

MapReduceアプリケーションは、ジョブ(Mapジョブ、Combiner、Partitioner、およびReduceジョブ)のコレクションです。 以下を監視し、維持することが必須です-

  • アプリケーションが適しているデータノードの構成。
  • アプリケーションごとに使用されるデータノードとリソースの数。

これらすべてを監視するには、ユーザーインターフェイスが必要です。 「/$ HADOOP_HOME/sbin」でコマンド「start-all.sh」を渡してHadoopフレームワークを開始した後、次のURLをブラウザー「http://localhost:8080」に渡します。 ブラウザに次の画面が表示されます。

ジョブ監視

上記のスクリーンショットでは、ハンドポインターはアプリケーションID上にあります。 クリックするだけで、ブラウザで次の画面が表示されます。 以下について説明します-

  • 現在のアプリケーションが実行されているユーザー
  • アプリケーション名
  • そのアプリケーションのタイプ
  • 現在のステータス、最終ステータス
  • 監視時に完了した場合は、アプリケーションの開始時間、経過(完了時間)
  • このアプリケーションの履歴、つまりログ情報
  • 最後に、ノード情報、つまりアプリケーションの実行に参加したノード。

次のスクリーンショットは、特定のアプリケーションの詳細を示しています-

アプリケーションID

次のスクリーンショットは、現在実行中のノード情報を説明しています。 ここで、スクリーンショットには1つのノードのみが含まれています。 ハンドポインターは、実行中のノードのローカルホストアドレスを示します。

すべてのノード