RabbitMQおよびPukaPythonライブラリを使用してルーティングキーに基づいてメッセージを配信する方法
入門
前提条件
このテキストは、 RabbitMQとPythonのPukaを使用して複数のコンシューマーにメッセージを配信する方法の続きであり、同じソフトウェアバンドルを正しく実行する必要があります。 また、同じ定義が記事全体で使用されており、読者は前のテキストの主題に精通していることを前提としています。
取引所
fanout
交換についてはすでに説明しました。これは、追加のルールを設定せずに、その交換にバインドされたすべてのキューにメッセージを配信します。 これは非常に便利なメカニズムですが、柔軟性に欠けています。 多くの場合、プロデューサーが取引所に発行するすべてのものを受け取ることは望ましくありません。 RabbitMQ は、より複雑なシナリオを実装するために使用できる2つの異なる交換タイプを提供します。 これらの1つは、direct
交換です。
直接交換
序章
直接交換は、RabbitMQでシンプルなキーベースのルーティングメカニズムを提供します。 これは、最初の例で使用された名前のない交換にいくぶん似ています。この例では、メッセージのルーティングキーに等しい名前のキューにメッセージが配信されました。 ただし、名前のない交換では、明示的なキューバインディングを定義する必要はありませんでしたが、直接交換では、バインディングは重要で必須です。
直接交換を使用する場合、その交換に対して生成される各メッセージには、任意の名前文字列であるルーティングキーを指定する必要があります。 テキサス。 次に、メッセージは、同じルーティングキーを使用してこの交換にバインドされているすべてのキューに配信されます( Texas ルーティングキーを使用してメッセージに関心があると明示的に宣言されたすべてのキュー)。
基本的な名前のない交換とdirect
交換の最大の違いは、後者にはバインディングが必要であり、その前にその交換でメッセージをリッスンするキューがないことです。 その結果、3つの大きな利点が得られます。
- 1つのキューをバインドして、同じ交換で多くの異なるルーティングキーをリッスンできます
- 1つのキューをバインドして、多くのの異なる交換を一度にリッスンできます
- 多くのキューをバインドして、交換で同じルーティングキーをリッスンできます
大都市のハブを想像してみましょう。鉄道とバスの駅が1つになっていて、両方の交通手段で多くの目的地にアクセスできます。 そして、ステーションがRabbitMQを使用して出発通知をディスパッチしたいとします。 タスクは、シアトル、トゥーイル、またはボストン行きのバスまたは電車がまもなく出発することを関心のあるすべての人に知らせることです。
このようなプログラムは、関心のあるすべての顧客がキューをサブスクライブできる直接のdepartures
交換を定義します。 次に、出発時刻を含むメッセージが、宛先を含むルーティングキーを使用してその交換に生成されます。 例えば:
- ルーティングキー
Tooele
および本体2014-01-03 15:23
とのdepartures
交換へのメッセージ - ルーティングキー
Boston
および本体2014-01-03 15:41
とのdepartures
交換へのメッセージ - ルーティングキー
Seattle
および本体2014-01-03 15:55
とのdepartures
交換へのメッセージ
1つのキューが一度に多くのルーティングキーにバインドされる可能性があり、多くのキューが同じキーにバインドされる可能性があるため、次のように簡単に設定できます。
- Tooeleのみに関心のある1人の顧客
- ボストンのみに関心のある1人の顧客
- TooeleとBostonに同時に興味を持っている別の顧客
すべてが同時に情報を待っています。 彼らは私たちの直接交換を使用して適切なメッセージを受信します。
プロデューサー
この例のタスクを少し単純化するために、1つのコマンドラインパラメーターを受け入れる基本的な通知ディスパッチャーを作成しましょう。 宛先を指定し、アプリケーションは関心のあるすべての消費者に現在の時刻を送信します。
direct_notify.py
という名前のサンプルPythonスクリプトを作成します
vim direct_notify.py
スクリプトの内容を貼り付けます。
import puka import datetime import time import sys # declare and connect a producer producer = puka.Client("amqp://localhost/") connect_promise = producer.connect() producer.wait(connect_promise) # create a direct exchange named departures exchange_promise = producer.exchange_declare(exchange='departures', type='direct') producer.wait(exchange_promise) # send current time to destination specified with command line argument message = "%s" % datetime.datetime.now() message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message) producer.wait(message_promise) print "Departure to %s at %s" % (sys.argv[1], message) producer.close()
:wq を押してファイルを保存し、終了します。
1つのパラメーターを使用してスクリプトを実行すると、現在の時刻と使用されている宛先が出力されます。 出力は次のようになります。
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~#
スクリプトを段階的に見ていきましょう。
- Producer クライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。 これからは、RabbitMQと自由に通信できるようになります。
- 名前付きの
departures
直接交換が作成されます。 その取引所に公開されたメッセージには異なるキーを割り当てることができるため、作成時にルーティングキーを指定する必要はありません。 そのステップの後、交換はRabbitMQサーバー上に存在し、キューをサーバーにバインドしてメッセージを送信するために使用できます。 - コマンドラインパラメータをルーティングキーとして使用して、現在の時刻を含むメッセージがその取引所に公開されます。 サンプル実行では、Tooeleがパラメータとして使用されているため、出発地(ルーティングキー)として使用されています。
注:簡単にするために、スクリプトは必須のコマンドライン引数が指定されているかどうかをチェックしません。 パラメータなしで実行すると正しく動作しません。
消費者
この例の消費者向けアプリケーションは、駅から到達可能な1つ以上の目的地に関心のある公共交通機関の顧客として機能します。
direct_watch.py
という名前のサンプルPythonスクリプトを作成します
vim direct_watch.py
スクリプトの内容を貼り付けます。
import puka import sys # declare and connect a consumer consumer = puka.Client("amqp://localhost/") connect_promise = consumer.connect() consumer.wait(connect_promise) # create temporary queue queue_promise = consumer.queue_declare(exclusive=True) queue = consumer.wait(queue_promise)['queue'] # bind the queue to all routing keys specified by command line arguments for destination in sys.argv[1:]: print "Watching departure times for %s" % destination bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination) consumer.wait(bind_promise) # start waiting for messages on the queue created beforehand and print them out message_promise = consumer.basic_consume(queue=queue, no_ack=True) while True: message = consumer.wait(message_promise) print "Departure for %s at %s" % (message['routing_key'], message['body']) consumer.close()
:wq を押してファイルを保存し、終了します。
1つのパラメータTooeleを使用してスクリプトを実行すると、スクリプトが Tooele の出発時刻を監視していることを通知する必要がありますが、複数のパラメータを使用してスクリプトを実行すると、多くの宛先の出発時刻を監視していることが通知されます。
root@rabbitmq:~# python direct_watch.py Tooele Watching departure times for Tooele (...) root@rabbitmq:~# python direct_watch.py Tooele Boston Watching departure times for Tooele Watching departure times for Boston (...) root@rabbitmq:~#
スクリプトの機能を説明するために、スクリプトを段階的に見ていきましょう。
- Consumer クライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。 これからは、RabbitMQと自由に通信できるようになります。
- この特定のコンシューマーの一時キューが作成され、RabbitMQによって自動生成された名前が付けられます。 スクリプトが終了すると、キューは破棄されます。
- キューは、コマンドラインパラメータを使用して指定されたすべてのルーティングキー(宛先)のすべての
departures
交換にバインドされ、各宛先の情報を画面に出力します。 - スクリプトは、キューでメッセージの待機を開始します。 バインドされたルーティングキーに一致するすべてのメッセージを受信する必要があります。 Tooele を単一のパラメーターとして実行する場合(TooeleとBostonの両方を使用する場合のみ)-両方で。 各出発時刻は画面に印刷されます。
テスト
両方のスクリプトが期待どおりに機能するかどうかを確認するには、サーバーに対して3つのターミナルウィンドウを開きます。 1つは、通知を送信するための公共交通機関として使用されます。 別の2つは、出発を待つ顧客として機能します。
最初のターミナルで、direct_notify.py
スクリプトを任意のパラメーターで1回実行します。
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~#
重要: direct_notify.py
スクリプトは、キューをバインドする前に交換を作成する必要があるため、コンシューマーの前に少なくとも1回実行する必要があります。 実行後、交換はRabbitMQサーバーに残り、自由に使用できます。
2番目の端末で、direct_watch.py
スクリプトを1つのパラメーター( Tooele )を指定して実行します。
root@rabbitmq:~# python direct_watch.py Tooele Watching departure times for Tooele (...) root@rabbitmq:~#
3番目の端末で、TooeleとBostonの2つのパラメーターを使用してdirect_watch.py
スクリプトを実行します。
root@rabbitmq:~# python direct_watch.py Tooele Boston Watching departure times for Tooele Watching departure times for Boston (...) root@rabbitmq:~#
次に、最初のターミナルに戻り、3つの出発通知を送信します。 1つはTooele、1つは Boston 、もう1つはChicagoです。
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~# python direct_notify.py Boston Departure to Tooele at 2014-02-18 15:57:31.035000 root@rabbitmq:~# python direct_notify.py Chicago Departure to Tooele at 2014-02-18 15:57:35.035000 root@rabbitmq:~#
最初の通知は、トゥーイルへの出発を待っている両方の消費者のみが受信する必要があります。 2つ目は、ボストンへの出発を待っている消費者だけに届くようにする必要があります。 3つ目は、シカゴへの出発を待つ消費者がいないため、これらの消費者が受け取るべきではありません。
これは予想される動作です。 これらの簡単な例は、ルーティングキーで指定された特定のコンシューマーのみが受信するメッセージをディスパッチする方法を示しています。
参考文献
直接ルーティングでは、メッセージの配信先を完全に制御することはできませんが、以前の交換で使用されていたfanout
交換から、どこにでも盲目的にメッセージを配信するという大きな一歩です。 direct
交換を使用すると、多くの実際のメッセージングシナリオを提供でき、プロセスはそれほど難しくありません。
このテキストの主な目的は、単純な現実世界の状況を使用した基本的な直接ルーティングを紹介することでした。 他の多くの使用法については、公式のRabbitMQドキュメントで詳しく説明されています。これはRabbitMQユーザーと管理者にとって優れたリソースです。