Apache-kafka-basic-operations
Apache Kafka-基本操作
最初に「単一ノード-単一ブローカー」構成の実装を開始してから、セットアップを単一ノード-複数ブローカー構成に移行します。
これで、Java、ZooKeeper、Kafkaがマシンにインストールされたことを願っています。 Kafka ClusterはZooKeeperを使用するため、Kafka Cluster Setupに移行する前に、まずZooKeeperを起動する必要があります。
ZooKeeperを起動します
新しいターミナルを開き、次のコマンドを入力します-
bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka Brokerを起動するには、次のコマンドを入力します-
bin/kafka-server-start.sh config/server.properties
Kafka Brokerを起動した後、ZooKeeperターミナルでコマンド「 jps
」を入力すると、次の応答が表示されます-
821 QuorumPeerMain
928 Kafka
931 Jps
これで、QuorumPeerMainがZooKeeperデーモンで、もう1つがKafkaデーモンであるターミナルで実行されている2つのデーモンを確認できます。
単一ノード-単一ブローカー構成
この構成では、単一のZooKeeperおよびブローカーIDインスタンスがあります。 それを設定する手順は次のとおりです-
- Kafkaトピックの作成*-Kafkaは、サーバー上にトピックを作成するための「
kafka-topics.sh
」という名前のコマンドラインユーティリティを提供します。 新しいターミナルを開き、次の例を入力します。
構文
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
1つのパーティションと1つのレプリカファクターを持つ「 Hello-Kafka
」という名前のトピックを作成しました。 上記の作成された出力は、次の出力に似ています-
出力-作成されたトピック「 Hello-Kafka
」
トピックが作成されると、Kafkaブローカーターミナルウィンドウで通知を取得し、config/server.propertiesファイルの「/tmp/kafka-logs/」で指定された作成済みトピックのログを取得できます。
トピックス一覧
Kafkaサーバーのトピックのリストを取得するには、次のコマンドを使用できます-
構文
bin/kafka-topics.sh --list --zookeeper localhost:2181
出力
Hello-Kafka
トピックを作成したので、「 Hello-Kafka
」のみがリストされます。 複数のトピックを作成する場合、出力でトピック名を取得するとします。
Producerを起動してメッセージを送信する
構文
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
上記の構文から、プロデューサーコマンドラインクライアントには2つの主要なパラメーターが必要です-
*Broker-list* -メッセージの送信先のブローカーのリスト。 この場合、ブローカーは1つしかありません。 Config/server.propertiesファイルにはブローカーポートIDが含まれています。これは、ブローカーがポート9092でリッスンしていることがわかっているため、直接指定できます。
トピック名-これはトピック名の例です。
例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
プロデューサーは、stdinからの入力を待機し、Kafkaクラスターに公開します。 デフォルトでは、すべての新しい行は新しいメッセージとして公開され、デフォルトのプロデューサーのプロパティは「 config/producer.properties
」ファイルで指定されます。 次のように、ターミナルで数行のメッセージを入力できます。
出力
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
コンシューマーを起動してメッセージを受信する
プロデューサーと同様に、デフォルトのコンシューマープロパティは「 config/consumer.proper-ties
」ファイルで指定されます。 新しいターミナルを開き、メッセージを消費するための以下の構文を入力します。
構文
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
出力
Hello
My first message
My second message
最後に、生産者の端末からメッセージを入力し、消費者の端末に表示されるのを確認できます。 現時点では、単一のブローカーを持つ単一ノードクラスターについて非常によく理解しています。 次に、複数のブローカー構成に進みましょう。
単一ノード-複数ブローカー構成
複数のブローカーのクラスター設定に進む前に、まずZooKeeperサーバーを起動します。
複数のKafkaブローカーを作成-con-fig/server.propertiesにすでに1つのKafkaブローカーインスタンスがあります。 複数のブローカーインスタンスが必要になったため、既存のserver.prop-ertiesファイルを2つの新しい構成ファイルにコピーし、server-one.propertiesおよびserver-two.prop-ertiesに名前を変更します。 次に、両方の新しいファイルを編集し、次の変更を割り当てます-
config/server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config/server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
複数のブローカーを開始-3つのサーバーですべての変更を行った後、3つの新しいターミナルを開いて各ブローカーを1つずつ開始します。
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
これで、3つの異なるブローカーがマシン上で実行されました。 ZooKeeperターミナルで jps と入力して、自分ですべてのデーモンをチェックしてみてください。応答が表示されます。
トピックを作成する
3つの異なるブローカーが実行されているため、このトピックのレプリケーション係数値を3として割り当てましょう。 2つのブローカーがある場合、割り当てられたレプリカ値は2になります。
構文
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
出力
created topic “Multibrokerapplication”
「 Describe
」コマンドは、以下に示すように、現在作成されているトピックをリッスンしているブローカーを確認するために使用されます-
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
出力
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
上記の出力から、最初の行はすべてのパーティションの概要を提供し、トピック名、パーティション数、および既に選択したレプリケーション係数を示していると結論付けることができます。 2行目では、各ノードがパーティションのランダムに選択された部分のリーダーになります。
この場合、最初のブローカー(broker.id 0)がリーダーであることがわかります。 レプリカ:0,2,1は、すべてのブローカーがトピックを最終的に複製することを意味します。最終的に「 Isr
」は「` in-sync`」レプリカのセットです。 まあ、これは現在生きており、リーダーに追いついたレプリカのサブセットです。
Producerを起動してメッセージを送信する
この手順は、単一ブローカーのセットアップの場合と同じです。
例
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
出力
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
コンシューマーを起動してメッセージを受信する
この手順は、単一ブローカーのセットアップで示したものと同じままです。
例
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
出力
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
基本的なトピック操作
この章では、さまざまな基本的なトピック操作について説明します。
トピックの変更
Kafka Clusterでトピックを作成する方法を既に理解しているように。 次のコマンドを使用して、作成したトピックを変更しましょう
構文
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
例
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
出力
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
トピックを削除する
トピックを削除するには、次の構文を使用できます。
構文
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
出力
> Topic Hello-kafka marked for deletion
注- delete.topic.enable がtrueに設定されていない場合、影響はありません