Hcatalog-input-output-format

提供:Dev Guides
移動先:案内検索

HCatalog-入力出力フォーマット

*HCatInputFormat* および *HCatOutputFormat* インターフェースは、HDFSからデータを読み取り、処理後、MapReduceジョブを使用して結果のデータをHDFSに書き込むために使用されます。 入力および出力形式のインターフェイスについて詳しく説明します。

HCatInputFormat

*HCatInputFormat* はMapReduceジョブで使用され、HCatalog管理のテーブルからデータを読み取ります。 HCatInputFormatは、テーブルにパブリッシュされたかのようにデータを読み取るためのHadoop 0.20 MapReduce APIを公開します。
Sr.No. Method Name & Description
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

ジョブに使用する入力を設定します。 指定された入力仕様でメタストアを照会し、MapReduceタスクのジョブ構成に一致するパーティションをシリアル化します。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

ジョブに使用する入力を設定します。 指定された入力仕様でメタストアを照会し、MapReduceタスクのジョブ構成に一致するパーティションをシリアル化します。

3

public HCatInputFormat setFilter(String filter)throws IOException

入力テーブルにフィルターを設定します。

4

public HCatInputFormat setProperties(Properties properties) throws IOException

入力形式のプロパティを設定します。

*HCatInputFormat* APIには次のメソッドが含まれています-
  • setInput

  • setOutputSchema

  • getTableSchema

    *HCatInputFormat* を使用してデータを読み取るには、まず、読み取られるテーブルの必要な情報を使用して *InputJobInfo* をインスタンス化し、次に *InputJobInfo* を使用して *setInput* を呼び出します。
    *setOutputSchema* メソッドを使用して *projection schema* を含め、出力フィールドを指定できます。 スキーマが指定されていない場合、テーブル内のすべての列が返されます。 getTableSchemaメソッドを使用して、指定した入力テーブルのテーブルスキーマを決定できます。

HCatOutputFormat

HCatOutputFormatはMapReduceジョブで使用され、HCatalog管理のテーブルにデータを書き込みます。 HCatOutputFormatは、テーブルにデータを書き込むためのHadoop 0.20 MapReduce APIを公開します。 MapReduceジョブがHCatOutputFormatを使用して出力を書き込む場合、テーブルに設定されたデフォルトのOutputFormatが使用され、ジョブの完了後に新しいパーティションがテーブルに公開されます。

Sr.No. Method Name & Description
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

ジョブについて書き込む出力に関する情報を設定します。 メタデータサーバーに照会して、テーブルに使用するStorageHandlerを見つけます。 パーティションが既に公開されている場合、エラーがスローされます。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

パーティションに書き出されるデータのスキーマを設定します。 テーブルスキーマは、これが呼び出されない場合、デフォルトでパーティションに使用されます。

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

ジョブのレコードライターを取得します。 StorageHandlerのデフォルトのOutputFormatを使用して、レコードライターを取得します。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

この出力形式の出力コミッターを取得します。 出力が正しくコミットされるようにします。

*HCatOutputFormat* APIには次のメソッドが含まれています-
  • setOutput
  • setSchema
  • getTableSchema

HCatOutputFormatの最初の呼び出しは setOutput でなければなりません。他の呼び出しは、出力形式が初期化されていないことを示す例外をスローします。

書き出されるデータのスキーマは、 setSchema メソッドによって指定されます。 書き込むデータのスキーマを提供して、このメソッドを呼び出す必要があります。 データのスキーマがテーブルスキーマと同じ場合、* HCatOutputFormat.getTableSchema()を使用してテーブルスキーマを取得し、それを setSchema()*に渡すことができます。

次のMapReduceプログラムは、2番目の列(「列1」)に整数があると見なされる1つのテーブルからデータを読み取り、検出した各個別値のインスタンス数をカウントします。 つまり、 "* select col1、count(*)from $ table group by col1; *"と同等です。

たとえば、2番目の列の値が\ {1、1、1、3、3、5}である場合、プログラムは値とカウントの次の出力を生成します-

1, 3
3, 2
5, 1

私たちは今プログラムコードを見てみましょう-

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable,
      HCatRecord, IntWritable, IntWritable> {
      int age;

      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }

   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();

         while (iter.hasNext()) {
            sum++;
            iter.next();
         }

         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }

   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();

      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System

      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

     //initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);

      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);

      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }

   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

上記のプログラムをコンパイルする前に、いくつかの jar をダウンロードし、それらをこのアプリケーションの classpath に追加する必要があります。 すべてのHive jarおよびHCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、 libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

次のコマンドを使用して、これらの jar ファイルを local から HDFS にコピーし、 classpath に追加します。

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar/tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar/tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

次のコマンドを使用して、指定されたプログラムをコンパイルおよび実行します。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

次に、出力(part_0000、part_0001)の出力ディレクトリ(hdfs:user/tmp/hive)を確認します。