シグナル—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/signals
移動先:案内検索

信号

シグナルにより、分離されたアプリケーションは、アプリケーションの他の場所で特定のアクションが発生したときに通知を受信できます。

Celeryには、アプリケーションが特定のアクションの動作を強化するためにフックできる多くのシグナルが付属しています。

基本

いくつかの種類のイベントがシグナルをトリガーします。これらのシグナルに接続して、トリガー時にアクションを実行できます。

:signal: `after_task_publish` シグナルへの接続例:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

一部の信号には、フィルタリングできる送信者もあります。 たとえば、:signal: `after_task_publish` シグナルはタスク名を送信者として使用するため、sender引数をconnectに指定することで、呼び出されるハンドラーを接続できます。 「proj.tasks.add」という名前のタスクが公開されるたびに:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

信号はdjango.core.dispatchと同じ実装を使用します。 その結果、他のキーワードパラメータ(例:signal)がデフォルトですべてのシグナルハンドラに渡されます。

シグナルハンドラのベストプラクティスは、任意のキーワード引数(つまり、**kwargs)を受け入れることです。 そうすれば、新しいCeleryバージョンは、ユーザーコードを壊すことなく引数を追加できます。


信号

タスク信号

before_task_publish

バージョン3.1の新機能。


タスクが公開される前にディスパッチされます。 これは、タスクを送信するプロセスで実行されることに注意してください。

送信者は、送信されるタスクの名前です。

引数を提供します:

  • body

    タスクメッセージ本文。

    これは、タスクメッセージフィールドを含むマッピングです。定義できる可能なフィールドのリファレンスについては、バージョン2 およびバージョン1 を参照してください。

  • exchange

    送信先の取引所の名前またはExchangeオブジェクト。

  • routing_key

    メッセージの送信時に使用するルーティングキー。

  • headers

    アプリケーションヘッダーのマッピング(変更可能)。

  • properties

    メッセージのプロパティ(変更可能)

  • declare

    メッセージを公開する前に宣言するエンティティのリスト(ExchangeQueue、またはbinding)。 変更できます。

  • retry_policy

    再試行オプションのマッピング。 kombu.Connection.ensure()の任意の引数にすることができ、変更することができます。


after_task_publish

タスクがブローカーに送信されたときにディスパッチされます。 これは、タスクを送信したプロセスで実行されることに注意してください。

送信者は、送信されるタスクの名前です。

引数を提供します:

  • headers

    タスクメッセージヘッダー。定義できるフィールドのリファレンスについては、バージョン2 およびバージョン1 を参照してください。

  • body

    タスクメッセージの本文。定義できるフィールドのリファレンスについては、バージョン2 およびバージョン1 を参照してください。

  • exchange

    交換の名前または使用されたExchangeオブジェクト。

  • routing_key

    使用されるルーティングキー。


task_prerun

タスクが実行される前にディスパッチされます。

Senderは、実行中のタスクオブジェクトです。

引数を提供します:

  • task_id

    実行するタスクのID。

  • task

    実行中のタスク。

  • args

    タスクの位置引数。

  • kwargs

    タスクのキーワード引数。


task_postrun

タスクが実行された後にディスパッチされます。

Senderは、実行されるタスクオブジェクトです。

引数を提供します:

  • task_id

    実行するタスクのID。

  • task

    実行中のタスク。

  • args

    タスクの位置引数。

  • kwargs

    タスクのキーワード引数。

  • retval

    タスクの戻り値。

  • state

    結果の状態の名前。


task_retry

タスクが再試行されるときにディスパッチされます。

送信者はタスクオブジェクトです。

引数を提供します:

  • request

    現在のタスクリクエスト。

  • reason

    再試行の理由(通常は例外インスタンスですが、常にstrに強制変換できます)。

  • einfo

    トレースバック(billiard.einfo.ExceptionInfoオブジェクト)を含む詳細な例外情報。


task_success

タスクが成功したときにディスパッチされます。

Senderは、実行されるタスクオブジェクトです。

引数を提供します

  • *; result
    タスクの戻り値。


task_failure

タスクが失敗したときにディスパッチされます。

Senderは、実行されるタスクオブジェクトです。

引数を提供します:

  • task_id

    タスクのID。

  • exception

    例外インスタンスが発生しました。

  • args

    タスクが呼び出された位置引数。

  • kwargs

    タスクが呼び出されたキーワード引数。

  • traceback

    スタックトレースオブジェクト。

  • einfo

    billiard.einfo.ExceptionInfoインスタンス。


