次のステップ—Pythonドキュメント
次のステップ
セロリの最初のステップガイドは意図的に最小限に抑えられています。 このガイドでは、アプリケーションとライブラリにCeleryサポートを追加する方法など、Celeryが提供するものについて詳しく説明します。
このドキュメントには、Celeryのすべての機能とベストプラクティスが記載されているわけではないため、ユーザーガイドも読むことをお勧めします。
アプリケーションでのCeleryの使用
私たちのプロジェクト
プロジェクトのレイアウト:
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='rpc://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
このモジュールでは、@Celery
インスタンス(アプリと呼ばれることもあります)を作成しました。 プロジェクト内でCeleryを使用するには、このインスタンスをインポートするだけです。
broker
引数は、使用するブローカーのURLを指定します。詳細については、ブローカーの選択を参照してください。
backend
引数は、使用する結果バックエンドを指定します。タスクの状態と結果を追跡するために使用されます。 結果はデフォルトで無効になっていますが、後で結果の取得がどのように機能するかを示すため、ここではRPC結果バックエンドを使用します。 アプリケーションに別のバックエンドを使用することをお勧めします。 それらはすべて異なる長所と短所を持っています。 結果が必要ない場合は、無効にすることをお勧めします。
@task(ignore_result=True)
オプションを設定することにより、個々のタスクの結果を無効にすることもできます。詳細については、結果の保持を参照してください。
include
引数は、ワーカーの起動時にインポートするモジュールのリストです。 ワーカーがタスクを見つけられるように、ここにタスクモジュールを追加する必要があります。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
労働者を始める
celery プログラムを使用して、ワーカーを起動できます(projの上のディレクトリでワーカーを実行する必要があります)。
$ celery -A proj worker -l INFO
ワーカーが起動すると、バナーといくつかのメッセージが表示されます。
--------------- [email protected] v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] [email protected] has started.
– ブローカーは、celery
モジュールのブローカー引数で指定したURLです。 -b
オプションを使用して、コマンドラインで別のブローカーを指定することもできます。
– 同時実行性は、タスクを同時に処理するために使用されるプリフォークワーカープロセスの数です。 これらすべてが作業で忙しい場合、新しいタスクは、タスクの1つが完了するのを待ってから処理する必要があります。
デフォルトの同時実行数は、そのマシン(コアを含む)のCPUの数です。 celery worker -c
オプションを使用してカスタム番号を指定できます。 最適な数は多くの要因に依存するため、推奨値はありませんが、タスクがほとんどI / Oバウンドである場合は、それを増やすことを試みることができます。 実験によると、CPUの数を2倍以上追加しても効果はほとんどなく、代わりにパフォーマンスが低下する可能性があります。
Celeryは、デフォルトのプリフォークプールを含め、Eventlet、Geventの使用、および単一スレッドでの実行もサポートしています(同時実行を参照)。
– Events は、Celeryがワーカーで発生するアクションの監視メッセージ(イベント)を送信するようにするオプションです。 これらは、celery events
やFlower–リアルタイムCeleryモニターなどのモニタープログラムで使用できます。これについては、監視および管理ガイドで確認できます。
– キューは、ワーカーがタスクを消費するキューのリストです。 ワーカーは一度に複数のキューから消費するように指示できます。これは、ルーティングガイドで説明されているサービス品質、関心の分離、優先順位付けの手段として、メッセージを特定のワーカーにルーティングするために使用されます。 。
--help
フラグを渡すと、コマンドライン引数の完全なリストを取得できます。
$ celery worker --help
これらのオプションについては、ワーカーガイドで詳しく説明されています。
バックグラウンドで
本番環境では、デーモン化チュートリアルで詳細に説明されているように、ワーカーをバックグラウンドで実行する必要があります。
デーモン化スクリプトは、 celery multi コマンドを使用して、バックグラウンドで1つ以上のワーカーを起動します。
$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
再起動することもできます。
$ celery multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052
またはそれを停止します:
$ celery multi stop w1 -A proj -l INFO
stop
コマンドは非同期であるため、ワーカーがシャットダウンするのを待ちません。 代わりにstopwait
コマンドを使用することをお勧めします。これにより、現在実行中のすべてのタスクが終了する前に完了します。
$ celery multi stopwait w1 -A proj -l INFO
ノート
celery multi はワーカーに関する情報を格納しないため、再起動時に同じコマンドライン引数を使用する必要があります。 停止するときは、同じpidfile引数とlogfile引数のみを使用する必要があります。
デフォルトでは、現在のディレクトリにpidファイルとログファイルが作成されます。 複数のワーカーが互いに重なり合うのを防ぐために、これらを専用のディレクトリに配置することをお勧めします。
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
multiコマンドを使用すると、複数のワーカーを開始できます。また、さまざまなワーカーの引数を指定するための強力なコマンドライン構文があります。次に例を示します。
$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
-Q default -L:4,5 debug
その他の例については、APIリファレンスのmulti
モジュールを参照してください。
--app引数について
--app
引数は、使用するCeleryアプリインスタンスをmodule.path:attribute
の形式で指定します。
ただし、ショートカットフォームもサポートしています。 パッケージ名のみが指定されている場合、次の順序でアプリインスタンスを検索しようとします。
--app=proj
の場合:
proj.app
という名前の属性、またはproj.celery
という名前の属性、または- モジュール
proj
の任意の属性(値はCeleryアプリケーション)、または
これらのいずれも見つからない場合は、proj.celery
という名前のサブモジュールを試します。
proj.celery.app
という名前の属性、またはproj.celery.celery
という名前の属性、またはモジュール
proj.celery
の任意の属性で、値はCeleryアプリケーションです。
このスキームは、ドキュメントで使用されている方法を模倣しています。つまり、含まれている単一のモジュールの場合はproj:app
、大規模なプロジェクトの場合はproj.celery:app
です。
呼び出しタスク
delay()
メソッドを使用してタスクを呼び出すことができます。
>>> from proj.tasks import add
>>> add.delay(2, 2)
このメソッドは、実際にはapply_async()
と呼ばれる別のメソッドへのスター引数のショートカットです。
>>> add.apply_async((2, 2))
後者を使用すると、実行時間(カウントダウン)、送信先のキューなどの実行オプションを指定できます。
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
上記の例では、タスクはlopri
という名前のキューに送信され、メッセージが送信されてから最短で10秒後にタスクが実行されます。
タスクを直接適用すると、現在のプロセスでタスクが実行されるため、メッセージは送信されません。
>>> add(2, 2)
4
delay()
、apply_async()
、および適用(__call__
)の3つのメソッドは、署名にも使用されるCelery呼び出しAPIを構成します。
Calling APIの詳細な概要は、 Calling User Guide にあります。
すべてのタスク呼び出しには一意の識別子(UUID)が与えられます–これはタスクIDです。
delay
およびapply_async
メソッドは、@AsyncResult
インスタンスを返します。これは、タスクの実行状態を追跡するために使用できます。 ただし、このためには、結果バックエンドを有効にして、状態をどこかに保存できるようにする必要があります。
すべてのアプリケーションに適した結果バックエンドがないため、結果はデフォルトで無効になっています。 いずれかを選択するには、個々のバックエンドの欠点を考慮する必要があります。 多くのタスクでは、戻り値を保持することはあまり有用ではないため、デフォルトで設定するのが賢明です。 また、結果のバックエンドはタスクとワーカーの監視には使用されないことに注意してください。そのため、Celeryは専用のイベントメッセージを使用します(監視および管理ガイドを参照)。
結果バックエンドを構成している場合は、タスクの戻り値を取得できます。
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
id
属性を確認すると、タスクのIDを見つけることができます。
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
タスクで例外が発生した場合は、例外とトレースバックを検査することもできます。実際、result.get()
はデフォルトでエラーを伝播します。
>>> res = add.delay(2, '2')
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/result.py", line 221, in get
return self.backend.wait_for_pending(
File "celery/backends/asynchronous.py", line 195, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "celery/result.py", line 333, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "celery/result.py", line 326, in throw
self.on_ready.throw(*args, **kwargs)
File "vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "vine/five.py", line 195, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'
エラーが伝播したくない場合は、propagate
を渡すことで無効にできます。
>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
この場合、代わりに発生した例外インスタンスが返されます。そのため、タスクが成功したか失敗したかを確認するには、結果インスタンスで対応するメソッドを使用する必要があります。
>>> res.failed()
True
>>> res.successful()
False
では、タスクが失敗したかどうかをどうやって知るのでしょうか? タスク状態を見るとわかります。
>>> res.state
'FAILURE'
タスクは単一の状態にすることしかできませんが、複数の状態を経て進行することができます。 典型的なタスクの段階は次のとおりです。
PENDING -> STARTED -> SUCCESS
開始状態は、:setting: `task_track_started` 設定が有効になっている場合、または@task(track_started=True)
オプションがタスクに設定されている場合にのみ記録される特別な状態です。
保留中の状態は実際には記録された状態ではなく、不明なタスクIDのデフォルトの状態です。これは次の例からわかります。
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
タスクが再試行されると、ステージはさらに複雑になる可能性があります。 実例として、2回再試行されたタスクの場合、ステージは次のようになります。
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
タスクの状態の詳細については、タスクユーザーガイドの状態セクションを参照してください。
呼び出しタスクについては、呼び出しガイドで詳しく説明されています。
Canvas :ワークフローの設計
あなたはtasks delay
メソッドを使用してタスクを呼び出す方法を学びました、そしてこれはしばしばあなたが必要とするすべてです。 ただし、タスク呼び出しの署名を別のプロセスに渡したり、別の関数の引数として渡したりする場合があります。Celeryは署名と呼ばれるものを使用します。
シグニチャは、単一のタスク呼び出しの引数と実行オプションをラップして、関数に渡したり、シリアル化してネットワーク経由で送信したりできるようにします。
引数(2, 2)
を使用して、add
タスクの署名を作成し、次のように10秒のカウントダウンを行うことができます。
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
スター引数を使用したショートカットもあります。
>>> add.s(2, 2)
tasks.add(2, 2)
そして、APIの呼び出しが再びあります…
署名インスタンスは呼び出しAPIもサポートします。つまり、delay
メソッドとapply_async
メソッドがあります。
ただし、署名にすでに引数署名が指定されている場合があるという違いがあります。 add
タスクは2つの引数を取るため、2つの引数を指定する署名は、完全な署名になります。
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
ただし、不完全な署名を作成して、 partial と呼ばれるものを作成することもできます。
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
s2
は部分的な署名になり、完全にするために別の引数が必要になりました。これは、署名を呼び出すときに解決できます。
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
ここでは、既存の引数2の前に追加された引数8を追加して、add(8, 2)
の完全な署名を形成しました。
キーワード引数は後で追加することもできます。 次に、これらは既存のキーワード引数とマージされますが、新しい引数が優先されます。
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False) # debug is now False.
前述のように、署名は呼び出し元のAPIをサポートします。
sig.apply_async(args=(), kwargs={}, **options)
オプションの部分引数と部分キーワード引数を使用して署名を呼び出します。 部分実行オプションもサポートします。
sig.delay(*args, **kwargs)
apply_async
のスター引数バージョン。 引数は署名の引数の前に追加され、キーワード引数は既存のキーとマージされます。
これはすべて非常に便利なようですが、実際にこれらを使用して何ができるでしょうか。 そのためには、キャンバスプリミティブを紹介する必要があります…
プリミティブ
これらのプリミティブはそれ自体が署名オブジェクトであるため、さまざまな方法で組み合わせて複雑なワークフローを構成できます。
ノート
これらの例は結果を取得するため、それらを試すには、結果バックエンドを構成する必要があります。 上記のサンプルプロジェクトはすでにそれを行っています(Celery
のバックエンド引数を参照)。
いくつかの例を見てみましょう。
グループ
group
は、タスクのリストを並行して呼び出し、結果をグループとして検査し、戻り値を順番に取得できる特別な結果インスタンスを返します。
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- 部分的なグループ
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
チェーン
タスクをリンクして、一方のタスクが戻った後、もう一方のタスクが呼び出されるようにすることができます。
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
または部分的なチェーン:
>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
チェーンは次のように書くこともできます:
>>> (add.s(4, 4) | mul.s(8))().get()
64
和音
コードは、コールバックを持つグループです。
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
別のタスクにチェーンされたグループは、自動的にコードに変換されます。
>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90
これらのプリミティブはすべてシグニチャタイプであるため、たとえば次のように、ほぼ必要に応じて組み合わせることができます。
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
Canvas ユーザーガイドで、ワークフローの詳細を必ずお読みください。
ルーティング
Celeryは、AMQPが提供するすべてのルーティング機能をサポートしますが、メッセージが名前付きキューに送信される単純なルーティングもサポートします。
:setting: `task_routes` 設定を使用すると、タスクを名前でルーティングし、すべてを1か所に集中させることができます。
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
[X53X] のqueue
引数を使用して、実行時にキューを指定することもできます。
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
次に、celery worker -Q
オプションを指定して、ワーカーにこのキューから消費させることができます。
$ celery -A proj worker -Q hipri
コンマ区切りのリストを使用して、複数のキューを指定できます。 たとえば、ワーカーにデフォルトキューとhipri
キューの両方から消費させることができます。ここで、デフォルトキューの名前は歴史的な理由からcelery
です。
$ celery -A proj worker -Q hipri,celery
ワーカーはキューに等しい重みを与えるため、キューの順序は重要ではありません。
AMQPルーティングのフルパワーの利用を含むルーティングの詳細については、ルーティングガイドを参照してください。
リモコン
ブローカーとしてRabbitMQ(AMQP)、Redis、またはQpidを使用している場合は、実行時にワーカーを制御および検査できます。
たとえば、ワーカーが現在取り組んでいるタスクを確認できます。
$ celery -A proj inspect active
これはブロードキャストメッセージングを使用して実装されるため、すべてのリモートコントロールコマンドはクラスター内のすべてのワーカーによって受信されます。
--destination
オプションを使用して、リクエストに対応する1つ以上のワーカーを指定することもできます。 これは、ワーカーホスト名のコンマ区切りのリストです。
$ celery -A proj inspect active [email protected]
宛先が指定されていない場合、すべてのワーカーが行動し、要求に応答します。
celery inspect コマンドには、ワーカー内で何も変更しないコマンドが含まれています。 ワーカー内で何が起こっているかに関する情報と統計のみを返します。 実行できる検査コマンドのリストについては、以下を参照してください。
$ celery -A proj inspect --help
次に、 celery control コマンドがあります。これには、実行時にワーカー内の処理を実際に変更するコマンドが含まれています。
$ celery -A proj control --help
たとえば、ワーカーにイベントメッセージ(タスクとワーカーの監視に使用)を有効にするように強制できます。
$ celery -A proj control enable_events
イベントが有効になっている場合は、イベントダンパーを起動して、ワーカーが何をしているかを確認できます。
$ celery -A proj events --dump
または、cursesインターフェイスを開始できます。
$ celery -A proj events
監視が終了したら、イベントを再度無効にすることができます。
$ celery -A proj control disable_events
celery status コマンドもリモート制御コマンドを使用し、クラスター内のオンラインワーカーのリストを表示します。
$ celery -A proj status
celery コマンドと監視の詳細については、監視ガイドを参照してください。
タイムゾーン
内部およびメッセージ内のすべての時刻と日付は、UTCタイムゾーンを使用します。
ワーカーは、たとえばカウントダウンが設定されたメッセージを受信すると、そのUTC時刻を現地時間に変換します。 システムタイムゾーンとは異なるタイムゾーンを使用する場合は、:setting: `timezone` 設定を使用して構成する必要があります。
app.conf.timezone = 'Europe/London'
最適化
デフォルトの構成は、スループットに対して最適化されていません。 デフォルトでは、多くの短いタスクと少数の長いタスクの中間を歩こうとします。これは、スループットと公平なスケジューリングの間の妥協点です。
厳密な公平なスケジューリング要件がある場合、またはスループットを最適化したい場合は、最適化ガイドをお読みください。