Apache-kafka-basic-operations
Apache Kafka-基本操作
最初に「単一ノード-単一ブローカー」構成の実装を開始してから、セットアップを単一ノード-複数ブローカー構成に移行します。
これで、Java、ZooKeeper、Kafkaがマシンにインストールされたことを願っています。 Kafka ClusterはZooKeeperを使用するため、Kafka Cluster Setupに移行する前に、まずZooKeeperを起動する必要があります。
ZooKeeperを起動します
新しいターミナルを開き、次のコマンドを入力します-
Kafka Brokerを起動するには、次のコマンドを入力します-
Kafka Brokerを起動した後、ZooKeeperターミナルでコマンド「 jps
」を入力すると、次の応答が表示されます-
これで、QuorumPeerMainがZooKeeperデーモンで、もう1つがKafkaデーモンであるターミナルで実行されている2つのデーモンを確認できます。
単一ノード-単一ブローカー構成
この構成では、単一のZooKeeperおよびブローカーIDインスタンスがあります。 それを設定する手順は次のとおりです-
- Kafkaトピックの作成*-Kafkaは、サーバー上にトピックを作成するための「
kafka-topics.sh
」という名前のコマンドラインユーティリティを提供します。 新しいターミナルを開き、次の例を入力します。
構文
例
1つのパーティションと1つのレプリカファクターを持つ「 Hello-Kafka
」という名前のトピックを作成しました。 上記の作成された出力は、次の出力に似ています-
出力-作成されたトピック「 Hello-Kafka
」
トピックが作成されると、Kafkaブローカーターミナルウィンドウで通知を取得し、config/server.propertiesファイルの「/tmp/kafka-logs/」で指定された作成済みトピックのログを取得できます。
トピックス一覧
Kafkaサーバーのトピックのリストを取得するには、次のコマンドを使用できます-
構文
出力
トピックを作成したので、「 Hello-Kafka
」のみがリストされます。 複数のトピックを作成する場合、出力でトピック名を取得するとします。
Producerを起動してメッセージを送信する
構文
上記の構文から、プロデューサーコマンドラインクライアントには2つの主要なパラメーターが必要です-
トピック名-これはトピック名の例です。
例
プロデューサーは、stdinからの入力を待機し、Kafkaクラスターに公開します。 デフォルトでは、すべての新しい行は新しいメッセージとして公開され、デフォルトのプロデューサーのプロパティは「 config/producer.properties
」ファイルで指定されます。 次のように、ターミナルで数行のメッセージを入力できます。
出力
コンシューマーを起動してメッセージを受信する
プロデューサーと同様に、デフォルトのコンシューマープロパティは「 config/consumer.proper-ties
」ファイルで指定されます。 新しいターミナルを開き、メッセージを消費するための以下の構文を入力します。
構文
例
出力
最後に、生産者の端末からメッセージを入力し、消費者の端末に表示されるのを確認できます。 現時点では、単一のブローカーを持つ単一ノードクラスターについて非常によく理解しています。 次に、複数のブローカー構成に進みましょう。
単一ノード-複数ブローカー構成
複数のブローカーのクラスター設定に進む前に、まずZooKeeperサーバーを起動します。
複数のKafkaブローカーを作成-con-fig/server.propertiesにすでに1つのKafkaブローカーインスタンスがあります。 複数のブローカーインスタンスが必要になったため、既存のserver.prop-ertiesファイルを2つの新しい構成ファイルにコピーし、server-one.propertiesおよびserver-two.prop-ertiesに名前を変更します。 次に、両方の新しいファイルを編集し、次の変更を割り当てます-
config/server-one.properties
config/server-two.properties
複数のブローカーを開始-3つのサーバーですべての変更を行った後、3つの新しいターミナルを開いて各ブローカーを1つずつ開始します。
これで、3つの異なるブローカーがマシン上で実行されました。 ZooKeeperターミナルで jps と入力して、自分ですべてのデーモンをチェックしてみてください。応答が表示されます。
トピックを作成する
3つの異なるブローカーが実行されているため、このトピックのレプリケーション係数値を3として割り当てましょう。 2つのブローカーがある場合、割り当てられたレプリカ値は2になります。
構文
例
出力
「 Describe
」コマンドは、以下に示すように、現在作成されているトピックをリッスンしているブローカーを確認するために使用されます-
出力
上記の出力から、最初の行はすべてのパーティションの概要を提供し、トピック名、パーティション数、および既に選択したレプリケーション係数を示していると結論付けることができます。 2行目では、各ノードがパーティションのランダムに選択された部分のリーダーになります。
この場合、最初のブローカー(broker.id 0)がリーダーであることがわかります。 レプリカ:0,2,1は、すべてのブローカーがトピックを最終的に複製することを意味します。最終的に「 Isr
」は「` in-sync`」レプリカのセットです。 まあ、これは現在生きており、リーダーに追いついたレプリカのサブセットです。
Producerを起動してメッセージを送信する
この手順は、単一ブローカーのセットアップの場合と同じです。
例
出力
コンシューマーを起動してメッセージを受信する
この手順は、単一ブローカーのセットアップで示したものと同じままです。
例
出力
基本的なトピック操作
この章では、さまざまな基本的なトピック操作について説明します。
トピックの変更
Kafka Clusterでトピックを作成する方法を既に理解しているように。 次のコマンドを使用して、作成したトピックを変更しましょう
構文
例
出力
トピックを削除する
トピックを削除するには、次の構文を使用できます。
構文
例
出力
注- delete.topic.enable がtrueに設定されていない場合、影響はありません