ルーティングタスク—Pythonドキュメント
ルーティングタスク
ノート
トピックやファンアウトなどの代替ルーティングの概念は、すべてのトランスポートで使用できるわけではありません。トランスポート比較表を参照してください。
基本
自動ルーティング
ルーティングを行う最も簡単な方法は、:setting: `task_create_missing_queues` 設定(デフォルトでオン)を使用することです。
この設定をオンにすると、:setting: `task_queues` でまだ定義されていない名前付きキューが自動的に作成されます。 これにより、単純なルーティングタスクを簡単に実行できます。
通常のタスクを処理する x と y の2つのサーバーと、フィード関連のタスクのみを処理する1つのサーバー z があるとします。 次の構成を使用できます。
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
このルートを有効にすると、フィードのインポートタスクは「フィード」キューにルーティングされ、他のすべてのタスクはデフォルトのキュー(歴史的な理由から「セロリ」という名前)にルーティングされます。
または、globパターンマッチング、または正規表現を使用して、feed.tasks
名前空間内のすべてのタスクを照合することもできます。
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
一致するパターンの順序が重要な場合は、代わりに items 形式でルーターを指定する必要があります。
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
ノート
:setting: `task_routes` 設定は、ディクショナリまたはルーターオブジェクトのリストのいずれかである可能性があるため、この場合、リストを含むタプルとして設定を指定する必要があります。
ルーターをインストールした後、サーバー z を起動して、次のようにフィードキューのみを処理できます。
user@z:/$ celery -A proj worker -Q feeds
必要な数のキューを指定できるため、このサーバーでデフォルトのキューを処理することもできます。
user@z:/$ celery -A proj worker -Q feeds,celery
デフォルトのキューの名前を変更する
次の構成を使用して、デフォルトのキューの名前を変更できます。
app.conf.task_default_queue = 'default'
キューの定義方法
この機能のポイントは、基本的なニーズのみを持つユーザーに対して複雑なAMQPプロトコルを非表示にすることです。 ただし、これらのキューがどのように宣言されているかに関心があるかもしれません。
「video」という名前のキューが次の設定で作成されます。
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
Redis や SQS などの非AMQPバックエンドは交換をサポートしていないため、交換にはキューと同じ名前を付ける必要があります。 この設計を使用すると、それらの設計でも機能することが保証されます。
手動ルーティング
通常のタスクを処理する x と y の2つのサーバーと、フィード関連のタスクのみを処理する1つのサーバー z がある場合、この構成を使用できます。 :
from kombu import Queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
:setting: `task_queues` は、Queue
インスタンスのリストです。 キーの交換または交換タイプの値を設定しない場合、これらは:setting: `task_default_exchange` および:setting:` task_default_exchange_type` 設定から取得されます。
タスクを feed_tasks キューにルーティングするには、:setting: `task_routes` 設定にエントリを追加します。
task_routes = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}
routing_key 引数をTask.apply_async()
またはsend_task()
に使用して、これをオーバーライドすることもできます。
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
... queue='feed_tasks',
... routing_key='feed.import')
サーバー z をフィードキューから排他的に消費させるには、celery worker -Q
オプションを使用してサーバーを起動できます。
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
サーバー x および y は、デフォルトのキューから消費するように構成する必要があります。
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
必要に応じて、フィード処理ワーカーに通常のタスクを処理させることもできます。おそらく、やるべきことがたくさんあるときです。
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h
別のキューがあるが、追加したい別の取引所にある場合は、カスタム取引所と取引所タイプを指定するだけです。
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('feed_tasks', routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)
これらの用語について混乱している場合は、AMQPを確認する必要があります。
も参照してください
以下の Redisメッセージの優先度に加えて、 Rabbits and Warrens があります。これは、キューと交換について説明している優れたブログ投稿です。 CloudAMQPチュートリアルもあります。RabbitMQのユーザーにとって、 RabbitMQ FAQ は情報源として役立つ可能性があります。
特別なルーティングオプション
RabbitMQメッセージの優先順位
- サポートされているトランスポート
- RabbitMQ
バージョン4.0の新機能。
x-max-priority
引数を設定することにより、優先順位をサポートするようにキューを構成できます。
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10}),
]
すべてのキューのデフォルト値は、:setting: `task_queue_max_priority` 設定を使用して設定できます。
app.conf.task_queue_max_priority = 10
すべてのタスクのデフォルトの優先度は、:setting: `task_default_priority` 設定を使用して指定することもできます。
app.conf.task_default_priority = 5
Redisメッセージの優先順位
- サポートされているトランスポート
- Redis
Celery Redisトランスポートは優先度フィールドを尊重しますが、Redis自体には優先度の概念がありません。 予期しない動作が発生する可能性があるため、Redisで優先順位を実装する前に、このメモをお読みください。
優先度に基づいてタスクのスケジューリングを開始するには、queue_order_strategyトランスポートオプションを構成する必要があります。
app.conf.broker_transport_options = {
'queue_order_strategy': 'priority',
}
優先度のサポートは、キューごとにn個のリストを作成することによって実装されます。 これは、10(0〜9)の優先度レベルがある場合でも、リソースを節約するために、これらはデフォルトで4つのレベルに統合されることを意味します。 これは、celeryという名前のキューが実際には4つのキューに分割されることを意味します。
最も優先度の高いキューにはceleryという名前が付けられ、他のキューには区切り文字(デフォルトでは x06x16 )と優先度番号がキュー名に追加されます。
['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169']
より多くの優先度レベルまたは別の区切り文字が必要な場合は、priority_stepsおよびsepトランスポートオプションを設定できます。
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'sep': ':',
'queue_order_strategy': 'priority',
}
上記の設定では、次のキュー名が提供されます。
['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']
とは言うものの、これはサーバーレベルで実装された優先順位ほど良くなることはなく、せいぜい概算である可能性があることに注意してください。 ただし、それでもアプリケーションには十分な場合があります。
AMQP入門書
メッセージ
メッセージはヘッダーと本文で構成されます。 Celeryはヘッダーを使用して、メッセージのコンテンツタイプとそのコンテンツエンコーディングを格納します。 コンテンツタイプは通常、メッセージのシリアル化に使用されるシリアル化形式です。 本文には、実行するタスクの名前、タスクID(UUID)、それを適用するための引数、および再試行回数やETAなどの追加のメタデータが含まれます。
これは、Pythonディクショナリとして表されるタスクメッセージの例です。
{'task': 'myapp.tasks.add',
'id': '54086c5e-6193-4575-8308-dbab76798756',
'args': [4, 4],
'kwargs': {}}
プロデューサー、コンシューマー、ブローカー
メッセージを送信するクライアントは通常、パブリッシャーまたはプロデューサーと呼ばれ、メッセージを受信するエンティティはコンシューマーと呼ばれます。
ブローカーはメッセージサーバーであり、プロデューサーからコンシューマーにメッセージをルーティングします。
これらの用語は、AMQP関連の資料でよく使用されています。
交換、キュー、およびルーティングキー
- メッセージは取引所に送信されます。
- 交換は、メッセージを1つ以上のキューにルーティングします。 いくつかの交換タイプが存在し、ルーティングを行うためのさまざまな方法を提供したり、さまざまなメッセージングシナリオを実装したりします。
- メッセージは、誰かがそれを消費するまでキューで待機します。
- メッセージは、確認応答されるとキューから削除されます。
メッセージを送受信するために必要な手順は次のとおりです。
- 交換を作成する
- キューを作成する
- キューを取引所にバインドします。
Celeryは、:setting: `task_queues` のキューが機能するために必要なエンティティを自動的に作成します(キューの auto_declare 設定がFalse
に設定されている場合を除く)。
これは、3つのキューを持つキュー構成の例です。 1つはビデオ用、もう1つは画像用、もう1つはその他すべてのデフォルトキューです。
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('videos', Exchange('media'), routing_key='media.video'),
Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
取引所の種類
交換タイプは、メッセージが交換を介してルーティングされる方法を定義します。 標準で定義されている交換タイプは、ダイレクト、トピック、ファンアウト、ヘッダーです。 また、Michael Bridgenによる last-value-cacheプラグインのように、非標準の交換タイプがRabbitMQへのプラグインとして利用できます。
直接交換
直接交換は正確なルーティングキーと一致するため、ルーティングキー video でバインドされたキューは、そのルーティングキーを持つメッセージのみを受信します。
トピック交換
トピック交換は、ドットで区切られた単語とワイルドカード文字*
(1つの単語に一致)、および#
(0個以上の単語に一致)を使用してルーティングキーに一致します。
usa.news
、usa.weather
、norway.news
、norway.weather
などのルーティングキーを使用すると、バインディングは*.news
(すべてのニュース)、usa.#
(米国内のすべてのアイテム)、またはusa.weather
(米国内のすべての天気アイテム)。
APIの実践
Celeryには、AMQPAPIへのコマンドラインアクセスに使用される celery amqp というツールが付属しており、キューと交換の作成/削除、キューのパージ、メッセージの送信などの管理タスクにアクセスできます。 AMQP以外のブローカーにも使用できますが、実装が異なるとすべてのコマンドが実装されるとは限りません。
celery amqp の引数に直接コマンドを書き込むことも、引数なしで開始してシェルモードで開始することもできます。
$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>
ここで1>
はプロンプトです。 数字の1は、これまでに実行したコマンドの数です。 使用可能なコマンドのリストについては、help
と入力してください。 オートコンプリートもサポートされているため、コマンドの入力を開始してから tab キーを押すと、一致する可能性のあるリストが表示されます。
メッセージを送信できるキューを作成しましょう。
$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.
これにより、直接交換testexchange
と、testqueue
という名前のキューが作成されました。 キューは、ルーティングキーtestkey
を使用して交換にバインドされます。
今後、ルーティングキーtestkey
を使用して取引所testexchange
に送信されるすべてのメッセージは、このキューに移動されます。 basic.publish
コマンドを使用して、メッセージを送信できます。
4> basic.publish 'This is a message!' testexchange testkey
ok.
メッセージが送信されたので、再度取得できます。 ここでbasic.get
コマンドを使用して、キュー上の新しいメッセージを同期的にポーリングできます(これはメンテナンスタスクでは問題ありませんが、代わりにbasic.consume
を使用するサービスでは)
キューからメッセージをポップします。
5> basic.get testqueue
{'body': 'This is a message!',
'delivery_info': {'delivery_tag': 1,
'exchange': u'testexchange',
'message_count': 0,
'redelivered': False,
'routing_key': u'testkey'},
'properties': {}}
AMQPは確認応答を使用して、メッセージが正常に受信および処理されたことを示します。 メッセージが確認されておらず、コンシューマーチャネルが閉じられている場合、メッセージは別のコンシューマーに配信されます。
上記の構造にリストされている配信タグに注意してください。 接続チャネル内では、受信したすべてのメッセージに一意の配信タグがあります。このタグは、メッセージを確認するために使用されます。 また、配信タグは接続間で一意ではないため、別のクライアントでは、配信タグ 1 がこのチャネルとは異なるメッセージを指している可能性があることにも注意してください。
basic.ack
を使用して、受信したメッセージを確認できます。
6> basic.ack 1
ok.
テストセッション後にクリーンアップするには、作成したエンティティを削除する必要があります。
7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
ルーティングタスク
キューの定義
Celeryで使用可能なキューは、:setting: `task_queues` 設定によって定義されます。
これは、3つのキューを持つキュー構成の例です。 1つはビデオ用、もう1つは画像用、もう1つはその他すべてのデフォルトキューです。
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
ここでは、:setting: `task_default_queue` を使用して、明示的なルートがないタスクをルーティングします。
デフォルトの交換、交換タイプ、およびルーティングキーは、タスクのデフォルトのルーティング値として、および:setting: `task_queues` のエントリのデフォルト値として使用されます。
単一のキューへの複数のバインディングもサポートされています。 両方が同じキューにバインドされている2つのルーティングキーの例を次に示します。
from kombu import Exchange, Queue, binding
media_exchange = Exchange('media', type='direct')
CELERY_QUEUES = (
Queue('media', [
binding(media_exchange, routing_key='media.video'),
binding(media_exchange, routing_key='media.image'),
]),
)
タスクの宛先を指定する
タスクの宛先は、以下によって(順番に)決定されます。
Task.apply_async()
へのルーティング引数。Task
自体で定義されたルーティング関連の属性。- :setting: `task_routes` で定義されている Routers 。
これらの設定をハードコーディングするのではなく、ルーターを使用して構成オプションとして残すことをお勧めします。 これは最も柔軟なアプローチですが、適切なデフォルトをタスク属性として設定することもできます。
ルーター
ルーターは、タスクのルーティングオプションを決定する機能です。
新しいルーターを定義するために必要なのは、署名(name, args, kwargs, options, task=None, **kw)
を使用して関数を定義することだけです。
def route_task(name, args, kwargs, options, task=None, **kw):
if name == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
queue
キーを返すと、:setting: `task_queues` でそのキューの定義済み設定で展開されます:
{'queue': 'video', 'routing_key': 'video.compress'}
になる–>
{'queue': 'video',
'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
:setting: `task_routes` 設定にルータークラスを追加して、ルータークラスをインストールします。
task_routes = (route_task,)
ルーター機能は、名前で追加することもできます。
task_routes = ('myapp.routers.route_task',)
上記のルーターの例のような単純なタスク名->ルートマッピングの場合、dictを:setting: `task_routes` にドロップするだけで、同じ動作を得ることができます。
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
}
次に、ルーターは順番にトラバースされ、真の値を返す最初のルーターで停止し、それをタスクの最終ルートとして使用します。
複数のルーターを順番に定義することもできます。
task_routes = [
route_task,
{
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
]
次にルーターが順番に訪問され、最初に値を返すものが選択されます。
RedisまたはRabbitMQを使用している場合は、ルートでキューのデフォルトの優先度を指定することもできます。
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
'priority': 10,
},
}
同様に、タスクで apply_async を呼び出すと、そのデフォルトの優先度が上書きされます。
task.apply_async(priority=0)
優先順位とクラスターの応答性
ワーカーのプリフェッチが原因で、同時に送信された一連のタスクが最初は優先順位から外れている可能性があることに注意することが重要です。 ワーカーのプリフェッチを無効にすると、この問題は回避されますが、小さくて高速なタスクでは理想的なパフォーマンスが得られない可能性があります。 ほとんどの場合、 worker_prefetch_multiplier を1に減らすだけで、プリフェッチを完全に無効にするコストをかけずに、システムの応答性を向上させるためのより簡単でクリーンな方法になります。
redisブローカーを使用する場合、優先度の値は逆にソートされることに注意してください。0が最高の優先度です。
ブロードキャスト
Celeryはブロードキャストルーティングもサポートできます。 次に、接続されているすべてのワーカーにタスクのコピーを配信する交換broadcast_tasks
の例を示します。
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}
これで、tasks.reload_cache
タスクが、このキューから消費するすべてのワーカーに送信されます。
ブロードキャストルーティングの別の例を次に示します。今回はセロリビートスケジュールを使用します。
from kombu.common import Broadcast
from celery.schedules import crontab
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.beat_schedule = {
'test-task': {
'task': 'tasks.reload_cache',
'schedule': crontab(minute=0, hour='*/3'),
'options': {'exchange': 'broadcast_tasks'}
},
}
放送と結果
Celeryの結果は、2つのタスクが同じtask_idを持っている場合に何が起こるかを定義しないことに注意してください。 同じタスクが複数のワーカーに分散されている場合、状態履歴が保存されない可能性があります。
この場合、task.ignore_result
属性を設定することをお勧めします。