タスクの呼び出し—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/calling
移動先:案内検索

呼び出しタスク

基本

このドキュメントでは、タスクインスタンスと canvas で使用されるCeleryの統一された「CallingAPI」について説明します。

APIは、実行オプションの標準セットと3つのメソッドを定義します。

  • apply_async(args[, kwargs[, …]])

    タスクメッセージを送信します。

  • delay(*args, **kwargs)

    タスクメッセージを送信するためのショートカットですが、実行オプションはサポートされていません。

  • 呼び出し__call__

    呼び出し元のAPIをサポートするオブジェクト(add(2, 2)など)を適用すると、タスクはワーカーによって実行されるのではなく、現在のプロセスで実行されます(メッセージは送信されません)。


クイックチートシート

  • *; T.delay(arg, kwarg=value)
    .apply_asyncへのスター引数のショートカット。 (.delay(*args, **kwargs).apply_async(args, kwargs)を呼び出します)。
  • T.apply_async((arg,), {'kwarg': value})
  • *; T.apply_async(countdown=10)
    今から10秒で実行されます。
  • *; T.apply_async(eta=now + timedelta(seconds=10))
    etaを使用して指定された今から10秒後に実行されます
  • *; T.apply_async(countdown=60, expires=120)
    今から1分で実行されますが、2分後に期限切れになります。
  • *; T.apply_async(expires=now + timedelta(days=2))
    datetimeを使用して設定すると2日で期限切れになります。


delay()メソッドは、通常の関数を呼び出すように見えるので便利です。

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

代わりにapply_async()を使用して、次のように記述する必要があります。

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

ヒント

タスクが現在のプロセスに登録されていない場合は、代わりに@send_task()を使用してタスクを名前で呼び出すことができます。

したがって、 delay は明らかに便利ですが、追加の実行オプションを設定する場合は、apply_asyncを使用する必要があります。

このドキュメントの残りの部分では、タスク実行オプションについて詳しく説明します。 すべての例では、 add というタスクを使用して、2つの引数の合計を返します。

@app.task
def add(x, y):
    return x + y

別の方法があります…

これについては、後で Canvas について読みながら詳しく説明しますが、signatureは、タスク呼び出しの署名を渡すために使用されるオブジェクトです(たとえば、ネットワーク)、およびそれらはCalling APIもサポートします:

task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()

リンク(コールバック/エラーバック)

Celeryは、あるタスクが別のタスクに続くように、タスクをリンクすることをサポートしています。 コールバックタスクは、親タスクの結果を部分引数として適用されます。

add.apply_async((2, 2), link=add.s(16))

sとは何ですか?

ここで使用されるadd.s呼び出しは、署名と呼ばれます。 それらが何であるかわからない場合は、キャンバスガイドでそれらについて読む必要があります。 ここでは、chainについても学ぶことができます。これは、タスクを連鎖させるためのより簡単な方法です。

実際には、link実行オプションは内部プリミティブと見なされ、おそらく直接使用するのではなく、代わりにチェーンを使用します。

ここで、最初のタスク(4)の結果は、前の結果に16を追加する新しいタスクに送信され、式 \((2 + 2)+ 16 = 20 \)を形成します。

タスクで例外が発生した場合にコールバックを適用することもできます( errback )。 ワーカーは実際にはerrbackをタスクとして呼び出すことはありませんが、代わりにerrback関数を直接呼び出して、生の要求、例外、およびトレースバックオブジェクトをタスクに渡すことができるようにします。

これはエラーコールバックの例です。

@app.task
def error_handler(request, exc, traceback):
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          request.id, exc, traceback))

link_error実行オプションを使用してタスクに追加できます。

add.apply_async((2, 2), link_error=error_handler.s())

さらに、linkオプションとlink_errorオプションの両方をリストとして表すことができます。

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

次に、コールバック/エラーバックが順番に呼び出され、すべてのコールバックが親タスクの戻り値を部分引数として呼び出されます。


メッセージについて

Celeryは、on_messageコールバックを設定することにより、すべての状態の変化をキャッチすることをサポートしています。

たとえば、実行時間の長いタスクでタスクの進行状況を送信するには、次のようにします。

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
def on_raw_message(body):
    print(body)

a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))

次のような出力を生成します:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10

ETAとカウントダウン

ETA(到着予定時刻)を使用すると、タスクが実行される最も早い時刻である特定の日時を設定できます。 countdown は、ETAを秒単位で設定するためのショートカットです。

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
20

タスクは、指定された日時ののある時点で実行されることが保証されていますが、必ずしもその正確な時刻である必要はありません。 期限が切れた理由として考えられるのは、キューで待機している多くのアイテムや、ネットワークの遅延が大きいことです。 タスクがタイムリーに実行されるようにするには、キューの輻輳を監視する必要があります。 Muninまたは同様のツールを使用してアラートを受信すると、適切なアクションを実行してワークロードを軽減できます。 Munin を参照してください。

countdown は整数ですが、 eta はdatetimeオブジェクトである必要があり、正確な日付と時刻(ミリ秒の精度、タイムゾーン情報を含む)を指定します。

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)

警告

countdownを15分以上指定するときにメッセージブローカーとしてRabbitMQを使用すると、ワーカーがPreconditionFailedエラーで終了するという問題が発生する場合があります。

amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel

バージョン3.8.15以降のRabbitMQでは、consumer_timeoutのデフォルト値は15分です。 バージョン3.8.17以降、30分に延長されました。 コンシューマーがタイムアウト値を超えて配信を確認しない場合、そのチャネルはPRECONDITION_FAILEDチャネル例外で閉じられます。 詳細については、配信確認タイムアウトを参照してください。

この問題を解決するには、RabbitMQ構成ファイルrabbitmq.confで、カウントダウン値以上のconsumer_timeoutパラメーターを指定する必要があります。 たとえば、consumer_timeout = 31622400000の非常に大きな値(ミリ秒単位で1年に相当)を指定して、将来の問題を回避できます。


有効期限

expires 引数は、オプションの有効期限を、タスクの公開後の秒数、またはdatetimeを使用した特定の日時として定義します。

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

ワーカーが期限切れのタスクを受信すると、タスクを:state: `REVOKED`@TaskRevokedError)としてマークします。


メッセージ送信の再試行

Celeryは、接続に失敗した場合にメッセージの送信を自動的に再試行します。再試行の動作(再試行の頻度や最大再試行回数など)を構成するか、すべて一緒に無効にすることができます。

再試行を無効にするには、retry実行オプションをFalseに設定できます。

add.apply_async((2, 2), retry=False)

再試行ポリシー

再試行ポリシーは、再試行の動作を制御するマッピングであり、次のキーを含めることができます。

  • max_retries

    諦めるまでの最大再試行回数。この場合、再試行が失敗する原因となった例外が発生します。

    Noneの値は、永久に再試行することを意味します。

    デフォルトでは、3回再試行します。

  • interval_start

    再試行の間に待機する秒数(浮動小数点または整数)を定義します。 デフォルトは0です(最初の再試行は瞬時に行われます)。

  • interval_step

    連続して再試行するたびに、この数値が再試行遅延(浮動小数点または整数)に追加されます。 デフォルトは0.2です。

  • interval_max

    再試行の間に待機する最大秒数(浮動小数点または整数)。 デフォルトは0.2です。

たとえば、デフォルトのポリシーは次のように相関します。

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

再試行に費やされる最大時間は0.4秒です。 ブローカー接続がダウンしている場合、接続障害が再試行パイル効果につながる可能性があるため、デフォルトでは比較的短く設定されています。たとえば、多くのWebサーバープロセスが再試行を待機し、他の着信要求をブロックします。


接続エラー処理

タスクを送信してメッセージ転送接続が失われた場合、または接続を開始できない場合、OperationalErrorエラーが発生します。

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

再試行を有効にしている場合、これは再試行が終了した後、またはすぐに無効にした場合にのみ発生します。

このエラーも処理できます。

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)

シリアライザー

安全

pickleモジュールでは、任意の機能を実行できます。セキュリティガイドを参照してください。

Celeryには、暗号化を使用してメッセージに署名する特別なシリアライザーも付属しています。

クライアントとワーカー間で転送されるデータはシリアル化する必要があるため、Celeryのすべてのメッセージには、エンコードに使用されるシリアル化方法を説明するcontent_typeヘッダーがあります。

