ワーカーガイド—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/workers
移動先:案内検索

労働者ガイド

労働者を始める

デーモン化

おそらく、デーモン化ツールを使用して、バックグラウンドでワーカーを起動することをお勧めします。 一般的なサービスマネージャーを使用してワーカーをデーモンとして起動する方法については、デーモン化を参照してください。

次のコマンドを実行すると、フォアグラウンドでワーカーを起動できます。

$ celery -A proj worker -l INFO

使用可能なコマンドラインオプションの完全なリストについては、workerを参照するか、単に次のようにしてください。

$ celery worker --help

同じマシンで複数のワーカーを起動できますが、--hostname引数でノード名を指定して、個々のワーカーに名前を付けるようにしてください。

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

hostname引数は、次の変数を展開できます。

  • %h:ドメイン名を含むホスト名。
  • %n:ホスト名のみ。
  • %d:ドメイン名のみ。


現在のホスト名が george.example.com の場合、これらは次のように展開されます。

変数 レンプレート 結果
%h worker1@%h [email protected]
%n worker1@%n worker1 @ george
%d worker1@%d [email protected]

:pypi: `supervisor` ユーザーへの注意

%記号は、2番目の記号 %% h を追加してエスケープする必要があります。


労働者を止める

シャットダウンは、:sig: `TERM` 信号を使用して実行する必要があります。

シャットダウンが開始されると、ワーカーは実際に終了する前に、現在実行中のすべてのタスクを終了します。 これらのタスクが重要な場合は、:sig: `KILL` 信号を送信するなど、思い切った操作を行う前に、タスクが完了するのを待つ必要があります。

無限ループなどでスタックしたために、ワーカーが十分な時間後にシャットダウンしない場合は、:sig: `KILL` シグナルを使用して、ワーカーを強制的に終了できます。ただし、現在は注意してください。実行中のタスクは失われます(つまり、タスクにacks_lateオプションが設定されていない場合)。

また、プロセスは:sig: `KILL` シグナルをオーバーライドできないため、ワーカーは子を取得できません。 必ず手動で行ってください。 このコマンドは通常、トリックを実行します。

$ pkill -9 -f 'celery worker'

システムに pkill コマンドがない場合は、少し長いバージョンを使用できます。

$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9

バージョン5.2で変更: Linuxシステムで、Celeryはワーカー終了後にすべての子プロセスへの:sig: `KILL` シグナルの送信をサポートするようになりました。 これは、prctl(2)の PR_SET_PDEATHSIG オプションを介して行われます。


ワーカーの再起動

ワーカーを再起動するには、 TERM シグナルを送信して、新しいインスタンスを開始する必要があります。 開発用のワーカーを管理する最も簡単な方法は、 celery multi を使用することです。

$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

本番環境への展開では、init-scriptsまたはプロセス監視システムを使用する必要があります(デーモン化を参照)。

停止してからワーカーを起動して再起動する以外に、:sig: `HUP` 信号を使用してワーカーを再起動することもできます。 ワーカーは自分自身を再起動する責任があるため、これは問題が発生しやすく、本番環境では推奨されないことに注意してください。

$ kill -HUP $pid

ノート

:sig: `HUP` による再起動は、ワーカーがデーモンとしてバックグラウンドで実行されている場合にのみ機能します(制御端末はありません)。

:sig: `HUP` は、そのプラットフォームの制限により、macOSでは無効になっています。


プロセスシグナル

ワーカーのメインプロセスは、次のシグナルをオーバーライドします。

:sig: `TERM` ウォームシャットダウン、タスクが完了するのを待ちます。
:sig: `QUIT` コールドシャットダウン、できるだけ早く終了
:sig: `USR1` すべてのアクティブなスレッドのトレースバックをダンプします。
:sig: `USR2` リモートデバッグ。celery.contrib.rdbを参照してください。


ファイルパスの変数

--logfile--pidfile、および--statedbのファイルパス引数には、ワーカーが展開する変数を含めることができます。

ノード名の置換

  • %p:完全なノード名。
  • %h:ドメイン名を含むホスト名。
  • %n:ホスト名のみ。
  • %d:ドメイン名のみ。
  • %i:プリフォークプールプロセスインデックス。MainProcessの場合は0。
  • %I:区切り文字付きのプリフォークプールプロセスインデックス。

