シグナル—Pythonドキュメント
信号
シグナルにより、分離されたアプリケーションは、アプリケーションの他の場所で特定のアクションが発生したときに通知を受信できます。
Celeryには、アプリケーションが特定のアクションの動作を強化するためにフックできる多くのシグナルが付属しています。
基本
いくつかの種類のイベントがシグナルをトリガーします。これらのシグナルに接続して、トリガー時にアクションを実行できます。
:signal: `after_task_publish` シグナルへの接続例:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
一部の信号には、フィルタリングできる送信者もあります。 たとえば、:signal: `after_task_publish` シグナルはタスク名を送信者として使用するため、sender
引数をconnect
に指定することで、呼び出されるハンドラーを接続できます。 「proj.tasks.add」という名前のタスクが公開されるたびに:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
信号はdjango.core.dispatch
と同じ実装を使用します。 その結果、他のキーワードパラメータ(例:signal)がデフォルトですべてのシグナルハンドラに渡されます。
シグナルハンドラのベストプラクティスは、任意のキーワード引数(つまり、**kwargs
)を受け入れることです。 そうすれば、新しいCeleryバージョンは、ユーザーコードを壊すことなく引数を追加できます。
信号
タスク信号
before_task_publish
バージョン3.1の新機能。
タスクが公開される前にディスパッチされます。 これは、タスクを送信するプロセスで実行されることに注意してください。
送信者は、送信されるタスクの名前です。
引数を提供します:
body
exchange
送信先の取引所の名前または
Exchange
オブジェクト。routing_key
メッセージの送信時に使用するルーティングキー。
headers
アプリケーションヘッダーのマッピング(変更可能)。
properties
メッセージのプロパティ(変更可能)
declare
メッセージを公開する前に宣言するエンティティのリスト(
Exchange
、Queue
、またはbinding
)。 変更できます。retry_policy
再試行オプションのマッピング。
kombu.Connection.ensure()
の任意の引数にすることができ、変更することができます。
after_task_publish
タスクがブローカーに送信されたときにディスパッチされます。 これは、タスクを送信したプロセスで実行されることに注意してください。
送信者は、送信されるタスクの名前です。
引数を提供します:
headers
body
exchange
交換の名前または使用された
Exchange
オブジェクト。routing_key
使用されるルーティングキー。
task_prerun
タスクが実行される前にディスパッチされます。
Senderは、実行中のタスクオブジェクトです。
引数を提供します:
task_id
実行するタスクのID。
task
実行中のタスク。
args
タスクの位置引数。
kwargs
タスクのキーワード引数。
task_postrun
タスクが実行された後にディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_id
実行するタスクのID。
task
実行中のタスク。
args
タスクの位置引数。
kwargs
タスクのキーワード引数。
retval
タスクの戻り値。
state
結果の状態の名前。
task_retry
タスクが再試行されるときにディスパッチされます。
送信者はタスクオブジェクトです。
引数を提供します:
request
現在のタスクリクエスト。
reason
再試行の理由(通常は例外インスタンスですが、常に
str
に強制変換できます)。einfo
トレースバック(
billiard.einfo.ExceptionInfo
オブジェクト)を含む詳細な例外情報。
task_success
タスクが成功したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します
- *;
result
- タスクの戻り値。
task_failure
タスクが失敗したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_id
タスクのID。
exception
例外インスタンスが発生しました。
args
タスクが呼び出された位置引数。
kwargs
タスクが呼び出されたキーワード引数。
traceback
スタックトレースオブジェクト。
einfo
billiard.einfo.ExceptionInfo
インスタンス。
task_internal_error
タスクの実行中に内部Celeryエラーが発生したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_id
タスクのID。
args
タスクが呼び出された位置引数。
kwargs
タスクが呼び出されたキーワード引数。
request
元のリクエスト辞書。 これは、例外が発生するまでに
task.request
の準備ができていない可能性があるためです。exception
例外インスタンスが発生しました。
traceback
スタックトレースオブジェクト。
einfo
billiard.einfo.ExceptionInfo
インスタンス。
task_received
タスクがブローカーから受信され、実行の準備ができたときにディスパッチされます。
送信者はコンシューマーオブジェクトです。
引数を提供します:
request
これは
Request
インスタンスであり、task.request
ではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.request
は使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。
task_revoked
タスクがワーカーによって取り消された/終了したときにディスパッチされます。
送信者は、取り消された/終了したタスクオブジェクトです。
引数を提供します:
request
これは
Request
インスタンスであり、task.request
ではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.request
は使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。terminated
タスクが終了した場合は、
True
に設定してください。signum
タスクを終了するために使用される信号番号。 これが
None
で、終了がTrue
の場合、:sig: `TERM` と見なす必要があります。expired
タスクの有効期限が切れた場合は、
True
に設定してください。
task_unknown
ワーカーが登録されていないタスクのメッセージを受信したときにディスパッチされます。
送信者はワーカーConsumer
です。
引数を提供します:
name
レジストリにタスクの名前が見つかりません。
id
メッセージにあるタスクID。
message
生のメッセージオブジェクト。
exc
発生したエラー。
task_rejected
ワーカーがタスクキューの1つに不明なタイプのメッセージを受信したときにディスパッチされます。
送信者はワーカーConsumer
です。
引数を提供します:
message
生のメッセージオブジェクト。
exc
発生したエラー(ある場合)。
アプリのシグナル
import_modules
このシグナルは、プログラム(ワーカー、ビート、シェル)などが:setting: `include` および:setting:` imports` 設定のモジュールをインポートするように要求したときに送信されます。 。
送信者はアプリインスタンスです。
労働者の合図
celeryd_after_setup
このシグナルは、ワーカーインスタンスがセットアップされた後、runを呼び出す前に送信されます。 これは、celery worker -Q
オプションのすべてのキューが有効になっている、ログが設定されているなどを意味します。
celery worker -Q
オプションを無視して、常に消費されるカスタムキューを追加するために使用できます。 ワーカーごとに直接キューを設定する例を次に示します。これらのキューを使用して、タスクを特定のワーカーにルーティングできます。
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
引数を提供します:
sender
ワーカーのノード名。
instance
conf
現在のアプリの構成。
celeryd_init
これは、セロリワーカーが起動したときに送信される最初の信号です。 sender
はワーカーのホスト名であるため、このシグナルを使用してワーカー固有の構成をセットアップできます。
from celery.signals import celeryd_init
@celeryd_init.connect(sender='[email protected]')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
または、複数のワーカーの構成をセットアップするには、接続時に送信者の指定を省略できます。
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('[email protected]', '[email protected]'):
conf.task_default_rate_limit = '10/m'
if sender == '[email protected]':
conf.worker_prefetch_multiplier = 0
引数を提供します:
sender
ワーカーのノード名。
instance
conf
現在のアプリの構成。
options
コマンドライン引数からワーカーに渡されるオプション(デフォルトを含む)。
worker_init
ワーカーが開始される前に派遣されます。
worker_ready
労働者が仕事を受け入れる準備ができたときに派遣されます。
heartbeat_sent
セロリがワーカーハートビートを送信したときにディスパッチされます。
送信者はcelery.worker.heartbeat.Heart
インスタンスです。
worker_shutting_down
ワーカーがシャットダウンプロセスを開始するとディスパッチされます。
引数を提供します:
sig
受信したPOSIX信号。
how
シャットダウン方法、ウォームまたはコールド。
exitcode
メインプロセスが終了するときに使用される出口コード。
worker_process_init
開始時にすべてのプール子プロセスにディスパッチされます。
このシグナルにアタッチされたハンドラーは、4秒を超えてブロックしてはならないことに注意してください。そうしないと、プロセスが開始に失敗したと想定してプロセスが強制終了されます。
worker_process_shutdown
すべてのプール子プロセスで、終了する直前にディスパッチされます。
注:finally
ブロックと同様に、このシグナルがディスパッチされる保証はありません。シャットダウン時にハンドラーが呼び出されることを保証することは不可能であり、呼び出された場合、中に中断される可能性があります。
引数を提供します:
pid
シャットダウンしようとしている子プロセスのpid。
exitcode
子プロセスが終了するときに使用される出口コード。
worker_shutdown
ワーカーがシャットダウンしようとしているときにディスパッチされます。
ビートシグナル
beat_init
セロリビートの開始時にディスパッチされます(スタンドアロンまたは組み込み)。
送信者はcelery.beat.Service
インスタンスです。
beat_embedded_init
セロリビートが組み込みプロセスとして開始されると、:signal: `beat_init` シグナルに加えてディスパッチされます。
送信者はcelery.beat.Service
インスタンスです。
イベントレット信号
eventlet_pool_started
イベントレットプールが開始されたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPool
インスタンスです。
eventlet_pool_preshutdown
イベントレットプールが残りのワーカーを待機するように要求される直前に、ワーカーがシャットダウンしたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPool
インスタンスです。
eventlet_pool_postshutdown
プールが参加し、ワーカーがシャットダウンする準備ができたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPool
インスタンスです。
eventlet_pool_apply
タスクがプールに適用されるたびに送信されます。
送信者はcelery.concurrency.eventlet.TaskPool
インスタンスです。
引数を提供します:
target
ターゲット関数。
args
位置引数。
kwargs
キーワード引数。
ロギングシグナル
setup_logging
この信号が接続されている場合、Celeryはロガーを構成しないため、これを使用して、ログ構成を独自のもので完全にオーバーライドできます。
Celeryによるロギング構成設定を拡張したい場合は、:signal: `after_setup_logger` および:signal:` after_setup_task_logger` シグナルを使用できます。
引数を提供します:
loglevel
ロギングオブジェクトのレベル。
logfile
ログファイルの名前。
format
ログ形式の文字列。
colorize
ログメッセージに色を付けるかどうかを指定します。
after_setup_logger
すべてのグローバルロガー(タスクロガーではない)のセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。
引数を提供します:
logger
ロガーオブジェクト。
loglevel
ロギングオブジェクトのレベル。
logfile
ログファイルの名前。
format
ログ形式の文字列。
colorize
ログメッセージに色を付けるかどうかを指定します。
after_setup_task_logger
すべてのタスクロガーのセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。
引数を提供します:
logger
ロガーオブジェクト。
loglevel
ロギングオブジェクトのレベル。
logfile
ログファイルの名前。
format
ログ形式の文字列。
colorize
ログメッセージに色を付けるかどうかを指定します。
コマンド信号
user_preload_options
この信号は、Celeryコマンドラインプログラムのいずれかがユーザーのプリロードオプションの解析を終了した後に送信されます。
celery アンブレラコマンドにコマンドライン引数を追加するために使用できます。
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
送信者はCommand
インスタンスであり、値は呼び出されたプログラムによって異なります(たとえば、アンブレラコマンドの場合はCeleryCommand
)オブジェクトになります)。
引数を提供します:
app
アプリインスタンス。
options
解析されたユーザープリロードオプションのマッピング(デフォルト値を使用)。
非推奨のシグナル