task_internal_error

タスクの実行中に内部Celeryエラーが発生したときにディスパッチされます。

Senderは、実行されるタスクオブジェクトです。

引数を提供します:

  • task_id

    タスクのID。

  • args

    タスクが呼び出された位置引数。

  • kwargs

    タスクが呼び出されたキーワード引数。

  • request

    元のリクエスト辞書。 これは、例外が発生するまでにtask.requestの準備ができていない可能性があるためです。

  • exception

    例外インスタンスが発生しました。

  • traceback

    スタックトレースオブジェクト。

  • einfo

    billiard.einfo.ExceptionInfoインスタンス。


task_received

タスクがブローカーから受信され、実行の準備ができたときにディスパッチされます。

送信者はコンシューマーオブジェクトです。

引数を提供します:

  • request

    これはRequestインスタンスであり、task.requestではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.requestは使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。


task_revoked

タスクがワーカーによって取り消された/終了したときにディスパッチされます。

送信者は、取り消された/終了したタスクオブジェクトです。

引数を提供します:

  • request

    これはRequestインスタンスであり、task.requestではありません。 プリフォークプールを使用する場合、このシグナルは親プロセスでディスパッチされるため、task.requestは使用できないため、使用しないでください。 同じフィールドの多くを共有するため、代わりにこのオブジェクトを使用してください。

  • terminated

    タスクが終了した場合は、Trueに設定してください。

  • signum

    タスクを終了するために使用される信号番号。 これがNoneで、終了がTrueの場合、:sig: `TERM` と見なす必要があります。

  • expired

    タスクの有効期限が切れた場合は、Trueに設定してください。


task_unknown

ワーカーが登録されていないタスクのメッセージを受信したときにディスパッチされます。

送信者はワーカーConsumerです。

引数を提供します:

  • name

    レジストリにタスクの名前が見つかりません。

  • id

    メッセージにあるタスクID。

  • message

    生のメッセージオブジェクト。

  • exc

    発生したエラー。


task_rejected

ワーカーがタスクキューの1つに不明なタイプのメッセージを受信したときにディスパッチされます。

送信者はワーカーConsumerです。

引数を提供します:

  • message

    生のメッセージオブジェクト。

  • exc

    発生したエラー(ある場合)。


アプリのシグナル

import_modules

このシグナルは、プログラム(ワーカー、ビート、シェル)などが:setting: `include` および:setting:` imports` 設定のモジュールをインポートするように要求したときに送信されます。 。

送信者はアプリインスタンスです。


労働者の合図

celeryd_after_setup

このシグナルは、ワーカーインスタンスがセットアップされた後、runを呼び出す前に送信されます。 これは、celery worker -Qオプションのすべてのキューが有効になっている、ログが設定されているなどを意味します。

celery worker -Qオプションを無視して、常に消費されるカスタムキューを追加するために使用できます。 ワーカーごとに直接キューを設定する例を次に示します。これらのキューを使用して、タスクを特定のワーカーにルーティングできます。

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

引数を提供します:

  • sender

    ワーカーのノード名。

  • instance

    これは、初期化されるcelery.apps.worker.Workerインスタンスです。 これまでのところ、 app 属性と hostname (ノード名)属性のみが設定されており、残りの__init__は実行されていないことに注意してください。

  • conf

    現在のアプリの構成。


celeryd_init

これは、セロリワーカーが起動したときに送信される最初の信号です。 senderはワーカーのホスト名であるため、このシグナルを使用してワーカー固有の構成をセットアップできます。

from celery.signals import celeryd_init

@celeryd_init.connect(sender='[email protected]')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'

または、複数のワーカーの構成をセットアップするには、接続時に送信者の指定を省略できます。

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('[email protected]', '[email protected]'):
        conf.task_default_rate_limit = '10/m'
    if sender == '[email protected]':
        conf.worker_prefetch_multiplier = 0

引数を提供します:

  • sender

    ワーカーのノード名。

  • instance

    これは、初期化されるcelery.apps.worker.Workerインスタンスです。 これまでのところ、 app 属性と hostname (ノード名)属性のみが設定されており、残りの__init__は実行されていないことに注意してください。

  • conf

    現在のアプリの構成。

  • options

    コマンドライン引数からワーカーに渡されるオプション(デフォルトを含む)。


worker_init

ワーカーが開始される前に派遣されます。


worker_ready

