Apache-kafka-simple-producer-example
Apache Kafka-シンプルなプロデューサーの例
Javaクライアントを使用してメッセージをパブリッシュおよびコンシュームするためのアプリケーションを作成してみましょう。 Kafkaプロデューサークライアントは、次のAPIで構成されています。
KafkaProducer API
このセクションでKafkaプロデューサーAPIの最も重要なセットを理解しましょう。 KafkaProducer APIの中心部分は「 KafkaProducer
」クラスです。 KafkaProducerクラスは、コンストラクターでKafkaブローカーを次のメソッドに接続するオプションを提供します。
- KafkaProducerクラスは、トピックに非同期でメッセージを送信するためのsendメソッドを提供します。 send()の署名は次のとおりです。
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
- ProducerRecord -プロデューサーは、送信を待機しているレコードのバッファーを管理します。
- Callback -レコードがサーバーによって承認されたときに実行するユーザー指定のコールバック(nullはコールバックがないことを示します)。
- KafkaProducerクラスは、以前に送信されたすべてのメッセージが実際に完了したことを確認するフラッシュメソッドを提供します。 フラッシュメソッドの構文は次のとおりです-
public void flush()
- KafkaProducerクラスは、特定のトピックのパーティションメタデータを取得するのに役立つpartitionForメソッドを提供します。 これは、カスタムパーティションに使用できます。 このメソッドのシグネチャは次のとおりです-
public Map metrics()
プロデューサーによって維持されている内部メトリックのマップを返します。
- public void close()-KafkaProducerクラスは、以前に送信されたすべての要求が完了するまで、closeメソッドブロックを提供します。
プロデューサーAPI
Producer APIの中心部分は「 Producer
」クラスです。 Producerクラスには、次のメソッドによってコンストラクターでKafkaブローカーを接続するオプションがあります。
プロデューサークラス
プロデューサークラスは、次の署名を使用して、単一または複数のトピックにメッセージを送信するためのsendメソッドを提供します。
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
プロデューサーには、 Sync と Async の2つのタイプがあります。
同じAPI設定が「 Sync
」プロデューサーにも適用されます。 それらの違いは、同期プロデューサーはメッセージを直接送信しますが、メッセージをバックグラウンドで送信することです。 より高いスループットが必要な場合は、非同期プロデューサが優先されます。 0.8などの以前のリリースでは、非同期プロデューサーには、エラーハンドラーを登録するためのsend()のコールバックがありません。 これは、0.9の現在のリリースでのみ使用可能です。
public void close()
プロデューサークラスは、すべてのKafkaブローカーへのプロデューサープール接続を閉じる close メソッドを提供します。
構成設定
Producer APIの主要な構成設定は、理解を深めるために次の表にリストされています-
S.No | Configuration Settings and Description |
---|---|
1 |
client.id プロデューサーアプリケーションを識別します |
2 |
producer.type 同期または非同期 |
3 |
acks acks configは、プロデューサーリクエストが完全であると見なされる条件を制御します。 |
4 |
retries プロデューサーのリクエストが失敗した場合、特定の値で自動的に再試行します。 |
5 |
bootstrap.servers ブローカーのブートストラップリスト。 |
6 |
linger.ms リクエストの数を減らしたい場合は、linger.msをある値より大きい値に設定できます。 |
7 |
key.serializer シリアライザーインターフェイスのキー。 |
8 |
value.serializer シリアライザーインターフェイスの値。 |
9 |
batch.size バッファサイズ。 |
10 |
buffer.memory プロデューサーがバフリングに使用できるメモリの総量を制御します。 |
ProducerRecord API
ProducerRecordは、Kafka cluster.ProducerRecordクラスコンストラクターに送信されるキー/値のペアで、次の署名を使用してパーティション、キー、および値のペアを含むレコードを作成します。
public ProducerRecord (string topic, int partition, k key, v value)
- トピック-レコードに追加されるユーザー定義のトピック名。
- パーティション-パーティション数
- Key -レコードに含まれるキー。
- 値-記録内容
public ProducerRecord (string topic, k key, v value)
ProducerRecordクラスコンストラクターは、キー、値のペア、およびパーティションなしでレコードを作成するために使用されます。
- トピック-レコードを割り当てるトピックを作成します。
- Key -レコードのキー。
- 値-内容を記録します。
public ProducerRecord (string topic, v value)
ProducerRecordクラスは、パーティションとキーなしでレコードを作成します。
- トピック-トピックを作成します。
- 値-内容を記録します。
ProducerRecordクラスのメソッドは、次の表に記載されています-
S.No | Class Methods and Description |
---|---|
1 |
public string topic() トピックがレコードに追加されます。 |
2 |
public K key() レコードに含まれるキー。 そのようなキーがない場合、ここでnullが返されます。 |
3 |
public V value() 記録内容。 |
4 |
partition() レコードのパーティション数 |
SimpleProducerアプリケーション
アプリケーションを作成する前に、まずZooKeeperとKafkaブローカーを起動してから、create topicコマンドを使用してKafkaブローカーに独自のトピックを作成します。 その後、「 Sim-pleProducer.java
」という名前のJavaクラスを作成し、次のコーディングを入力します。
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
//Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
//create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
コンパイル-アプリケーションは、次のコマンドを使用してコンパイルできます。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
実行-アプリケーションは、次のコマンドを使用して実行できます。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
出力
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
単純な消費者の例
現在のところ、Kafkaクラスターにメッセージを送信するプロデューサーを作成しています。 次に、Kafkaクラスターからのメッセージを消費するコンシューマーを作成します。 KafkaConsumer APIは、Kafkaクラスターからのメッセージを消費するために使用されます。 KafkaConsumerクラスのコンストラクターを以下に定義します。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
*configs* -コンシューマー構成のマップを返します。
KafkaConsumerクラスには、以下の表にリストされている以下の重要なメソッドがあります。
S.No | Method and Description |
---|---|
1 |
public java.util.Set<TopicPar-tition> assignment() 消費者によって現在割り当てられているパーティションのセットを取得します。 |
2 |
public string subscription() 指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。 |
3 |
public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。 |
4 |
public void unsubscribe() 指定されたパーティションのリストからトピックをサブスクライブ解除します。 |
5 |
public void sub-scribe(java.util.List<java.lang.String> topics) 指定されたトピックのリストをサブスクライブして、割り当てられたパーティションを動的に取得します。 指定されたトピックのリストが空の場合、unsubscribe()と同じように扱われます。 |
6 |
public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 引数パターンは、正規表現の形式でサブスクライブパターンを参照し、リスナー引数はサブスクライブパターンから通知を取得します。 |
7 |
public void as-sign(java.util.List<TopicParti-tion> partitions) パーティションのリストを顧客に手動で割り当てます。 |
8 |
poll() サブスクライブ/割り当てAPIのいずれかを使用して指定されたトピックまたはパーティションのデータを取得します。 データのポーリング前にトピックがサブスクライブされていない場合、これはエラーを返します。 |
9 |
public void commitSync() トピックおよびパーティションのすべてのサブスクライブ済みリストの最後のpoll()で返されたコミットオフセット。 同じ操作がcommitAsyn()に適用されます。 |
10 |
public void seek(TopicPartition partition, long offset) コンシューマが次のpoll()メソッドで使用する現在のオフセット値を取得します。 |
11 |
public void resume() 一時停止したパーティションを再開します。 |
12 |
public void wakeup() 消費者を起こします。 |
ConsumerRecord API
ConsumerRecord APIは、Kafkaクラスターからレコードを受信するために使用されます。 このAPIは、レコードの受信元であるトピック名、パーティション番号、およびKafkaパーティション内のレコードを指すオフセットで構成されます。 ConsumerRecordクラスは、特定のトピック名、パーティションカウント、および<key、value>のペアを持つコンシューマレコードを作成するために使用されます。 次のシグネチャがあります。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
- トピック-Kafkaクラスターから受信したコンシューマーレコードのトピック名。
- Partition -トピックのパーティション。
- Key -キーが存在しない場合はレコードのキーが返されますnull。
- 値-内容を記録します。
ConsumerRecords API
ConsumerRecords APIは、ConsumerRecordのコンテナーとして機能します。 このAPIは、特定のトピックのパーティションごとにConsumerRecordのリストを保持するために使用されます。 そのコンストラクタは以下で定義されます。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
- TopicPartition -特定のトピックのパーティションのマップを返します。
- レコード-ConsumerRecordのリストを返します。
ConsumerRecordsクラスには、次のメソッドが定義されています。
S.No | Methods and Description |
---|---|
1 |
public int count() すべてのトピックのレコード数。 |
2 |
public Set partitions() このレコードセットにデータがあるパーティションのセット(データが返されなかった場合、セットは空です)。 |
3 |
public Iterator iterator() イテレータを使用すると、コレクションを循環して、要素を取得または削除できます。 |
4 |
public List records() 指定されたパーティションのレコードのリストを取得します。 |
構成設定
コンシューマクライアントAPIのメイン構成設定の構成設定は以下のとおりです-
S.No | Settings and Description |
---|---|
1 |
bootstrap.servers ブローカーのブートストラップリスト。 |
2 |
group.id 個々の消費者をグループに割り当てます。 |
3 |
enable.auto.commit 値がtrueの場合はオフセットの自動コミットを有効にし、そうでない場合はコミットしません。 |
4 |
auto.commit.interval.ms 更新された消費オフセットがZooKeeperに書き込まれる頻度を返します。 |
5 |
session.timeout.ms KafkaがZooKeeperがリクエストに応答する(読み取りまたは書き込み)のを待ってからメッセージを消費し続けるまでのミリ秒数を示します。 |
SimpleConsumerアプリケーション
プロデューサーアプリケーションの手順はここでも同じです。 まず、ZooKeeperとKafkaブローカーを起動します。 次に、「 SimpleCon-sumer.java
」という名前のjavaクラスを使用して「` SimpleConsumer`」アプリケーションを作成し、次のコードを入力します。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
//print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
コンパイル-アプリケーションは、次のコマンドを使用してコンパイルできます。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
- 実行-*次のコマンドを使用してアプリケーションを実行できます
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
入力-プロデューサーCLIを開き、トピックにメッセージを送信します。 smple入力を「Hello Consumer」として入力できます。
出力-以下が出力になります。
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer