Canvas:ワークフローの設計—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/canvas
移動先:案内検索

キャンバス:ワークフローの設計

署名

バージョン2.0の新機能。


calling ガイドのtasks delayメソッドを使用してタスクを呼び出す方法を学習しました。多くの場合、これで十分ですが、タスク呼び出しの署名を渡したい場合もあります。別のプロセスへ、または別の関数への引数として。

signature()は、単一のタスク呼び出しの引数、キーワード引数、および実行オプションを、関数に渡したり、シリアル化してネットワーク経由で送信したりできるようにラップします。

  • addタスクの署名は、次のような名前を使用して作成できます。

    >>> from celery import signature
    >>> signature('tasks.add', args=(2, 2), countdown=10)
    tasks.add(2, 2)

    このタスクには、アリティ2(2つの引数)のシグネチャ(2, 2)があり、カウントダウン実行オプションを10に設定します。

  • または、タスクのsignatureメソッドを使用して作成できます。

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
  • スター引数を使用したショートカットもあります。

    >>> add.s(2, 2)
    tasks.add(2, 2)
  • キーワード引数もサポートされています。

    >>> add.s(2, 2, debug=True)
    tasks.add(2, 2, debug=True)
  • 任意の署名インスタンスから、さまざまなフィールドを検査できます。

    >>> s = add.signature((2, 2), {'debug': True}, countdown=10)
    >>> s.args
    (2, 2)
    >>> s.kwargs
    {'debug': True}
    >>> s.options
    {'countdown': 10}
  • delayapply_asyncなどの「CallingAPI」をサポートしており、直接呼び出されることもあります(__call__)。

    署名を呼び出すと、現在のプロセスでタスクがインラインで実行されます。

    >>> add(2, 2)
    4
    >>> add.s(2, 2)()
    4

    delayは、スター引数を取るapply_asyncへの私たちの最愛のショートカットです。

    >>> result = add.delay(2, 2)
    >>> result.get()
    4

    apply_asyncは、Task.apply_asyncメソッドと同じ引数を取ります。

    >>> add.apply_async(args, kwargs, **options)
    >>> add.signature(args, kwargs, **options).apply_async()
    
    >>> add.apply_async((2, 2), countdown=1)
    >>> add.signature((2, 2), countdown=1).apply_async()
  • s()でオプションを定義することはできませんが、チェーンset呼び出しがそれを処理します。

    >>> add.s(2, 2).set(countdown=1)
    proj.tasks.add(2, 2)

パーシャル

署名を使用すると、ワーカーでタスクを実行できます。

>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)

または、現在のプロセスで直接呼び出すこともできます。

>>> add.s(2, 2)()
4

apply_async / delayに追加の引数、kwargs、またはオプションを指定すると、パーシャルが作成されます。

  • 追加された引数はすべて、署名の引数の前に追加されます。

    >>> partial = add.s(2)          # incomplete signature
    >>> partial.delay(4)            # 4 + 2
    >>> partial.apply_async((4,))  # same
  • 追加されたキーワード引数はすべて、署名のkwargsとマージされ、新しいキーワード引数が優先されます。

    >>> s = add.s(2, 2)
    >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
    >>> s.apply_async(kwargs={'debug': True})  # same
  • 追加されたオプションはすべて、署名のオプションとマージされ、新しいオプションが優先されます。

    >>> s = add.signature((2, 2), countdown=10)
    >>> s.apply_async(countdown=1)  # countdown is now 1

署名を複製して派生物を作成することもできます。

>>> s = add.s(2)
proj.tasks.add(2)

>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)

不変性

バージョン3.0の新機能。


パーシャルは、コールバック、リンクされたタスク、またはコードコールバックで使用されることを意図しており、親タスクの結果とともに適用されます。 追加の引数をとらないコールバックを指定したい場合があります。その場合、署名を不変に設定できます。

>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))

.si()ショートカットを使用して、不変の署名を作成することもできます。