労働者が仕事を受け入れる準備ができたときに派遣されます。


heartbeat_sent

セロリがワーカーハートビートを送信したときにディスパッチされます。

送信者はcelery.worker.heartbeat.Heartインスタンスです。


worker_shutting_down

ワーカーがシャットダウンプロセスを開始するとディスパッチされます。

引数を提供します:

  • sig

    受信したPOSIX信号。

  • how

    シャットダウン方法、ウォームまたはコールド。

  • exitcode

    メインプロセスが終了するときに使用される出口コード。


worker_process_init

開始時にすべてのプール子プロセスにディスパッチされます。

このシグナルにアタッチされたハンドラーは、4秒を超えてブロックしてはならないことに注意してください。そうしないと、プロセスが開始に失敗したと想定してプロセスが強制終了されます。


worker_process_shutdown

すべてのプール子プロセスで、終了する直前にディスパッチされます。

注:finallyブロックと同様に、このシグナルがディスパッチされる保証はありません。シャットダウン時にハンドラーが呼び出されることを保証することは不可能であり、呼び出された場合、中に中断される可能性があります。

引数を提供します:

  • pid

    シャットダウンしようとしている子プロセスのpid。

  • exitcode

    子プロセスが終了するときに使用される出口コード。


worker_shutdown

ワーカーがシャットダウンしようとしているときにディスパッチされます。


ビートシグナル

beat_init

セロリビートの開始時にディスパッチされます(スタンドアロンまたは組み込み)。

送信者はcelery.beat.Serviceインスタンスです。


beat_embedded_init

セロリビートが組み込みプロセスとして開始されると、:signal: `beat_init` シグナルに加えてディスパッチされます。

送信者はcelery.beat.Serviceインスタンスです。


イベントレット信号

eventlet_pool_started

イベントレットプールが開始されたときに送信されます。

送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。


eventlet_pool_preshutdown

イベントレットプールが残りのワーカーを待機するように要求される直前に、ワーカーがシャットダウンしたときに送信されます。

送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。


eventlet_pool_postshutdown

プールが参加し、ワーカーがシャットダウンする準備ができたときに送信されます。

送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。


eventlet_pool_apply

タスクがプールに適用されるたびに送信されます。

送信者はcelery.concurrency.eventlet.TaskPoolインスタンスです。

引数を提供します:

  • target

    ターゲット関数。

  • args

    位置引数。

  • kwargs

    キーワード引数。


ロギングシグナル

setup_logging

この信号が接続されている場合、Celeryはロガーを構成しないため、これを使用して、ログ構成を独自のもので完全にオーバーライドできます。

Celeryによるロギング構成設定を拡張したい場合は、:signal: `after_setup_logger` および:signal:` after_setup_task_logger` シグナルを使用できます。

引数を提供します:

  • loglevel

    ロギングオブジェクトのレベル。

  • logfile

    ログファイルの名前。

  • format

    ログ形式の文字列。

  • colorize

    ログメッセージに色を付けるかどうかを指定します。


after_setup_logger

すべてのグローバルロガー(タスクロガーではない)のセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。

引数を提供します:

  • logger

    ロガーオブジェクト。

  • loglevel

    ロギングオブジェクトのレベル。

  • logfile

    ログファイルの名前。

  • format

    ログ形式の文字列。

  • colorize

    ログメッセージに色を付けるかどうかを指定します。


after_setup_task_logger

すべてのタスクロガーのセットアップ後に送信されます。 ロギング構成を拡張するために使用されます。

引数を提供します:

  • logger

    ロガーオブジェクト。

  • loglevel

    ロギングオブジェクトのレベル。

  • logfile

    ログファイルの名前。

  • format

    ログ形式の文字列。

  • colorize

    ログメッセージに色を付けるかどうかを指定します。


コマンド信号

user_preload_options

この信号は、Celeryコマンドラインプログラムのいずれかがユーザーのプリロードオプションの解析を終了した後に送信されます。

celery アンブレラコマンドにコマンドライン引数を追加するために使用できます。

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

送信者はCommandインスタンスであり、値は呼び出されたプログラムによって異なります(たとえば、アンブレラコマンドの場合はCeleryCommand)オブジェクトになります)。

引数を提供します:

  • app

    アプリインスタンス。

  • options

    解析されたユーザープリロードオプションのマッピング(デフォルト値を使用)。


非推奨のシグナル

task_sent

このシグナルは非推奨です。代わりに:signal: `after_task_publish` を使用してください。