メッセージプロトコル—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/internals/protocol
移動先:案内検索

メッセージプロトコル

タスクメッセージ

バージョン2

意味

properties = {
    'correlation_id': uuid task_id,
    'content_type': string mimetype,
    'content_encoding': string encoding,

    # optional
    'reply_to': string queue_or_url,
}
headers = {
    'lang': string 'py'
    'task': string task,
    'id': uuid task_id,
    'root_id': uuid root_id,
    'parent_id': uuid parent_id,
    'group': uuid group_id,

    # optional
    'meth': string method_name,
    'shadow': string alias_name,
    'eta': iso8601 ETA,
    'expires': iso8601 expires,
    'retries': int retries,
    'timelimit': (soft, hard),
    'argsrepr': str repr(args),
    'kwargsrepr': str repr(kwargs),
    'origin': str nodename,
    'replaced_task_nesting': int
}

body = (
    object[] args,
    Mapping kwargs,
    Mapping embed {
        'callbacks': Signature[] callbacks,
        'errbacks': Signature[] errbacks,
        'chain': Signature[] chain,
        'chord': Signature chord_callback,
    }
)

この例では、プロトコルのバージョン2を使用してタスクメッセージを送信します。

# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

import json
import os
import socket

task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
    message=json.dumps((args, kwargs, None)),
    application_headers={
        'lang': 'py',
        'task': 'proj.tasks.add',
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs),
        'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
)

バージョン1からの変更

  • taskメッセージヘッダーの存在によって検出されたプロトコルバージョン。

  • langヘッダーによる複数言語のサポート。

    ワーカーは、その言語をサポートするワーカーにメッセージをリダイレクトできます。

  • メタデータはヘッダーに移動しました。

    これは、ワーカー/中間体がペイロードをデコードせずにメッセージを検査し、ヘッダーに基づいて決定を下すことができることを意味します(たとえば、Python固有のpickleシリアライザーによってシリアル化される言語固有の場合があります)。

  • 常にUTC

    utcフラグはもうないので、タイムゾーンが欠落している時間情報はUTC時間であると予想されます。

  • 本文は言語固有のデータ専用です。

    • Pythonは、args / kwargsと埋め込み署名をbodyに格納します。

    • メッセージがrawエンコーディングを使用している場合、rawデータは単一の引数として関数に渡されます。

    • Java / Cなど Thrift / protobufドキュメントを本文として使用できます


  • originは、タスクを送信するノードの名前です。

  • taskmethヘッダーに基づくアクターへのディスパッチ

    methはPythonでは使用されませんが、将来、クラスとメソッドのペアを指定するために使用される可能性があります。

  • チェーンは専用フィールドを獲得します。

    チェーンを再帰的なcallbacks引数に減らすと、再帰制限を超えたときに問題が発生します。

    これは、署名のリストを指定することによって新しいメッセージプロトコルで修正され、各タスクは次のメッセージを送信するときにリストからタスクをポップします。

    execute_task(message)
    chain = embed['chain']
    if chain:
        sig = maybe_signature(chain.pop())
        sig.apply_async(chain=chain)
  • correlation_idtask_idフィールドを置き換えます。

  • root_idおよびparent_idフィールドは、ワークフローの追跡に役立ちます。

  • shadowを使用すると、ログに別の名前を指定できます。モニターは、引数として指定された関数を呼び出すタスクなどの概念に使用できます。

    from celery.utils.imports import qualname
    
    class PickleTask(Task):
    
        def unpack_args(self, fun, args=()):
            return fun, args
    
        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )
    
    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)


バージョン1

プロトコルのバージョン1では、すべてのフィールドがメッセージ本文に格納されます。つまり、ワーカーと中間コンシューマーは、フィールドを読み取るためにペイロードを逆シリアル化する必要があります。