>>> add.apply_async((2, 2), link=reset_buffers.si())

シグニチャが不変の場合は実行オプションのみを設定できるため、部分的なargs / kwargsを使用してシグニチャを呼び出すことはできません。

ノート

このチュートリアルでは、署名にプレフィックス演算子〜を使用することがあります。 おそらく本番コードでは使用しないでください。ただし、Pythonシェルで実験する場合は便利なショートカットです。

>>> ~sig

>>> # is the same as
>>> sig.delay().get()

コールバック

バージョン3.0の新機能。


apply_asynclink引数を使用してコールバックを任意のタスクに追加できます。

add.apply_async((2, 2), link=other_task.s())

コールバックは、タスクが正常に終了した場合にのみ適用され、親タスクの戻り値を引数として適用されます。

前に述べたように、署名に追加する引数はすべて、署名自体によって指定された引数の前に追加されます。

署名がある場合:

>>> sig = add.s(10)

その場合、 sig.delay(result)は次のようになります。

>>> add.apply_async(args=(result, 10))

次に、部分引数を使用したコールバックでaddタスクを呼び出しましょう。

>>> add.apply_async((2, 2), link=add.s(8))

予想どおり、これは最初に \(2 + 2 \)を計算する1つのタスクを起動し、次に \(4 + 8 \)を計算する別のタスクを起動します。


プリミティブ

バージョン3.0の新機能。


概要

  • group

    グループプリミティブは、並行して適用する必要があるタスクのリストを取得するシグニチャです。

  • chain

    チェーンプリミティブを使用すると、シグニチャをリンクして、一方が次々に呼び出され、基本的にコールバックのチェーンが形成されます。

  • chord

    コードはグループと同じですが、コールバックがあります。 コードはヘッダーグループと本文で構成されます。本文は、ヘッダー内のすべてのタスクが完了した後に実行する必要があるタスクです。

  • map

    マッププリミティブは、組み込みのmap関数のように機能しますが、引数のリストがタスクに適用される一時的なタスクを作成します。 たとえば、task.map([1, 2]) –単一のタスクが呼び出され、タスク関数に順番に引数が適用され、結果は次のようになります。

    res = [task(1), task(2)]
  • starmap

    引数が*argsとして適用されることを除いて、mapとまったく同じように機能します。 たとえば、add.starmap([(2, 2), (4, 4)])は、以下を呼び出す単一のタスクになります。

    res = [add(2, 2), add(4, 4)]
  • chunks

    チャンキングは、引数の長いリストを次のように部分に分割します。

    >>> items = zip(range(1000), range(1000))  # 1000 items
    >>> add.chunks(items, 10)

    アイテムのリストを10個のチャンクに分割し、100個のタスクを生成します(それぞれが10個のアイテムを順番に処理します)。


プリミティブはそれ自体が署名オブジェクトでもあるため、さまざまな方法で組み合わせて複雑なワークフローを構成できます。