デフォルトのシリアライザーは JSON ですが、:setting: `task_serializer` 設定を使用して、または個々のタスクごとに、あるいはメッセージごとに、これを変更できます。

JSON 、pickle、 YAML 、msgpackのサポートが組み込まれており、独自のカスタムシリアライザーをに登録して追加することもできます。昆布シリアライザーレジストリ

も参照してください

昆布ユーザーガイドのメッセージシリアル化


各オプションには長所と短所があります。

json – JSONは多くのプログラミング言語でサポートされていますが、現在は

Pythonの標準部分(2.6以降)であり、:pypi: `simplejson` などの最新のPythonライブラリを使用してデコードするのはかなり高速です。

JSONの主な欠点は、文字列、Unicode、浮動小数点数、ブール値、辞書、リストのデータ型に制限されることです。 小数と日付が特に欠落しています。

Binary data will be transferred using Base64 encoding, increasing the size of the transferred data by 34% compared to an encoding format where native binary types are supported.

ただし、データが上記の制約内に収まり、言語間のサポートが必要な場合は、JSONのデフォルト設定がおそらく最良の選択です。

詳細については、 http://json.org を参照してください。

ノート

(Python公式ドキュメント https://docs.python.org/3.6/library/json.html から)JSONのキー/値ペアのキーは常にstrタイプです。 辞書がJSONに変換されると、辞書のすべてのキーが文字列に強制されます。 この結果、辞書がJSONに変換されてから辞書に戻されると、辞書が元の辞書と一致しない場合があります。 つまり、xに文字列以外のキーがある場合はloads(dumps(x)) != xです。

ピクルス–以外の言語をサポートしたくない場合

Pythonの場合、pickleエンコーディングを使用すると、すべての組み込みPythonデータ型(クラスインスタンスを除く)のサポート、バイナリファイル送信時の小さなメッセージ、JSON処理のわずかな高速化が得られます。

詳細については、pickleを参照してください。

yaml – YAMLには、jsonと同じ特性の多くがあります。

ただし、より多くのデータ型(日付、再帰参照など)をネイティブにサポートします。

ただし、YAML用のPythonライブラリは、JSON用のライブラリよりもかなり低速です。

より表現力豊かなデータ型のセットが必要で、言語間の互換性を維持する必要がある場合は、YAMLが上記よりも適している可能性があります。

詳細については、 http://yaml.org/を参照してください。

msgpack – msgpackは、JSONに近いバイナリシリアル化形式です。

機能で。 ただし、これは非常に若いため、この時点でサポートは実験的なものと見なす必要があります。

詳細については、 http://msgpack.org/を参照してください。

使用されるエンコーディングはメッセージヘッダーとして利用できるため、ワーカーはタスクを逆シリアル化する方法を知っています。 カスタムシリアライザーを使用する場合、このシリアライザーはワーカーが使用できる必要があります。

次の順序は、タスクの送信時に使用されるシリアライザーを決定するために使用されます。

  1. シリアライザー実行オプション。
  2. @-Task.serializer属性
  3. :setting: `task_serializer` 設定。


単一のタスク呼び出しにカスタムシリアライザーを設定する例:

>>> add.apply_async((10, 10), serializer='json')

圧縮

Celeryは、次の組み込みスキームを使用してメッセージを圧縮できます。

  • brotli

    brotliは、Web、特に小さなテキストドキュメント用に最適化されています。 フォントやHTMLページなどの静的コンテンツを提供するのに最も効果的です。

    これを使用するには、Celeryを次のコマンドでインストールします。

    $ pip install celery[brotli]
  • bzip2

    bzip2はgzipよりも小さいファイルを作成しますが、圧縮と解凍の速度はgzipよりも著しく遅くなります。

    これを使用するには、Python実行可能ファイルがbzip2サポートでコンパイルされていることを確認してください。

    次のImportErrorを取得した場合:

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'

    これは、Pythonバージョンをbzip2サポート付きで再コンパイルする必要があることを意味します。

  • gzip

    gzipは、小さなメモリフットプリントを必要とするシステムに適しており、メモリが限られているシステムに最適です。 多くの場合、拡張子が「.tar.gz」のファイルを生成するために使用されます。

    これを使用するには、Python実行可能ファイルがgzipサポートでコンパイルされていることを確認してください。

    次のImportErrorを取得した場合:

    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'

    これは、Pythonバージョンをgzipサポート付きで再コンパイルする必要があることを意味します。

  • lzma

    lzmaは優れた圧縮率を提供し、メモリ使用量が高くなる代わりに、高速の圧縮および解凍速度で実行されます。

    これを使用するには、Python実行可能ファイルがlzmaサポートを使用してコンパイルされていること、およびPythonバージョンが3.3以降であることを確認してください。

    次のImportErrorを取得した場合:

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'

    これは、Pythonバージョンをlzmaサポート付きで再コンパイルする必要があることを意味します。

    または、以下を使用してバックポートをインストールすることもできます。

    $ pip install celery[lzma]
  • zlib

    zlibは、ライブラリ形式のDeflateアルゴリズムを抽象化したもので、APIでgzipファイル形式と軽量ストリーム形式の両方のサポートが含まれています。 これは、多くのソフトウェアシステムの重要なコンポーネントです。ほんの数例を挙げると、LinuxカーネルとGitVCSです。

    これを使用するには、Python実行可能ファイルがzlibサポートでコンパイルされていることを確認してください。

    次のImportErrorを取得した場合:

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'

    これは、Pythonバージョンをzlibサポート付きで再コンパイルする必要があることを意味します。

  • zstd

    zstdは、zlibレベルおよびより優れた圧縮率でリアルタイムの圧縮シナリオを対象としています。 これは、Huff0およびFSEライブラリによって提供される非常に高速なエントロピーステージに支えられています。

    これを使用するには、Celeryを次のコマンドでインストールします。

    $ pip install celery[zstd]

独自の圧縮スキームを作成して、kombu compression registryに登録することもできます。

次の順序は、タスクの送信時に使用される圧縮スキームを決定するために使用されます。

  1. 圧縮実行オプション。
  2. @-Task.compression属性。
  3. :setting: `task_compression` 属性。


タスクを呼び出すときに使用される圧縮を指定する例:

>>> add.apply_async((2, 2), compression='zlib')

接続

自動プールサポート

バージョン2.3以降、自動接続プールがサポートされているため、接続を手動で処理したり、接続を再利用するためにパブリッシャーを処理したりする必要はありません。

バージョン2.5以降、接続プールはデフォルトで有効になっています。

詳細については、:setting: `broker_pool_limit` 設定を参照してください。

パブリッシャーを作成することにより、接続を手動で処理できます。

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        try:
            for args in numbers:
                res = add.apply_async((2, 2), publisher=publisher)
                results.append(res)
print([res.get() for res in results])

この特定の例は、グループとしてはるかによく表現されていますが、

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]