たとえば、現在のホスト名が[email protected]の場合、これらは次のように展開されます。

  • --logfile=%p.log-> [email protected]
  • --logfile=%h.log-> foo.example.com.log
  • --logfile=%n.log-> george.log
  • --logfile=%d.log-> example.com.log


プレフォークプール工程指数

プリフォークプールプロセスインデックス指定子は、最終的にファイルを開く必要があるプロセスに応じて、異なるファイル名に展開されます。

これを使用して、子プロセスごとに1つのログファイルを指定できます。

プロセスが終了した場合、または自動スケール/ maxtasksperchild /時間制限が使用されている場合でも、数値はプロセス制限内にとどまることに注意してください。 つまり、数値は工程指数であり、工程数やpidではありません。

  • %i-プールプロセスインデックス。MainProcessの場合は0。

    -n [email protected] -c2 -f %n-%i.logは次の3つのログファイルになります。

    • worker1-0.log(メインプロセス)

    • worker1-1.log(プールプロセス1)

    • worker1-2.log(プールプロセス2)


  • %I-区切り文字付きのプールプロセスインデックス。

    -n [email protected] -c2 -f %n%I.logは次の3つのログファイルになります。

    • worker1.log(メインプロセス)

    • worker1-1.log(プールプロセス1)

    • worker1-2.log(プールプロセス2)



並行性

デフォルトでは、マルチプロセッシングはタスクの同時実行を実行するために使用されますが、 Eventlet を使用することもできます。 ワーカープロセス/スレッドの数は、--concurrency引数を使用して変更でき、デフォルトでは、マシンで使用可能なCPUの数になります。

プロセス数(マルチプロセッシング/プリフォークプール)

通常はプールプロセスが多いほど良いですが、プールプロセスを追加するとパフォーマンスに悪影響を与えるカットオフポイントがあります。 複数のワーカーインスタンスを実行すると、単一のワーカーよりもパフォーマンスが向上する可能性があることを裏付ける証拠さえあります。 たとえば、それぞれ10個のプールプロセスを持つ3人のワーカー。 これは、アプリケーション、作業負荷、タスクの実行時間、およびその他の要因によって異なるため、自分に最適な数値を見つけるために実験する必要があります。


リモコン

バージョン2.0の新機能。


celeryコマンド

celery プログラムは、コマンドラインからリモートコントロールコマンドを実行するために使用されます。 以下にリストされているすべてのコマンドをサポートします。 詳細については、管理コマンドラインユーティリティ(検査/制御)を参照してください。

プールのサポート
prefork、eventlet、gevent、thread 、ブロッキング: solo (注を参照)
ブローカーサポート
amqp、redis

ワーカーは、優先度の高いブロードキャストメッセージキューを使用してリモート制御することができます。 コマンドは、すべてのワーカー、または特定のワーカーのリストに送信できます。

コマンドは応答を持つこともできます。 その後、クライアントはそれらの応答を待って収集できます。 クラスター内で使用可能なワーカーの数を知る中央権限がないため、応答を送信できるワーカーの数を見積もる方法もありません。そのため、クライアントには構成可能なタイムアウト(応答が到着するまでの秒単位の期限)があります。 このタイムアウトのデフォルトは1秒です。 ワーカーが期限内に返信しない場合は、必ずしもワーカーが返信しなかった、またはさらに悪いことに死んでいることを意味するわけではありませんが、単にネットワークの遅延またはワーカーのコマンド処理が遅いことが原因である可能性があるため、それに応じてタイムアウトを調整してください。

タイムアウトに加えて、クライアントは待機する応答の最大数を指定できます。 宛先が指定されている場合、この制限は宛先ホストの数に設定されます。

ノート

soloプールはリモート制御コマンドをサポートしますが、実行中のタスクは待機中の制御コマンドをブロックするため、ワーカーが非常に忙しい場合は使用が制限されます。 その場合、クライアントでの応答を待機するタイムアウトを増やす必要があります。


broadcast()機能

これは、ワーカーにコマンドを送信するために使用されるクライアント関数です。 一部のリモートコントロールコマンドには、rate_limit()ping()など、バックグラウンドでbroadcast()を使用する高レベルのインターフェイスもあります。

:control: `rate_limit` コマンドとキーワード引数の送信:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

これにより、応答を待たずにコマンドが非同期に送信されます。 返信をリクエストするには、 reply 引数を使用する必要があります。

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

