タスクの呼び出し—Pythonドキュメント
呼び出しタスク
- 基本
- リンク(コールバック/エラーバック)
- メッセージについて
- ETAとカウントダウン
- 有効期限
- メッセージ送信の再試行
- 接続エラー処理
- シリアライザー
- 圧縮
- 接続
- ルーティングオプション
- 結果オプション
基本
このドキュメントでは、タスクインスタンスと canvas で使用されるCeleryの統一された「CallingAPI」について説明します。
APIは、実行オプションの標準セットと3つのメソッドを定義します。
apply_async(args[, kwargs[, …]])
タスクメッセージを送信します。
delay(*args, **kwargs)
タスクメッセージを送信するためのショートカットですが、実行オプションはサポートされていません。
呼び出し(
__call__
)呼び出し元のAPIをサポートするオブジェクト(
add(2, 2)
など)を適用すると、タスクはワーカーによって実行されるのではなく、現在のプロセスで実行されます(メッセージは送信されません)。
クイックチートシート
- *;
T.delay(arg, kwarg=value)
.apply_async
へのスター引数のショートカット。 (.delay(*args, **kwargs)
は.apply_async(args, kwargs)
を呼び出します)。
T.apply_async((arg,), {'kwarg': value})
- *;
T.apply_async(countdown=10)
- 今から10秒で実行されます。
- *;
T.apply_async(eta=now + timedelta(seconds=10))
eta
を使用して指定された今から10秒後に実行されます
- *;
T.apply_async(countdown=60, expires=120)
- 今から1分で実行されますが、2分後に期限切れになります。
- *;
T.apply_async(expires=now + timedelta(days=2))
datetime
を使用して設定すると2日で期限切れになります。
例
delay()
メソッドは、通常の関数を呼び出すように見えるので便利です。
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
代わりにapply_async()
を使用して、次のように記述する必要があります。
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
ヒント
タスクが現在のプロセスに登録されていない場合は、代わりに@send_task()
を使用してタスクを名前で呼び出すことができます。
したがって、 delay は明らかに便利ですが、追加の実行オプションを設定する場合は、apply_async
を使用する必要があります。
このドキュメントの残りの部分では、タスク実行オプションについて詳しく説明します。 すべての例では、 add というタスクを使用して、2つの引数の合計を返します。
@app.task
def add(x, y):
return x + y
別の方法があります…
これについては、後で Canvas について読みながら詳しく説明しますが、signature
は、タスク呼び出しの署名を渡すために使用されるオブジェクトです(たとえば、ネットワーク)、およびそれらはCalling APIもサポートします:
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()
リンク(コールバック/エラーバック)
Celeryは、あるタスクが別のタスクに続くように、タスクをリンクすることをサポートしています。 コールバックタスクは、親タスクの結果を部分引数として適用されます。
add.apply_async((2, 2), link=add.s(16))
s
とは何ですか?
ここで使用されるadd.s
呼び出しは、署名と呼ばれます。 それらが何であるかわからない場合は、キャンバスガイドでそれらについて読む必要があります。 ここでは、chain
についても学ぶことができます。これは、タスクを連鎖させるためのより簡単な方法です。
実際には、link
実行オプションは内部プリミティブと見なされ、おそらく直接使用するのではなく、代わりにチェーンを使用します。
ここで、最初のタスク(4)の結果は、前の結果に16を追加する新しいタスクに送信され、式 \((2 + 2)+ 16 = 20 \)を形成します。
タスクで例外が発生した場合にコールバックを適用することもできます( errback )。 ワーカーは実際にはerrbackをタスクとして呼び出すことはありませんが、代わりにerrback関数を直接呼び出して、生の要求、例外、およびトレースバックオブジェクトをタスクに渡すことができるようにします。
これはエラーコールバックの例です。
@app.task
def error_handler(request, exc, traceback):
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
request.id, exc, traceback))
link_error
実行オプションを使用してタスクに追加できます。
add.apply_async((2, 2), link_error=error_handler.s())
さらに、link
オプションとlink_error
オプションの両方をリストとして表すことができます。
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
次に、コールバック/エラーバックが順番に呼び出され、すべてのコールバックが親タスクの戻り値を部分引数として呼び出されます。
メッセージについて
Celeryは、on_messageコールバックを設定することにより、すべての状態の変化をキャッチすることをサポートしています。
たとえば、実行時間の長いタスクでタスクの進行状況を送信するには、次のようにします。
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)
def on_raw_message(body):
print(body)
a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))
次のような出力を生成します:
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
ETAとカウントダウン
ETA(到着予定時刻)を使用すると、タスクが実行される最も早い時刻である特定の日時を設定できます。 countdown は、ETAを秒単位で設定するためのショートカットです。
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # this takes at least 3 seconds to return
20
タスクは、指定された日時の後のある時点で実行されることが保証されていますが、必ずしもその正確な時刻である必要はありません。 期限が切れた理由として考えられるのは、キューで待機している多くのアイテムや、ネットワークの遅延が大きいことです。 タスクがタイムリーに実行されるようにするには、キューの輻輳を監視する必要があります。 Muninまたは同様のツールを使用してアラートを受信すると、適切なアクションを実行してワークロードを軽減できます。 Munin を参照してください。
countdown は整数ですが、 eta はdatetime
オブジェクトである必要があり、正確な日付と時刻(ミリ秒の精度、タイムゾーン情報を含む)を指定します。
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
警告
countdown
を15分以上指定するときにメッセージブローカーとしてRabbitMQを使用すると、ワーカーがPreconditionFailed
エラーで終了するという問題が発生する場合があります。
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel
バージョン3.8.15以降のRabbitMQでは、consumer_timeout
のデフォルト値は15分です。 バージョン3.8.17以降、30分に延長されました。 コンシューマーがタイムアウト値を超えて配信を確認しない場合、そのチャネルはPRECONDITION_FAILED
チャネル例外で閉じられます。 詳細については、配信確認タイムアウトを参照してください。
この問題を解決するには、RabbitMQ構成ファイルrabbitmq.conf
で、カウントダウン値以上のconsumer_timeout
パラメーターを指定する必要があります。 たとえば、consumer_timeout = 31622400000
の非常に大きな値(ミリ秒単位で1年に相当)を指定して、将来の問題を回避できます。
有効期限
expires 引数は、オプションの有効期限を、タスクの公開後の秒数、またはdatetime
を使用した特定の日時として定義します。
>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
... expires=datetime.now() + timedelta(days=1)
ワーカーが期限切れのタスクを受信すると、タスクを:state: `REVOKED` (@TaskRevokedError
)としてマークします。
メッセージ送信の再試行
Celeryは、接続に失敗した場合にメッセージの送信を自動的に再試行します。再試行の動作(再試行の頻度や最大再試行回数など)を構成するか、すべて一緒に無効にすることができます。
再試行を無効にするには、retry
実行オプションをFalse
に設定できます。
add.apply_async((2, 2), retry=False)
再試行ポリシー
再試行ポリシーは、再試行の動作を制御するマッピングであり、次のキーを含めることができます。
max_retries
諦めるまでの最大再試行回数。この場合、再試行が失敗する原因となった例外が発生します。
None
の値は、永久に再試行することを意味します。デフォルトでは、3回再試行します。
interval_start
再試行の間に待機する秒数(浮動小数点または整数)を定義します。 デフォルトは0です(最初の再試行は瞬時に行われます)。
interval_step
連続して再試行するたびに、この数値が再試行遅延(浮動小数点または整数)に追加されます。 デフォルトは0.2です。
interval_max
再試行の間に待機する最大秒数(浮動小数点または整数)。 デフォルトは0.2です。
たとえば、デフォルトのポリシーは次のように相関します。
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
再試行に費やされる最大時間は0.4秒です。 ブローカー接続がダウンしている場合、接続障害が再試行パイル効果につながる可能性があるため、デフォルトでは比較的短く設定されています。たとえば、多くのWebサーバープロセスが再試行を待機し、他の着信要求をブロックします。
接続エラー処理
タスクを送信してメッセージ転送接続が失われた場合、または接続を開始できない場合、OperationalError
エラーが発生します。
>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
再試行を有効にしている場合、これは再試行が終了した後、またはすぐに無効にした場合にのみ発生します。
このエラーも処理できます。
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
シリアライザー
安全
pickleモジュールでは、任意の機能を実行できます。セキュリティガイドを参照してください。
Celeryには、暗号化を使用してメッセージに署名する特別なシリアライザーも付属しています。
クライアントとワーカー間で転送されるデータはシリアル化する必要があるため、Celeryのすべてのメッセージには、エンコードに使用されるシリアル化方法を説明するcontent_type
ヘッダーがあります。
デフォルトのシリアライザーは JSON ですが、:setting: `task_serializer` 設定を使用して、または個々のタスクごとに、あるいはメッセージごとに、これを変更できます。
JSON 、pickle
、 YAML 、msgpack
のサポートが組み込まれており、独自のカスタムシリアライザーをに登録して追加することもできます。昆布シリアライザーレジストリ
も参照してください
昆布ユーザーガイドのメッセージシリアル化。
各オプションには長所と短所があります。
- json – JSONは多くのプログラミング言語でサポートされていますが、現在は
Pythonの標準部分(2.6以降)であり、:pypi: `simplejson` などの最新のPythonライブラリを使用してデコードするのはかなり高速です。
JSONの主な欠点は、文字列、Unicode、浮動小数点数、ブール値、辞書、リストのデータ型に制限されることです。 小数と日付が特に欠落しています。
Binary data will be transferred using Base64 encoding, increasing the size of the transferred data by 34% compared to an encoding format where native binary types are supported.
ただし、データが上記の制約内に収まり、言語間のサポートが必要な場合は、JSONのデフォルト設定がおそらく最良の選択です。
詳細については、 http://json.org を参照してください。
ノート
(Python公式ドキュメント https://docs.python.org/3.6/library/json.html から)JSONのキー/値ペアのキーは常に
str
タイプです。 辞書がJSONに変換されると、辞書のすべてのキーが文字列に強制されます。 この結果、辞書がJSONに変換されてから辞書に戻されると、辞書が元の辞書と一致しない場合があります。 つまり、xに文字列以外のキーがある場合はloads(dumps(x)) != x
です。- ピクルス–以外の言語をサポートしたくない場合
Pythonの場合、pickleエンコーディングを使用すると、すべての組み込みPythonデータ型(クラスインスタンスを除く)のサポート、バイナリファイル送信時の小さなメッセージ、JSON処理のわずかな高速化が得られます。
詳細については、
pickle
を参照してください。- yaml – YAMLには、jsonと同じ特性の多くがあります。
ただし、より多くのデータ型(日付、再帰参照など)をネイティブにサポートします。
ただし、YAML用のPythonライブラリは、JSON用のライブラリよりもかなり低速です。
より表現力豊かなデータ型のセットが必要で、言語間の互換性を維持する必要がある場合は、YAMLが上記よりも適している可能性があります。
詳細については、 http://yaml.org/を参照してください。
- msgpack – msgpackは、JSONに近いバイナリシリアル化形式です。
機能で。 ただし、これは非常に若いため、この時点でサポートは実験的なものと見なす必要があります。
詳細については、 http://msgpack.org/を参照してください。
使用されるエンコーディングはメッセージヘッダーとして利用できるため、ワーカーはタスクを逆シリアル化する方法を知っています。 カスタムシリアライザーを使用する場合、このシリアライザーはワーカーが使用できる必要があります。
次の順序は、タスクの送信時に使用されるシリアライザーを決定するために使用されます。
- シリアライザー実行オプション。
@-Task.serializer
属性- :setting: `task_serializer` 設定。
単一のタスク呼び出しにカスタムシリアライザーを設定する例:
>>> add.apply_async((10, 10), serializer='json')
圧縮
Celeryは、次の組み込みスキームを使用してメッセージを圧縮できます。
brotli
brotliは、Web、特に小さなテキストドキュメント用に最適化されています。 フォントやHTMLページなどの静的コンテンツを提供するのに最も効果的です。
これを使用するには、Celeryを次のコマンドでインストールします。
$ pip install celery[brotli]
bzip2
bzip2はgzipよりも小さいファイルを作成しますが、圧縮と解凍の速度はgzipよりも著しく遅くなります。
これを使用するには、Python実行可能ファイルがbzip2サポートでコンパイルされていることを確認してください。
次の
ImportError
を取得した場合:>>> import bz2 Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'bz2'
これは、Pythonバージョンをbzip2サポート付きで再コンパイルする必要があることを意味します。
gzip
gzipは、小さなメモリフットプリントを必要とするシステムに適しており、メモリが限られているシステムに最適です。 多くの場合、拡張子が「.tar.gz」のファイルを生成するために使用されます。
これを使用するには、Python実行可能ファイルがgzipサポートでコンパイルされていることを確認してください。
次の
ImportError
を取得した場合:>>> import gzip Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'gzip'
これは、Pythonバージョンをgzipサポート付きで再コンパイルする必要があることを意味します。
lzma
lzmaは優れた圧縮率を提供し、メモリ使用量が高くなる代わりに、高速の圧縮および解凍速度で実行されます。
これを使用するには、Python実行可能ファイルがlzmaサポートを使用してコンパイルされていること、およびPythonバージョンが3.3以降であることを確認してください。
次の
ImportError
を取得した場合:>>> import lzma Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'lzma'
これは、Pythonバージョンをlzmaサポート付きで再コンパイルする必要があることを意味します。
または、以下を使用してバックポートをインストールすることもできます。
$ pip install celery[lzma]
zlib
zlibは、ライブラリ形式のDeflateアルゴリズムを抽象化したもので、APIでgzipファイル形式と軽量ストリーム形式の両方のサポートが含まれています。 これは、多くのソフトウェアシステムの重要なコンポーネントです。ほんの数例を挙げると、LinuxカーネルとGitVCSです。
これを使用するには、Python実行可能ファイルがzlibサポートでコンパイルされていることを確認してください。
次の
ImportError
を取得した場合:>>> import zlib Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'zlib'
これは、Pythonバージョンをzlibサポート付きで再コンパイルする必要があることを意味します。
zstd
zstdは、zlibレベルおよびより優れた圧縮率でリアルタイムの圧縮シナリオを対象としています。 これは、Huff0およびFSEライブラリによって提供される非常に高速なエントロピーステージに支えられています。
これを使用するには、Celeryを次のコマンドでインストールします。
$ pip install celery[zstd]
独自の圧縮スキームを作成して、kombu compression registry
に登録することもできます。
次の順序は、タスクの送信時に使用される圧縮スキームを決定するために使用されます。
- 圧縮実行オプション。
@-Task.compression
属性。- :setting: `task_compression` 属性。
タスクを呼び出すときに使用される圧縮を指定する例:
>>> add.apply_async((2, 2), compression='zlib')
接続
自動プールサポート
バージョン2.3以降、自動接続プールがサポートされているため、接続を手動で処理したり、接続を再利用するためにパブリッシャーを処理したりする必要はありません。
バージョン2.5以降、接続プールはデフォルトで有効になっています。
詳細については、:setting: `broker_pool_limit` 設定を参照してください。
パブリッシャーを作成することにより、接続を手動で処理できます。
results = []
with add.app.pool.acquire(block=True) as connection:
with add.get_publisher(connection) as publisher:
try:
for args in numbers:
res = add.apply_async((2, 2), publisher=publisher)
results.append(res)
print([res.get() for res in results])
この特定の例は、グループとしてはるかによく表現されていますが、
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
ルーティングオプション
Celeryは、タスクを異なるキューにルーティングできます。
単純なルーティング(名前名前)は、queue
オプション:
add.apply_async(queue='priority.high')
次に、workers -Q
引数を使用して、ワーカーをpriority.high
キューに割り当てることができます。
$ celery -A proj worker -l INFO -Q celery,priority.high
も参照してください
コードにキュー名をハードコーディングすることはお勧めしません。ベストプラクティスは、構成ルーター(:setting: `task_routes` )を使用することです。
ルーティングの詳細については、ルーティングタスクを参照してください。
結果オプション
:setting: `task_ignore_result` 設定を使用するか、ignore_result
オプションを使用して、結果の保存を有効または無効にできます。
>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None
>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3
タスクに関する追加のメタデータを結果バックエンドに保存する場合は、:setting: `result_extended` 設定をTrue
に設定します。
高度なオプション
これらのオプションは、AMQPの完全なルーティング機能を利用したい上級ユーザー向けです。 興味のある方は、ルーティングガイドをお読みください。
両替
メッセージを送信する交換の名前(または
kombu.entity.Exchange
)。routing_key
決定に使用されるルーティングキー。
優先順位
0 と 255 の間の数値。ここで、 255 が最高の優先順位です。
サポート:RabbitMQ、Redis(優先度が逆、0が最高)。