Apache-flink-api-concepts

提供:Dev Guides
移動先:案内検索

Apache Flink-APIの概念

Flinkには、開発者がバッチデータとリアルタイムデータの両方で変換を実行できるAPIの豊富なセットがあります。 さまざまな変換には、マッピング、フィルタリング、並べ替え、結合、グループ化、および集約が含まれます。 Apache Flinkによるこれらの変換は、分散データに対して実行されます。 Apache Flinkが提供するさまざまなAPIについて説明しましょう。

データセットAPI

Apache FlinkのデータセットAPIは、一定期間にわたってデータに対してバッチ操作を実行するために使用されます。 このAPIは、Java、Scala、Pythonで使用できます。 フィルタリング、マッピング、集約、結合、グループ化などのさまざまな種類の変換をデータセットに適用できます。

データセットはローカルファイルなどのソースから作成されるか、特定のソースからファイルを読み取ることで作成され、結果ファイルは分散ファイルやコマンドライン端末などのさまざまなシンクに書き込むことができます。 このAPIは、JavaとScalaプログラミング言語の両方でサポートされています。

ここにDataset APIのWordcountプログラムがあります-

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

DataStream API

このAPIは、連続ストリームでデータを処理するために使用されます。 ストリームデータのフィルタリング、マッピング、ウィンドウ化、集計などのさまざまな操作を実行できます。 このデータストリームには、メッセージキュー、ファイル、ソケットストリームなどのさまざまなソースがあり、コマンドラインターミナルなどのさまざまなシンクに結果データを書き込むことができます。 JavaとScalaプログラミング言語の両方がこのAPIをサポートしています。

DataStream APIのストリーミングワードカウントプログラムを次に示します。ここでは、ワードカウントの連続ストリームがあり、データは2番目のウィンドウにグループ化されます。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}