ルーティングオプション

Celeryは、タスクを異なるキューにルーティングできます。

単純なルーティング(名前名前)は、queue オプション:

add.apply_async(queue='priority.high')

次に、workers -Q引数を使用して、ワーカーをpriority.highキューに割り当てることができます。

$ celery -A proj worker -l INFO -Q celery,priority.high

も参照してください

コードにキュー名をハードコーディングすることはお勧めしません。ベストプラクティスは、構成ルーター(:setting: `task_routes` )を使用することです。

ルーティングの詳細については、ルーティングタスクを参照してください。


結果オプション

:setting: `task_ignore_result` 設定を使用するか、ignore_resultオプションを使用して、結果の保存を有効または無効にできます。

>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None

>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3

タスクに関する追加のメタデータを結果バックエンドに保存する場合は、:setting: `result_extended` 設定をTrueに設定します。

も参照してください

タスクの詳細については、タスクを参照してください。


高度なオプション

これらのオプションは、AMQPの完全なルーティング機能を利用したい上級ユーザー向けです。 興味のある方は、ルーティングガイドをお読みください。

  • 両替

    メッセージを送信する交換の名前(またはkombu.entity.Exchange)。

  • routing_key

    決定に使用されるルーティングキー。

  • 優先順位

    0 と 255 の間の数値。ここで、 255 が最高の優先順位です。

    サポート:RabbitMQ、Redis(優先度が逆、0が最高)。