ここにいくつかの例があります:

  • シンプルチェーン

    これは単純なチェーンです。最初のタスクが実行され、その戻り値がチェーン内の次のタスクに渡されます。

    >>> from celery import chain
    
    >>> # 2 + 2 + 4 + 8
    >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
    >>> res.get()
    16

    これは、パイプを使用して書き込むこともできます。

    >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
    16
  • 不変の署名

    署名は部分的であるため、既存の引数に引数を追加できますが、チェーン内の前のタスクの結果が必要ない場合など、常に必要な場合はありません。

    その場合、署名を不変としてマークして、引数を変更できないようにすることができます。

    >>> add.signature((2, 2), immutable=True)

    これには.si()ショートカットもあり、これは署名を作成するための推奨される方法です。

    >>> add.si(2, 2)

    これで、代わりに独立したタスクのチェーンを作成できます。

    >>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
    >>> res.get()
    16
    
    >>> res.parent.get()
    8
    
    >>> res.parent.parent.get()
    4
  • 単純なグループ

    並行して実行するタスクのグループを簡単に作成できます。

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in range(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • シンプルなコード

    コードプリミティブを使用すると、グループ内のすべてのタスクの実行が終了したときに呼び出されるコールバックを追加できます。 これは、驚異的並列ではないアルゴリズムで必要になることがよくあります。

    >>> from celery import chord
    >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
    >>> res.get()
    90

    上記の例では、すべてが並行して開始される10個のタスクを作成し、それらがすべて完了すると、戻り値がリストにまとめられ、xsumタスクに送信されます。

    コードの本体も不変である可能性があるため、グループの戻り値はコールバックに渡されません。

    >>> chord((import_contact.s(c) for c in contacts),
    ...       notify_complete.si(import_id)).apply_async()

    上記の.siの使用に注意してください。 これにより、不変の署名が作成されます。つまり、渡された新しい引数(前のタスクの戻り値を含む)はすべて無視されます。

  • 組み合わせてあなたの心を吹き飛ばします

    チェーンも部分的である可能性があります。

    >>> c1 = (add.s(4) | mul.s(8))
    
    # (16 + 4) * 8
    >>> res = c1(16)
    >>> res.get()
    160

    これは、チェーンを組み合わせることができることを意味します。

    # ((4 + 16) * 2 + 4) * 8
    >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
    
    >>> res = c2()
    >>> res.get()
    352

    グループを別のタスクとチェーンすると、自動的にコードにアップグレードされます。

    >>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
    >>> res = c3()
    >>> res.get()
    90

    グループとコードは部分的な引数も受け入れるため、チェーンでは、前のタスクの戻り値がグループ内のすべてのタスクに転送されます。

    >>> new_user_workflow = (create_user.s() | group(
    ...                      import_contacts.s(),
    ...                      send_welcome_email.s()))
    ... new_user_workflow.delay(username='artv',
    ...                         first='Art',
    ...                         last='Vandelay',
    ...                         email='[email protected]')

    引数をグループに転送したくない場合は、グループ内の署名を不変にすることができます。

    >>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))()
    >>> res.get()
    <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
        bc01831b-9486-4e51-b046-480d7c9b78de,
        2650a1b8-32bf-4771-a645-b0a35dcc791b,
        dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
        59f92e0a-23ea-41ce-9fad-8645a0e7759c,
        26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
        2d10a5f4-37f0-41b2-96ac-a973b1df024d,
        e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
        104b2be0-7b75-44eb-ac8e-f9220bdfa140,
        c5c551a5-0386-4973-aa37-b65cbeb2624b,
        83f72d71-4b71-428e-b604-6f16599a9f37]>
    
    >>> res.parent.get()
    8

チェーン

バージョン3.0の新機能。


タスクは相互にリンクできます。リンクされたタスクは、タスクが正常に戻ったときに呼び出されます。

>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4

リンクされたタスクは、その親タスクの結果を最初の引数として適用されます。 結果が4である上記の場合、これはmul(4, 16)になります。

結果は、元のタスクによって呼び出されたサブタスクを追跡します。これには、結果インスタンスからアクセスできます。

>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

>>> res.children[0].get()
64

結果インスタンスには、結果をグラフとして扱うcollect()メソッドもあり、結果を反復処理できます。

>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
 (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

デフォルトでは、グラフが完全に形成されていない場合(タスクの1つがまだ完了していない場合)、collect()@IncompleteStream例外を発生させますが、グラフの中間表現を取得することもできます。

>>> for result, value in res.collect(intermediate=True):
....

必要な数のタスクをリンクでき、署名もリンクできます。

>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())

on_error メソッドを使用して、エラーコールバックを追加することもできます。

>>> add.s(2, 2).on_error(log_error.s()).delay()

これにより、署名が適用されると、次の.apply_async呼び出しが発生します。

>>> add.apply_async((2, 2), link_error=log_error.s())