メッセージ本文

  • task
    ストリング

    タスクの名前。 必須

  • id
    ストリング

    タスクの一意のID(UUID)。 必須

  • args
    リスト

    引数のリスト。 指定しない場合は空のリストになります。

  • kwargs
    辞書

    キーワード引数の辞書。 指定しない場合、空の辞書になります。

  • retries
    int

    このタスクが再試行された現在の回数。 指定しない場合、デフォルトは 0 です。

  • eta
    文字列(ISO 8601)

    到着予定時刻。 これは、ISO8601形式の日付と時刻です。 指定されていない場合、メッセージはスケジュールされていませんが、できるだけ早く実行されます。

  • expires
    文字列(ISO 8601)

    バージョン2.0.2の新機能。

    有効期限。 これは、ISO8601形式の日付と時刻です。 指定しない場合、メッセージは期限切れになりません。 メッセージを受信し、有効期限を過ぎると、メッセージは期限切れになります。

  • taskset
    ストリング

    このタスクが含まれるグループ(存在する場合)。

  • chord
    サイン

    バージョン2.3の新機能。

    このタスクがコードのヘッダー部分の1つであることを示します。 このキーの値は、ヘッダー内のすべてのタスクが戻ったときに実行する必要があるコードの本体です。

  • utc
    ブール

    バージョン2.5の新機能。

    真の時刻がUTCタイムゾーンを使用する場合、そうでない場合は現在のローカルタイムゾーンを使用する必要があります。

  • callbacks
    サイン

    バージョン3.0の新機能。

    タスクが正常に終了した場合に呼び出す署名のリスト。

  • errbacks
    サイン

    バージョン3.0の新機能。

    タスクの実行中にエラーが発生した場合に呼び出す署名のリスト。

  • timelimit
    (float、float)

    バージョン3.1の新機能。

    タスク実行時間制限の設定。 これは、ハードとソフトの制限時間値のタプルです( int / float またはNoneは制限なし)。

    3秒のソフト制限時間と10秒のハード時間制限を指定する値の例:

    {'timelimit': (3.0, 10.0)}


メッセージの例

これは、json形式の celery.task.ping タスクの呼び出し例です。

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

タスクのシリアル化

content_type メッセージヘッダーを使用して、いくつかのタイプのシリアル化形式がサポートされています。

次の表に、デフォルトでサポートされているMIMEタイプを示します。

図式 MIMEタイプ
json アプリケーション/ json
yaml application / x-yaml
きゅうりのピクルス application / x-python-serialize
msgpack application / x-msgpack


イベントメッセージ

イベントメッセージは常にJSONでシリアル化され、任意のメッセージ本文フィールドを含めることができます。

バージョン4.0以降。 本文は、単一のマッピング(1つのイベント)またはマッピングのリスト(複数のイベント)のいずれかで構成できます。

イベントメッセージに常に存在しなければならない標準フィールドもあります。

標準のボディフィールド

  • ストリング type

    イベントの種類。 これは、ダッシュ区切り文字(task-succeededなど)で区切られたカテゴリアクションを含む文字列です。

  • ストリング hostname

    イベントが発生した場所の完全修飾ホスト名。

  • unsigned long long clock

    このイベントの論理クロック値(ランポートタイムスタンプ)。

  • 浮く timestamp

    イベントが発生した時刻に対応するUNIXタイムスタンプ。

  • 短い署名 utcoffset

    このフィールドは、発信元ホストのタイムゾーンを記述し、UTCの前後の時間数として指定されます(例:-2または+1)。

  • unsigned long long pid

    イベントが発生したプロセスのプロセスID。


標準のイベントタイプ

標準のイベントタイプとそのフィールドのリストについては、イベントリファレンスを参照してください。


メッセージの例

これは、task-succeededイベントのメッセージフィールドです。

properties = {
    'routing_key': 'task.succeeded',
    'exchange': 'celeryev',
    'content_type': 'application/json',
    'content_encoding': 'utf-8',
    'delivery_mode': 1,
}
headers = {
    'hostname': '[email protected]',
}
body = {
    'type': 'task-succeeded',
    'hostname': '[email protected]',
    'pid': 6335,
    'clock': 393912923921,
    'timestamp': 1401717709.101747,
    'utcoffset': -1,
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
)