シグナル—Pythonドキュメント
信号
シグナルにより、分離されたアプリケーションは、アプリケーションの他の場所で特定のアクションが発生したときに通知を受信できます。
Celeryには、アプリケーションが特定のアクションの動作を強化するためにフックできる多くのシグナルが付属しています。
基本
いくつかの種類のイベントがシグナルをトリガーします。これらのシグナルに接続して、トリガー時にアクションを実行できます。
:signal: `after_task_publish` シグナルへの接続例:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
一部の信号には、フィルタリングできる送信者もあります。 たとえば、:signal: `after_task_publish` シグナルはタスク名を送信者として使用するため、sender引数をconnectに指定することで、呼び出されるハンドラーを接続できます。 「proj.tasks.add」という名前のタスクが公開されるたびに:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
信号はdjango.core.dispatchと同じ実装を使用します。 その結果、他のキーワードパラメータ(例:signal)がデフォルトですべてのシグナルハンドラに渡されます。
シグナルハンドラのベストプラクティスは、任意のキーワード引数(つまり、**kwargs)を受け入れることです。 そうすれば、新しいCeleryバージョンは、ユーザーコードを壊すことなく引数を追加できます。
信号
タスク信号
before_task_publish
バージョン3.1の新機能。
タスクが公開される前にディスパッチされます。 これは、タスクを送信するプロセスで実行されることに注意してください。
送信者は、送信されるタスクの名前です。
引数を提供します:
bodyexchange送信先の取引所の名前または
Exchangeオブジェクト。routing_keyメッセージの送信時に使用するルーティングキー。
headersアプリケーションヘッダーのマッピング(変更可能)。
propertiesメッセージのプロパティ(変更可能)
declareメッセージを公開する前に宣言するエンティティのリスト(
Exchange、Queue、またはbinding)。 変更できます。retry_policy再試行オプションのマッピング。
kombu.Connection.ensure()の任意の引数にすることができ、変更することができます。
after_task_publish
タスクがブローカーに送信されたときにディスパッチされます。 これは、タスクを送信したプロセスで実行されることに注意してください。
送信者は、送信されるタスクの名前です。
引数を提供します:
headersbodyexchange交換の名前または使用された
Exchangeオブジェクト。routing_key使用されるルーティングキー。
task_prerun
タスクが実行される前にディスパッチされます。
Senderは、実行中のタスクオブジェクトです。
引数を提供します:
task_id実行するタスクのID。
task実行中のタスク。
argsタスクの位置引数。
kwargsタスクのキーワード引数。
task_postrun
タスクが実行された後にディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_id実行するタスクのID。
task実行中のタスク。
argsタスクの位置引数。
kwargsタスクのキーワード引数。
retvalタスクの戻り値。
state結果の状態の名前。
task_retry
タスクが再試行されるときにディスパッチされます。
送信者はタスクオブジェクトです。
引数を提供します:
request現在のタスクリクエスト。
reason再試行の理由(通常は例外インスタンスですが、常に
strに強制変換できます)。einfoトレースバック(
billiard.einfo.ExceptionInfoオブジェクト)を含む詳細な例外情報。
task_success
タスクが成功したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します
- *;
result- タスクの戻り値。
task_failure
タスクが失敗したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_idタスクのID。
exception例外インスタンスが発生しました。
argsタスクが呼び出された位置引数。
kwargsタスクが呼び出されたキーワード引数。
tracebackスタックトレースオブジェクト。
einfobilliard.einfo.ExceptionInfoインスタンス。
task_internal_error
タスクの実行中に内部Celeryエラーが発生したときにディスパッチされます。
Senderは、実行されるタスクオブジェクトです。
引数を提供します:
task_idタスクのID。
argsタスクが呼び出された位置引数。
kwargsタスクが呼び出されたキーワード引数。
request元のリクエスト辞書。 これは、例外が発生するまでに
task.requestの準備ができていない可能性があるためです。exception例外インスタンスが発生しました。
tracebackスタックトレースオブジェクト。
einfobilliard.einfo.ExceptionInfoインスタンス。
task_received
タスクがブローカーから受信され、実行の準備ができたときにディスパッチされます。
送信者はコンシューマーオブジェクトです。
引数を提供します:
requestこれは
Requestインスタンスであり、task.requestではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.requestは使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。
task_revoked
タスクがワーカーによって取り消された/終了したときにディスパッチされます。
送信者は、取り消された/終了したタスクオブジェクトです。
引数を提供します:
requestこれは
Requestインスタンスであり、task.requestではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.requestは使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。terminatedタスクが終了した場合は、
Trueに設定してください。signumタスクを終了するために使用される信号番号。 これが
Noneで、終了がTrueの場合、:sig: `TERM` と見なす必要があります。expiredタスクの有効期限が切れた場合は、
Trueに設定してください。
task_unknown
ワーカーが登録されていないタスクのメッセージを受信したときにディスパッチされます。
送信者はワーカーConsumerです。
引数を提供します:
nameレジストリにタスクの名前が見つかりません。
idメッセージにあるタスクID。
message生のメッセージオブジェクト。
exc発生したエラー。
task_rejected
ワーカーがタスクキューの1つに不明なタイプのメッセージを受信したときにディスパッチされます。
送信者はワーカーConsumerです。
引数を提供します:
message生のメッセージオブジェクト。
exc発生したエラー(ある場合)。
アプリのシグナル
import_modules
このシグナルは、プログラム(ワーカー、ビート、シェル)などが:setting: `include` および:setting:` imports` 設定のモジュールをインポートするように要求したときに送信されます。 。
送信者はアプリインスタンスです。
労働者の合図
celeryd_after_setup
このシグナルは、ワーカーインスタンスがセットアップされた後、runを呼び出す前に送信されます。 これは、celery worker -Qオプションのすべてのキューが有効になっている、ログが設定されているなどを意味します。
celery worker -Qオプションを無視して、常に消費されるカスタムキューを追加するために使用できます。 ワーカーごとに直接キューを設定する例を次に示します。これらのキューを使用して、タスクを特定のワーカーにルーティングできます。
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
引数を提供します:
senderワーカーのノード名。
instanceconf現在のアプリの構成。
celeryd_init
これは、セロリワーカーが起動したときに送信される最初の信号です。 senderはワーカーのホスト名であるため、このシグナルを使用してワーカー固有の構成をセットアップできます。
from celery.signals import celeryd_init
@celeryd_init.connect(sender='[email protected]')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
または、複数のワーカーの構成をセットアップするには、接続時に送信者の指定を省略できます。
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('[email protected]', '[email protected]'):
conf.task_default_rate_limit = '10/m'
if sender == '[email protected]':
conf.worker_prefetch_multiplier = 0
引数を提供します:
senderワーカーのノード名。
instanceconf現在のアプリの構成。
optionsコマンドライン引数からワーカーに渡されるオプション(デフォルトを含む)。
worker_init
ワーカーが開始される前に派遣されます。
worker_ready
労働者が仕事を受け入れる準備ができたときに派遣されます。
heartbeat_sent
セロリがワーカーハートビートを送信したときにディスパッチされます。
送信者はcelery.worker.heartbeat.Heartインスタンスです。
worker_shutting_down
ワーカーがシャットダウンプロセスを開始するとディスパッチされます。
引数を提供します:
sig受信したPOSIX信号。
howシャットダウン方法、ウォームまたはコールド。
exitcodeメインプロセスが終了するときに使用される出口コード。
worker_process_init
開始時にすべてのプール子プロセスにディスパッチされます。
このシグナルにアタッチされたハンドラーは、4秒を超えてブロックしてはならないことに注意してください。そうしないと、プロセスが開始に失敗したと想定してプロセスが強制終了されます。
worker_process_shutdown
すべてのプール子プロセスで、終了する直前にディスパッチされます。
注:finallyブロックと同様に、このシグナルがディスパッチされる保証はありません。シャットダウン時にハンドラーが呼び出されることを保証することは不可能であり、呼び出された場合、中に中断される可能性があります。
引数を提供します:
pidシャットダウンしようとしている子プロセスのpid。
exitcode子プロセスが終了するときに使用される出口コード。
worker_shutdown
ワーカーがシャットダウンしようとしているときにディスパッチされます。
ビートシグナル
beat_init
セロリビートの開始時にディスパッチされます(スタンドアロンまたは組み込み)。
送信者はcelery.beat.Serviceインスタンスです。
beat_embedded_init
セロリビートが組み込みプロセスとして開始されると、:signal: `beat_init` シグナルに加えてディスパッチされます。
送信者はcelery.beat.Serviceインスタンスです。
イベントレット信号
eventlet_pool_started
イベントレットプールが開始されたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。
eventlet_pool_preshutdown
イベントレットプールが残りのワーカーを待機するように要求される直前に、ワーカーがシャットダウンしたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。
eventlet_pool_postshutdown
プールが参加し、ワーカーがシャットダウンする準備ができたときに送信されます。
送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。
eventlet_pool_apply
タスクがプールに適用されるたびに送信されます。
送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。
引数を提供します:
targetターゲット関数。
args位置引数。
kwargsキーワード引数。
ロギングシグナル
setup_logging
この信号が接続されている場合、Celeryはロガーを構成しないため、これを使用して、ログ構成を独自のもので完全にオーバーライドできます。
Celeryによるロギング構成設定を拡張したい場合は、:signal: `after_setup_logger` および:signal:` after_setup_task_logger` シグナルを使用できます。
引数を提供します:
loglevelロギングオブジェクトのレベル。
logfileログファイルの名前。
formatログ形式の文字列。
colorizeログメッセージに色を付けるかどうかを指定します。
after_setup_logger
すべてのグローバルロガー(タスクロガーではない)のセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。
引数を提供します:
loggerロガーオブジェクト。
loglevelロギングオブジェクトのレベル。
logfileログファイルの名前。
formatログ形式の文字列。
colorizeログメッセージに色を付けるかどうかを指定します。
after_setup_task_logger
すべてのタスクロガーのセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。
引数を提供します:
loggerロガーオブジェクト。
loglevelロギングオブジェクトのレベル。
logfileログファイルの名前。
formatログ形式の文字列。
colorizeログメッセージに色を付けるかどうかを指定します。
コマンド信号
user_preload_options
この信号は、Celeryコマンドラインプログラムのいずれかがユーザーのプリロードオプションの解析を終了した後に送信されます。
celery アンブレラコマンドにコマンドライン引数を追加するために使用できます。
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
送信者はCommandインスタンスであり、値は呼び出されたプログラムによって異なります(たとえば、アンブレラコマンドの場合はCeleryCommand)オブジェクトになります)。
引数を提供します:
appアプリインスタンス。
options解析されたユーザープリロードオプションのマッピング(デフォルト値を使用)。
非推奨のシグナル