destination 引数を使用して、コマンドを受け取るワーカーのリストを指定できます。

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['[email protected]'])
[{'worker1.example.com': 'New rate limit set successfully'}]

もちろん、高レベルのインターフェースを使用してレート制限を設定する方がはるかに便利ですが、broadcast()を使用してのみ要求できるコマンドがあります。


コマンド

revoke:タスクを取り消す

プールのサポート
すべて、preforkとeventletでのみサポートされている終了
ブローカーサポート
amqp、redis
指図
セロリ-プロジェクトコントロールの取り消し

すべてのワーカーノードは、取り消されたタスクIDのメモリを、メモリ内またはディスク上に永続的に保持します(永続的な取り消しを参照)。

ワーカーが取り消し要求を受信すると、タスクの実行をスキップしますが、 terminate オプションが設定されていない限り、すでに実行中のタスクを終了しません。

ノート

終了オプションは、タスクがスタックした場合の管理者にとっての最後の手段です。 タスクを終了するためではなく、タスクを実行しているプロセスを終了するためであり、そのプロセスはシグナルが送信された時点ですでに別のタスクの処理を開始している可能性があるため、これをプログラムで呼び出すことは絶対にしないでください。


terminate が設定されている場合、タスクを処理しているワーカー子プロセスは終了します。 送信されるデフォルトの信号は TERM ですが、 signal 引数を使用してこれを指定できます。 Signalは、Python標準ライブラリのsignalモジュールで定義されている任意の信号の大文字の名前にすることができます。

タスクを終了すると、タスクも取り消されます。

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

複数のタスクを取り消す

バージョン3.1の新機能。


revokeメソッドは、複数のタスクを一度に取り消すlist引数も受け入れます。

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

GroupResult.revokeメソッドは、バージョン3.1以降これを利用しています。


永続的な取り消し

タスクの取り消しは、すべてのワーカーにブロードキャストメッセージを送信することで機能し、ワーカーは取り消されたタスクのリストをメモリに保持します。 ワーカーが起動すると、取り消されたタスクがクラスター内の他のワーカーと同期されます。

取り消されたタスクのリストはメモリ内にあるため、すべてのワーカーを再起動すると、取り消されたIDのリストも表示されなくなります。 再起動間でこのリストを保持する場合は、セロリワーカーの –statedb 引数を使用して、これらを保存するファイルを指定する必要があります。

$ celery -A proj worker -l INFO --statedb=/var/run/celery/worker.state

または、 celery multi を使用する場合は、ワーカーインスタンスごとに1つのファイルを作成するため、%n 形式を使用して現在のノード名を展開します。

celery multi start 2 -l INFO --statedb=/var/run/celery/%n.state

ファイルパスの変数も参照してください。

取り消しが機能するには、リモートコントロールコマンドが機能している必要があることに注意してください。 現時点では、リモートコントロールコマンドはRabbitMQ(amqp)とRedisでのみサポートされています。


制限時間

バージョン2.0の新機能。


プールのサポート
prefork / gevent(下記の注を参照)

ソフト、それともハード?

制限時間は、ソフトとハードの2つの値で設定されます。 ソフト制限時間により、タスクは例外をキャッチして、強制終了する前にクリーンアップできます。ハードタイムアウトはキャッチできず、タスクを強制的に終了します。

単一のタスクが永久に実行される可能性があります。発生しないイベントを待機しているタスクが多数ある場合、ワーカーが新しいタスクを無期限に処理するのをブロックします。 このシナリオの発生を防ぐ最善の方法は、時間制限を有効にすることです。

制限時間( –time-limit )は、タスクを実行しているプロセスが終了して新しいプロセスに置き換えられるまでにタスクを実行できる最大秒数です。 ソフト制限時間( –soft-time-limit )を有効にすることもできます。これにより、ハード制限時間が終了する前にタスクがクリーンアップするためにキャッチできる例外が発生します。

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

制限時間は、:setting: `task_time_limit` / :setting:` task_soft_time_limit` 設定を使用して設定することもできます。

ノート

現在、:sig: `SIGUSR1` 信号をサポートしていないプラットフォームでは時間制限は機能しません。


ノート

geventプールは、ソフト時間制限を実装していません。 さらに、タスクがブロックしている場合、ハードタイム制限は適用されません。


実行時の制限時間の変更