ワーカーは実際にはerrbackをタスクとして呼び出すことはありませんが、代わりにerrback関数を直接呼び出して、生の要求、例外、およびトレースバックオブジェクトをタスクに渡すことができるようにします。

errbackの例を次に示します。

from __future__ import print_function

import os

from proj.celery import app

@app.task
def log_error(request, exc, traceback):
    with open(os.path.join('/var/errors', request.id), 'a') as fh:
        print('--\n\n{0} {1} {2}'.format(
            request.id, exc, traceback), file=fh)

タスクをさらに簡単にリンクできるようにするために、chainと呼ばれる特別な署名があります。これを使用するとタスクをチェーンできます。

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

チェーンを呼び出すと、現在のプロセスのタスクが呼び出され、チェーンの最後のタスクの結果が返されます。

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640

また、parent属性を設定して、チェーンを上っていき、中間結果を取得できるようにします。

>>> res.parent.get()
64

>>> res.parent.parent.get()
8

>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>

チェーンは、|(パイプ)演算子を使用して作成することもできます。

>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()

グラフ

さらに、結果グラフをDependencyGraphとして操作できます。

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
    463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)

これらのグラフをドット形式に変換することもできます。

>>> with open('graph.dot', 'w') as fh:
...     res.parent.parent.graph.to_dot(fh)

画像を作成します。

$ dot -Tpng graph.dot -o graph.png

../_images/result_graph.png

グループ

バージョン3.0の新機能。


グループを使用して、複数のタスクを並行して実行できます。

group関数は、署名のリストを受け取ります。

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

グループを呼び出しすると、タスクは現在のプロセスで次々に適用され、結果を追跡したり、方法を教えたりするために使用できるGroupResultインスタンスが返されます。多くのタスクの準備ができているなど:

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]

グループはイテレータもサポートしています。

>>> group(add.s(i, i) for i in range(100))()

グループは署名オブジェクトであるため、他の署名と組み合わせて使用できます。

グループコールバックとエラー処理

グループにはコールバック署名とエラーバック署名をリンクさせることもできますが、グループは実際のタスクではなく、リンクされたタスクをカプセル化された署名に渡すだけなので、動作はやや意外な場合があります。 これは、グループの戻り値が収集されて、リンクされたコールバック署名に渡されないことを意味します。 例として、単純な add(a、b)タスクを使用する次のスニペットは、リンクされた add.s()署名が最終的なグループ結果を受け取らないため、障害があります。予想。

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> g.link(add.s())
>>> res = g()
[4, 8]

最初の2つのタスクの最終結果が返されますが、コールバックシグネチャはバックグラウンドで実行され、予期する2つの引数を受け取らなかったため、例外が発生することに注意してください。

グループのエラーバックはカプセル化されたシグニチャにも渡されます。これにより、グループ内の複数のタスクが失敗した場合に、一度だけリンクされたエラーバックが複数回呼び出される可能性があります。 例として、例外を発生させる fail()タスクを使用する次のスニペットは、グループで実行される失敗したタスクごとに1回 log_error()署名を呼び出すことが期待できます。 。

>>> g = group(fail.s(), fail.s())
>>> g.link_error(log_error.s())
>>> res = g()

これを念頭に置いて、エラーバックとして使用するために繰り返し呼び出されることを許容するべき等またはカウントタスクを作成することをお勧めします。

これらのユースケースは、特定のバックエンド実装でサポートされているchordクラスによってより適切に対処されます。


グループの結果

グループタスクも特別な結果を返します。この結果は、グループ全体で機能することを除いて、通常のタスク結果と同じように機能します。

>>> from celery import group
>>> from tasks import add

>>> job = group([
...             add.s(2, 2),
...             add.s(4, 4),
...             add.s(8, 8),
...             add.s(16, 16),
...             add.s(32, 32),
... ])

>>> result = job.apply_async()

>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]

GroupResultは、AsyncResultインスタンスのリストを取得し、単一のタスクであるかのようにそれらを操作します。

