Apache-kafka-basic-operations

提供:Dev Guides
移動先:案内検索

Apache Kafka-基本操作

最初に「単一ノード-単一ブローカー」構成の実装を開始してから、セットアップを単一ノード-複数ブローカー構成に移行します。

これで、Java、ZooKeeper、Kafkaがマシンにインストールされたことを願っています。 Kafka ClusterはZooKeeperを使用するため、Kafka Cluster Setupに移行する前に、まずZooKeeperを起動する必要があります。

ZooKeeperを起動します

新しいターミナルを開き、次のコマンドを入力します-

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Brokerを起動するには、次のコマンドを入力します-

bin/kafka-server-start.sh config/server.properties

Kafka Brokerを起動した後、ZooKeeperターミナルでコマンド「 jps」を入力すると、次の応答が表示されます-

821 QuorumPeerMain
928 Kafka
931 Jps

これで、QuorumPeerMainがZooKeeperデーモンで、もう1つがKafkaデーモンであるターミナルで実行されている2つのデーモンを確認できます。

単一ノード-単一ブローカー構成

この構成では、単一のZooKeeperおよびブローカーIDインスタンスがあります。 それを設定する手順は次のとおりです-

  • Kafkaトピックの作成*-Kafkaは、サーバー上にトピックを作成するための「 kafka-topics.sh」という名前のコマンドラインユーティリティを提供します。 新しいターミナルを開き、次の例を入力します。

構文

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka

1つのパーティションと1つのレプリカファクターを持つ「 Hello-Kafka」という名前のトピックを作成しました。 上記の作成された出力は、次の出力に似ています-

出力-作成されたトピック「 Hello-Kafka

トピックが作成されると、Kafkaブローカーターミナルウィンドウで通知を取得し、config/server.propertiesファイルの「/tmp/kafka-logs/」で指定された作成済みトピックのログを取得できます。

トピックス一覧

Kafkaサーバーのトピックのリストを取得するには、次のコマンドを使用できます-

構文

bin/kafka-topics.sh --list --zookeeper localhost:2181

出力

Hello-Kafka

トピックを作成したので、「 Hello-Kafka」のみがリストされます。 複数のトピックを作成する場合、出力でトピック名を取得するとします。

Producerを起動してメッセージを送信する

構文

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

上記の構文から、プロデューサーコマンドラインクライアントには2つの主要なパラメーターが必要です-

*Broker-list* -メッセージの送信先のブローカーのリスト。 この場合、ブローカーは1つしかありません。 Config/server.propertiesファイルにはブローカーポートIDが含まれています。これは、ブローカーがポート9092でリッスンしていることがわかっているため、直接指定できます。

トピック名-これはトピック名の例です。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

プロデューサーは、stdinからの入力を待機し、Kafkaクラスターに公開します。 デフォルトでは、すべての新しい行は新しいメッセージとして公開され、デフォルトのプロデューサーのプロパティは「 config/producer.properties」ファイルで指定されます。 次のように、ターミナルで数行のメッセージを入力できます。

出力

$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

コンシューマーを起動してメッセージを受信する

プロデューサーと同様に、デフォルトのコンシューマープロパティは「 config/consumer.proper-ties」ファイルで指定されます。 新しいターミナルを開き、メッセージを消費するための以下の構文を入力します。

構文

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning

出力

Hello
My first message
My second message

最後に、生産者の端末からメッセージを入力し、消費者の端末に表示されるのを確認できます。 現時点では、単一のブローカーを持つ単一ノードクラスターについて非常によく理解しています。 次に、複数のブローカー構成に進みましょう。

単一ノード-複数ブローカー構成

複数のブローカーのクラスター設定に進む前に、まずZooKeeperサーバーを起動します。

複数のKafkaブローカーを作成-con-fig/server.propertiesにすでに1つのKafkaブローカーインスタンスがあります。 複数のブローカーインスタンスが必要になったため、既存のserver.prop-ertiesファイルを2つの新しい構成ファイルにコピーし、server-one.propertiesおよびserver-two.prop-ertiesに名前を変更します。 次に、両方の新しいファイルを編集し、次の変更を割り当てます-

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

複数のブローカーを開始-3つのサーバーですべての変更を行った後、3つの新しいターミナルを開いて各ブローカーを1つずつ開始します。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

これで、3つの異なるブローカーがマシン上で実行されました。 ZooKeeperターミナルで jps と入力して、自分ですべてのデーモンをチェックしてみてください。応答が表示されます。

トピックを作成する

3つの異なるブローカーが実行されているため、このトピックのレプリケーション係数値を3として割り当てましょう。 2つのブローカーがある場合、割り当てられたレプリカ値は2になります。

構文

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication

出力

created topic “Multibrokerapplication”

Describe」コマンドは、以下に示すように、現在作成されているトピックをリッスンしているブローカーを確認するために使用されます-

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

出力

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1
ReplicationFactor:3 Configs:

Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1

上記の出力から、最初の行はすべてのパーティションの概要を提供し、トピック名、パーティション数、および既に選択したレプリケーション係数を示していると結論付けることができます。 2行目では、各ノードがパーティションのランダムに選択された部分のリーダーになります。

この場合、最初のブローカー(broker.id 0)がリーダーであることがわかります。 レプリカ:0,2,1は、すべてのブローカーがトピックを最終的に複製することを意味します。最終的に「 Isr」は「` in-sync`」レプリカのセットです。 まあ、これは現在生きており、リーダーに追いついたレプリカのサブセットです。

Producerを起動してメッセージを送信する

この手順は、単一ブローカーのセットアップの場合と同じです。

bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication

出力

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

コンシューマーを起動してメッセージを受信する

この手順は、単一ブローカーのセットアップで示したものと同じままです。

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning

出力

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本的なトピック操作

この章では、さまざまな基本的なトピック操作について説明します。

トピックの変更

Kafka Clusterでトピックを作成する方法を既に理解しているように。 次のコマンドを使用して、作成したトピックを変更しましょう

構文

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count

We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2

出力

WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

トピックを削除する

トピックを削除するには、次の構文を使用できます。

構文

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

出力

> Topic Hello-kafka marked for deletion

注- delete.topic.enable がtrueに設定されていない場合、影響はありません