バージョン2.3の新機能。


ブローカーサポート
amqp、redis

time_limitという名前の、タスクのソフト時間制限とハード時間制限の両方を変更できるリモートコントロールコマンドがあります。

tasks.crawl_the_webタスクの制限時間を1分のソフト制限時間と、2分のハード制限時間に変更する例:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

制限時間の変更後に実行を開始するタスクのみが影響を受けます。


レート制限

実行時のレート制限の変更

myapp.mytask タスクのレート制限を変更して、そのタイプの最大200のタスクを毎分実行する例:

>>> app.control.rate_limit('myapp.mytask', '200/m')

上記は宛先を指定していないため、変更要求はクラスター内のすべてのワーカーインスタンスに影響します。 ワーカーの特定のリストにのみ影響を与えたい場合は、destination引数を含めることができます。

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['[email protected]'])

警告

これは、:setting: `worker_disable_rate_limits` 設定が有効になっているワーカーには影響しません。


子設定あたりの最大タスク

バージョン2.0の新機能。


プールのサポート
プリフォーク

このオプションを使用すると、ワーカーが新しいプロセスに置き換えられる前に実行できるタスクの最大数を構成できます。

これは、たとえばクローズドソースのC拡張機能から制御できないメモリリークがある場合に役立ちます。

このオプションは、workers --max-tasks-per-child引数を使用するか、:setting: `worker_max_tasks_per_child` 設定を使用して設定できます。


子設定あたりの最大メモリ

バージョン4.0の新機能。


プールのサポート
プリフォーク

このオプションを使用すると、ワーカーが新しいプロセスに置き換えられる前に実行できる常駐メモリの最大量を構成できます。

これは、たとえばクローズドソースのC拡張機能から制御できないメモリリークがある場合に役立ちます。

このオプションは、workers --max-memory-per-child引数を使用するか、:setting: `worker_max_memory_per_child` 設定を使用して設定できます。


自動スケーリング

バージョン2.2の新機能。


プールのサポート
preforkgevent

autoscaler コンポーネントは、負荷に基づいてプールのサイズを動的に変更するために使用されます。

  • *; オートスケーラーは、実行する作業がある場合にプールプロセスを追加します。
    *;* ワークロードが低いときにプロセスの削除を開始します。

これは、--autoscaleオプションによって有効になります。このオプションには、プールプロセスの最大数と最小数の2つの数値が必要です。

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

Autoscalerをサブクラス化することにより、オートスケーラーの独自のルールを定義することもできます。 メトリックのいくつかのアイデアには、負荷平均または使用可能なメモリの量が含まれます。 :setting: `worker_autoscaler` 設定でカスタムオートスケーラーを指定できます。


キュー

ワーカーインスタンスは、任意の数のキューから消費できます。 デフォルトでは、:setting: `task_queues` 設定で定義されたすべてのキューから消費されます(指定されていない場合は、celeryという名前のデフォルトキューにフォールバックします)。

-Qオプションにキューのコンマ区切りリストを指定することにより、起動時に消費するキューを指定できます。

$ celery -A proj worker -l INFO -Q foo,bar,baz

キュー名が:setting: `task_queues` で定義されている場合、その構成が使用されますが、キューのリストで定義されていない場合、Celeryは自動的に新しいキューを生成します([X215X ]:setting: `task_create_missing_queues` オプション)。

リモートコントロールコマンド:control: `add_consumer` および:control:` cancel_consumer` を使用して、実行時にキューからの消費を開始および停止するようにワーカーに指示することもできます。

キュー:コンシューマーの追加

:control: `add_consumer` 制御コマンドは、1人以上のワーカーにキューからの消費を開始するように指示します。 この操作はべき等です。

クラスタ内のすべてのワーカーに「foo」という名前のキューから消費を開始するように指示するには、セロリコントロールプログラムを使用できます。

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

特定のワーカーを指定する場合は、--destination引数を使用できます。

$ celery -A proj control add_consumer foo -d [email protected]

@control.add_consumer()メソッドを使用して、同じことを動的に実行できます。

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['[email protected]'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

ここまでは、自動キューを使用した例のみを示しました。さらに制御が必要な場合は、exchange、routing_key、その他のオプションを指定することもできます。

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['[email protected]', '[email protected]'])

キュー:消費者のキャンセル