次の操作をサポートします。

  • successful()

    すべてのサブタスクが正常に終了した場合(たとえば、例外が発生しなかった場合)、Trueを返します。

  • failed()

    サブタスクのいずれかが失敗した場合は、Trueを返します。

  • waiting()

    サブタスクのいずれかがまだ準備ができていない場合は、Trueを返します。

  • ready()

    すべてのサブタスクの準備ができたら、Trueを返します。

  • completed_count()

    完了したサブタスクの数を返します。

  • revoke()

    すべてのサブタスクを取り消します。

  • join()

    すべてのサブタスクの結果を収集し、呼び出されたのと同じ順序で(リストとして)返します。


和音

バージョン2.3の新機能。


ノート

コード内で使用されるタスクは、その結果を無視してはなりません ' 。 コード内の任意のタスク(ヘッダーまたは本文)で結果バックエンドが無効になっている場合は、「重要な注意事項」をお読みください。 コードは現在、RPC結果バックエンドではサポートされていません。


コードは、グループ内のすべてのタスクの実行が終了した後にのみ実行されるタスクです。

式の合計を計算してみましょう \(1 + 1 + 2 + 2 + 3 + 3..。 n + n \) 100桁まで。

まず、add()tsum()の2つのタスクが必要です(sum()はすでに標準機能です)。

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

これで、コードを使用して各加算ステップを並行して計算し、結果の数値の合計を取得できます。

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in range(100))(tsum.s()).get()
9900

これは明らかに非常に不自然な例です。メッセージングと同期のオーバーヘッドにより、Pythonの対応するものよりもはるかに遅くなります。

>>> sum(i + i for i in range(100))

同期の手順にはコストがかかるため、コードの使用はできるだけ避けてください。 それでも、同期は多くの並列アルゴリズムに必要なステップであるため、コードはツールボックスに含めることができる強力なプリミティブです。

コード式を分解してみましょう:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900

コールバックは、ヘッダー内のすべてのタスクが返された後にのみ実行できることを忘れないでください。 ヘッダーの各ステップは、タスクとして並行して、場合によっては異なるノードで実行されます。 次に、ヘッダー内の各タスクの戻り値を使用してコールバックが適用されます。 chord()によって返されるタスクIDはコールバックのIDであるため、コールバックが完了するのを待って最終的な戻り値を取得できます(ただし、タスクに他のタスクを待機させないでください

エラー処理

では、タスクの1つで例外が発生した場合はどうなりますか?

コードコールバックの結果は失敗状態に移行し、エラーは@ChordError例外に設定されます。

>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "*/celery/result.py", line 120, in get
    interval=interval)
  File "*/celery/backends/amqp.py", line 150, in wait_for
    raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
    raised ValueError('something something',)

トレースバックは、使用された結果バックエンドによって異なる場合がありますが、エラーの説明には、失敗したタスクのIDと元の例外の文字列表現が含まれていることがわかります。 元のトレースバックはresult.tracebackにもあります。

残りのタスクは引き続き実行されるため、中間のタスクが失敗した場合でも、3番目のタスク(add.s(8, 8))は引き続き実行されることに注意してください。 また、@ChordErrorは、最初に(時間内に)失敗したタスクのみを表示します。ヘッダーグループの順序は考慮されません。

したがって、コードが失敗したときにアクションを実行するには、コードコールバックにerrbackをアタッチできます。

@app.task
def on_chord_error(request, exc, traceback):
    print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
>>> c = (group(add.s(i, i) for i in range(10)) |
...      xsum.s().on_error(on_chord_error.s())).delay()

コードには、コールバック署名とエラーバック署名がリンクされている場合があります。これにより、署名をグループにリンクする際の問題のいくつかに対処できます。 そうすることで、提供された署名がコードの本体にリンクされ、本体の完了時にコールバックが1回だけ正常に呼び出されるか、コードヘッダーまたは本体のタスクが失敗した場合にエラーバックが1回だけ呼び出されることが期待できます。


