メッセージプロトコル—Pythonドキュメント
メッセージプロトコル
タスクメッセージ
バージョン2
意味
properties = {
'correlation_id': uuid task_id,
'content_type': string mimetype,
'content_encoding': string encoding,
# optional
'reply_to': string queue_or_url,
}
headers = {
'lang': string 'py'
'task': string task,
'id': uuid task_id,
'root_id': uuid root_id,
'parent_id': uuid parent_id,
'group': uuid group_id,
# optional
'meth': string method_name,
'shadow': string alias_name,
'eta': iso8601 ETA,
'expires': iso8601 expires,
'retries': int retries,
'timelimit': (soft, hard),
'argsrepr': str repr(args),
'kwargsrepr': str repr(kwargs),
'origin': str nodename,
'replaced_task_nesting': int
}
body = (
object[] args,
Mapping kwargs,
Mapping embed {
'callbacks': Signature[] callbacks,
'errbacks': Signature[] errbacks,
'chain': Signature[] chain,
'chord': Signature chord_callback,
}
)
例
この例では、プロトコルのバージョン2を使用してタスクメッセージを送信します。
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
import json
import os
import socket
task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
message=json.dumps((args, kwargs, None)),
application_headers={
'lang': 'py',
'task': 'proj.tasks.add',
'argsrepr': repr(args),
'kwargsrepr': repr(kwargs),
'origin': '@'.join([os.getpid(), socket.gethostname()])
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)
バージョン1からの変更
task
メッセージヘッダーの存在によって検出されたプロトコルバージョン。lang
ヘッダーによる複数言語のサポート。ワーカーは、その言語をサポートするワーカーにメッセージをリダイレクトできます。
メタデータはヘッダーに移動しました。
これは、ワーカー/中間体がペイロードをデコードせずにメッセージを検査し、ヘッダーに基づいて決定を下すことができることを意味します(たとえば、Python固有のpickleシリアライザーによってシリアル化される言語固有の場合があります)。
常にUTC
utc
フラグはもうないので、タイムゾーンが欠落している時間情報はUTC時間であると予想されます。本文は言語固有のデータ専用です。
Pythonは、args / kwargsと埋め込み署名をbodyに格納します。
メッセージがrawエンコーディングを使用している場合、rawデータは単一の引数として関数に渡されます。
Java / Cなど Thrift / protobufドキュメントを本文として使用できます
origin
は、タスクを送信するノードの名前です。task
、meth
ヘッダーに基づくアクターへのディスパッチmeth
はPythonでは使用されませんが、将来、クラスとメソッドのペアを指定するために使用される可能性があります。チェーンは専用フィールドを獲得します。
チェーンを再帰的な
callbacks
引数に減らすと、再帰制限を超えたときに問題が発生します。これは、署名のリストを指定することによって新しいメッセージプロトコルで修正され、各タスクは次のメッセージを送信するときにリストからタスクをポップします。
execute_task(message) chain = embed['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
はtask_id
フィールドを置き換えます。root_id
およびparent_id
フィールドは、ワークフローの追跡に役立ちます。shadow
を使用すると、ログに別の名前を指定できます。モニターは、引数として指定された関数を呼び出すタスクなどの概念に使用できます。from celery.utils.imports import qualname class PickleTask(Task): def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super().apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
バージョン1
プロトコルのバージョン1では、すべてのフィールドがメッセージ本文に格納されます。つまり、ワーカーと中間コンシューマーは、フィールドを読み取るためにペイロードを逆シリアル化する必要があります。
メッセージ本文
task
- ストリング
タスクの名前。 必須
id
- ストリング
タスクの一意のID(UUID)。 必須
args
- リスト
引数のリスト。 指定しない場合は空のリストになります。
kwargs
- 辞書
キーワード引数の辞書。 指定しない場合、空の辞書になります。
retries
- int
このタスクが再試行された現在の回数。 指定しない場合、デフォルトは 0 です。
eta
- 文字列(ISO 8601)
到着予定時刻。 これは、ISO8601形式の日付と時刻です。 指定されていない場合、メッセージはスケジュールされていませんが、できるだけ早く実行されます。
expires
- 文字列(ISO 8601)
バージョン2.0.2の新機能。
有効期限。 これは、ISO8601形式の日付と時刻です。 指定しない場合、メッセージは期限切れになりません。 メッセージを受信し、有効期限を過ぎると、メッセージは期限切れになります。
taskset
- ストリング
このタスクが含まれるグループ(存在する場合)。
chord
- サイン
バージョン2.3の新機能。
このタスクがコードのヘッダー部分の1つであることを示します。 このキーの値は、ヘッダー内のすべてのタスクが戻ったときに実行する必要があるコードの本体です。
utc
- ブール
バージョン2.5の新機能。
真の時刻がUTCタイムゾーンを使用する場合、そうでない場合は現在のローカルタイムゾーンを使用する必要があります。
callbacks
- サイン
バージョン3.0の新機能。
タスクが正常に終了した場合に呼び出す署名のリスト。
errbacks
- サイン
バージョン3.0の新機能。
タスクの実行中にエラーが発生した場合に呼び出す署名のリスト。
timelimit
- (float、float)
バージョン3.1の新機能。
タスク実行時間制限の設定。 これは、ハードとソフトの制限時間値のタプルです( int / float または
None
は制限なし)。3秒のソフト制限時間と10秒のハード時間制限を指定する値の例:
{'timelimit': (3.0, 10.0)}
メッセージの例
これは、json形式の celery.task.ping タスクの呼び出し例です。
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
タスクのシリアル化
content_type メッセージヘッダーを使用して、いくつかのタイプのシリアル化形式がサポートされています。
次の表に、デフォルトでサポートされているMIMEタイプを示します。
図式 MIMEタイプ json アプリケーション/ json yaml application / x-yaml きゅうりのピクルス application / x-python-serialize msgpack application / x-msgpack
イベントメッセージ
イベントメッセージは常にJSONでシリアル化され、任意のメッセージ本文フィールドを含めることができます。
バージョン4.0以降。 本文は、単一のマッピング(1つのイベント)またはマッピングのリスト(複数のイベント)のいずれかで構成できます。
イベントメッセージに常に存在しなければならない標準フィールドもあります。
標準のボディフィールド
ストリング
type
イベントの種類。 これは、ダッシュ区切り文字(
task-succeeded
など)で区切られたカテゴリとアクションを含む文字列です。ストリング
hostname
イベントが発生した場所の完全修飾ホスト名。
unsigned long long
clock
このイベントの論理クロック値(ランポートタイムスタンプ)。
浮く
timestamp
イベントが発生した時刻に対応するUNIXタイムスタンプ。
短い署名
utcoffset
このフィールドは、発信元ホストのタイムゾーンを記述し、UTCの前後の時間数として指定されます(例:-2または+1)。
unsigned long long
pid
イベントが発生したプロセスのプロセスID。
メッセージの例
これは、task-succeeded
イベントのメッセージフィールドです。
properties = {
'routing_key': 'task.succeeded',
'exchange': 'celeryev',
'content_type': 'application/json',
'content_encoding': 'utf-8',
'delivery_mode': 1,
}
headers = {
'hostname': '[email protected]',
}
body = {
'type': 'task-succeeded',
'hostname': '[email protected]',
'pid': 6335,
'clock': 393912923921,
'timestamp': 1401717709.101747,
'utcoffset': -1,
'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
'retval': '4',
'runtime': 0.0003212,
)