:control: `cancel_consumer` 制御コマンドを使用して、キュー名でコンシューマーをキャンセルできます。

クラスター内のすべてのワーカーにキューからの消費をキャンセルさせるには、セロリコントロールプログラムを使用できます。

$ celery -A proj control cancel_consumer foo

--destination引数を使用して、コマンドを実行するワーカーまたはワーカーのリストを指定できます。

$ celery -A proj control cancel_consumer foo -d [email protected]

@control.cancel_consumer()メソッドを使用して、プログラムでコンシューマーをキャンセルすることもできます。

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]

キュー:アクティブなキューのリスト

:control: `active_queues` 制御コマンドを使用して、ワーカーが消費するキューのリストを取得できます。

$ celery -A proj inspect active_queues
[...]

他のすべてのリモートコントロールコマンドと同様に、これは、要求に応答する必要があるワーカーを指定するために使用される--destination引数もサポートします。

$ celery -A proj inspect active_queues -d [email protected]
[...]

これは、active_queues()メソッドを使用してプログラムで実行することもできます。

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]

労働者の検査

@control.inspectを使用すると、実行中のワーカーを検査できます。 内部でリモートコントロールコマンドを使用します。

celeryコマンドを使用してワーカーを検査することもでき、@controlインターフェイスと同じコマンドをサポートします。

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

登録済みタスクのダンプ

registered()を使用して、ワーカーに登録されているタスクのリストを取得できます。

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]

現在実行中のタスクのダンプ

active()を使用して、アクティブなタスクのリストを取得できます。

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

スケジュールされた(ETA)タスクのダンプ

scheduled()を使用すると、スケジュールを待機しているタスクのリストを取得できます。

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

ノート

これらはETA /カウントダウン引数を持つタスクであり、定期的なタスクではありません。


予約済みタスクのダンプ

予約済みタスクは、受信されたが、まだ実行されるのを待っているタスクです。

reserved()を使用して、これらのリストを取得できます。

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

統計

リモートコントロールコマンドinspect stats(またはstats())は、ワーカーに関する有用な(またはあまり有用ではない)統計の長いリストを提供します。

$ celery -A proj inspect stats

出力の詳細については、stats()のリファレンスドキュメントを参照してください。


追加コマンド

リモートシャットダウン

このコマンドは、ワーカーをリモートで正常にシャットダウンします。

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='[email protected]')

ping

このコマンドは、生きているワーカーにpingを要求します。 労働者は文字列「ポン」で応答します、そしてそれはちょうどそれについてです。 カスタムタイムアウトを指定しない限り、返信にはデフォルトの1秒のタイムアウトが使用されます。

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping()は destination 引数もサポートしているため、pingを実行するワーカーを指定できます。

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

イベントの有効化/無効化

enable_events 、 disable_events コマンドを使用して、イベントを有効/無効にできます。 これは、セロリイベント / セロリモンを使用してワーカーを一時的に監視するのに役立ちます。

>>> app.control.enable_events()
>>> app.control.disable_events()

独自のリモコンコマンドを書く

リモートコントロールコマンドには次の2種類があります。

  • 検査コマンド

    副作用はありません。通常、現在登録されているタスクのリスト、アクティブなタスクのリストなど、ワーカーで見つかった値を返すだけです。

  • 制御コマンド

    消費する新しいキューを追加するなどの副作用を実行します。

リモートコントロールコマンドはコントロールパネルに登録されており、現在のControlDispatchインスタンスという1つの引数を取ります。 そこから、必要に応じてアクティブなConsumerにアクセスできます。

タスクのプリフェッチカウントをインクリメントする制御コマンドの例を次に示します。

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

ワーカーによってインポートされるモジュールにこのコードを追加してください。これは、Celeryアプリが定義されているモジュールと同じであるか、:setting: `imports` にモジュールを追加できます。設定。

ワーカーを再起動して、制御コマンドが登録されるようにします。これで、 celery control ユーティリティを使用してコマンドを呼び出すことができます。

$ celery -A proj control increase_prefetch_count 3

celery inspect プログラムにアクションを追加することもできます。たとえば、現在のプリフェッチカウントを読み取るアクションです。

from celery.worker.control import inspect_command

@inspect_command()
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

ワーカーを再起動した後、 celery inspect プログラムを使用してこの値を照会できるようになりました。

$ celery -A proj inspect current_prefetch_count