重要な注意事項

コード内で使用されるタスクは、その結果を無視してはなりません ' 。 実際には、これは、コードを使用するためにresult_backendを有効にする必要があることを意味します。 また、設定でtask_ignore_resultTrueに設定されている場合は、コード内で使用する個々のタスクがignore_result=Falseで定義されていることを確認してください。 これは、タスクサブクラスと装飾されたタスクの両方に適用されます。

タスクサブクラスの例:

class MyTask(Task):
    ignore_result = False

装飾されたタスクの例:

@app.task(ignore_result=False)
def another_task(project):
    do_something()

デフォルトでは、同期ステップは、定期的なタスクがグループの完了を毎秒ポーリングし、準備ができたら署名を呼び出すことによって実装されます。

実装例:

from celery import maybe_signature

@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
    if group.ready():
        return maybe_signature(callback).delay(group.join())
    raise self.retry(countdown=interval, max_retries=max_retries)

これは、RedisとMemcachedを除くすべての結果バックエンドで使用されます。ヘッダー内の各タスクの後にカウンターをインクリメントし、カウンターがセット内のタスクの数を超えるとコールバックを適用します。

RedisとMemcachedのアプローチははるかに優れたソリューションですが、他のバックエンドに簡単に実装することはできません(提案を歓迎します!)。

ノート

コードは、バージョン2.2より前のRedisでは正しく機能しません。 それらを使用するには、少なくともredis-server2.2にアップグレードする必要があります。


ノート

Redis結果バックエンドでコードを使用していて、Task.after_return()メソッドもオーバーライドしている場合は、必ずスーパーメソッドを呼び出す必要があります。そうしないと、コードコールバックが適用されません。

def after_return(self, *args, **kwargs):
    do_something()
    super().after_return(*args, **kwargs)

地図と星図

mapおよびstarmapは、シーケンス内のすべての要素に対して提供された呼び出しタスクを呼び出す組み込みタスクです。

groupとは次の点で異なります。

  • 1つのタスクメッセージのみが送信されます。
  • 操作は順次です。

たとえば、mapを使用します。

>>> from proj.tasks import add

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

タスクを実行するのと同じです。

@app.task
def temp():
    return [xsum(range(10)), xsum(range(100))]

starmapの使用:

>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

タスクを実行するのと同じです。

@app.task
def temp():
    return [add(i, i) for i in range(10)]

mapstarmapはどちらも署名オブジェクトであるため、他の署名として使用したり、グループなどで組み合わせたりして、たとえば10秒後に星図を呼び出すことができます。

>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)

チャンク

チャンキングを使用すると、反復可能な作業を断片に分割できるため、100万個のオブジェクトがある場合、それぞれ10万個のオブジェクトで10個のタスクを作成できます。

タスクをチャンク化すると並列処理が低下することを心配する人もいるかもしれませんが、これがビジーなクラスターに当てはまることはめったにありません。実際には、メッセージングのオーバーヘッドを回避しているため、パフォーマンスが大幅に向上する可能性があります。

チャンク署名を作成するには、@Task.chunks()を使用できます。

>>> add.chunks(zip(range(100), range(100)), 10)

groupと同様に、チャンクのメッセージを送信する動作は、呼び出されたときに現在のプロセスで発生します。

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

.apply_asyncを呼び出すと、専用のタスクが作成され、代わりに個々のタスクがワーカーに適用されます。

>>> add.chunks(zip(range(100), range(100)), 10).apply_async()

チャンクをグループに変換することもできます。

>>> group = add.chunks(zip(range(100), range(100)), 10).group()

グループを使用すると、各タスクのカウントダウンが1ずつ増加します。

>>> group.skew(start=1, stop=10)()

これは、最初のタスクのカウントダウンが1秒、2番目のタスクのカウントダウンが2秒というようになることを意味します。