拡張機能とブートステップ—Pythonドキュメント
拡張機能とブートステップ
カスタムメッセージコンシューマー
メッセージを手動で処理するために、カスタムの昆布消費者を埋め込むことをお勧めします。
そのために、特別なConsumerStep
ブートステップクラスが存在します。このクラスでは、get_consumers
メソッドを定義するだけで、接続が確立されるたびに開始するkombu.Consumer
オブジェクトのリストを返す必要があります。 :
ノート
昆布消費者は、2つの異なるメッセージコールバックディスパッチングメカニズムを利用できます。 1つ目は(body, message)
署名付きのコールバックのリストを受け入れるcallbacks
引数で、2つ目は(message,)
署名。 後者は、ペイロードを自動的にデコードおよび逆シリアル化しません。
設計図
ブートステップは、ワーカーに機能を追加するための手法です。 ブートステップは、ワーカーのさまざまな段階でカスタムアクションを実行するためのフックを定義するカスタムクラスです。 すべてのブートステップはブループリントに属し、ワーカーは現在、 Worker と Consumer の2つのブループリントを定義しています。
- 図A:ワーカーとコンシューマーのブループリントのブートステップ。 起動
- ワーカーブループリントの最初のステップは下から順にタイマーであり、最後のステップはコンシューマーブループリントを開始することです。これにより、ブローカー接続が確立され、メッセージの消費が開始されます。
ワーカー
ワーカーは最初に開始する青写真であり、イベントループ、処理プール、ETAタスクやその他の時間指定イベントに使用されるタイマーなどの主要なコンポーネントを開始します。
ワーカーが完全に開始されると、タスクの実行方法を設定し、ブローカーに接続してメッセージコンシューマーを開始するコンシューマーブループリントを続行します。
WorkController
はコアワーカーの実装であり、ブートステップで使用できるいくつかのメソッドと属性が含まれています。
属性
- app
- 現在のアプリインスタンス。
- hostname
- ワーカーノード名(例: worker1@example.com )
- blueprint
- これはワーカー
Blueprint
です。
- hub
イベントループオブジェクト(
Hub
)。 これを使用して、イベントループにコールバックを登録できます。これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。
これを使用するには、ワーカーブートステップでハブブートステップが必要です。
- pool
現在のプロセス/イベントレット/ gevent /スレッドプール。
celery.concurrency.base.BasePool
を参照してください。ワーカーブートステップでは、これを使用するためにプールブートステップが必要です。
- timer
Timer
は機能のスケジュールに使用されます。ワーカーブートステップでは、これを使用するためにタイマーブートステップが必要です。
- statedb
Database <celery.worker.state.Persistent>`
は、ワーカーの再起動間で状態を保持します。これは、
statedb
引数が有効になっている場合にのみ定義されます。これを使用するには、ワーカーブートステップで
Statedb
ブートステップが必要です。
- autoscaler
Autoscaler
は、プール内のプロセスの数を自動的に増減するために使用されます。これは、
autoscale
引数が有効になっている場合にのみ定義されます。これを使用するには、ワーカーのブートステップで Autoscaler ブートステップが必要です。
- autoreloader
Autoreloader
は、ファイルシステムが変更されたときに使用コードを自動的に再読み込みするために使用されます。これは、
autoreload
引数が有効になっている場合にのみ定義されます。 これを使用するには、ワーカーのブートステップで Autoreloader ブートステップが必要です。
ワーカーのブートステップの例
ワーカーのブートステップの例は次のとおりです。
すべてのメソッドには、現在のWorkController
インスタンスが最初の引数として渡されます。
別の例では、タイマーを使用して定期的にウェイクアップできます。
タスク処理ログのカスタマイズ
Celeryワーカーは、タスクのライフサイクル全体を通じてさまざまなイベントのメッセージをPythonロギングサブシステムに送信します。 これらのメッセージは、celery/app/trace.py
で定義されているLOG_<TYPE>
形式の文字列を上書きすることでカスタマイズできます。 例えば:
さまざまなフォーマット文字列はすべて、%
フォーマットのタスク名とIDで提供され、それらの一部は、タスクが失敗する原因となった戻り値や例外などの追加フィールドを受け取ります。 これらのフィールドは、次のようなカスタム形式の文字列で使用できます。
消費者
コンシューマーブループリントはブローカーへの接続を確立し、この接続が失われるたびに再起動されます。 コンシューマーのブートステップには、ワーカーハートビート、リモートコントロールコマンドコンシューマー、そして重要なことに、タスクコンシューマーが含まれます。
コンシューマブートステップを作成するときは、ブループリントを再開できる必要があることを考慮に入れる必要があります。 追加の「shutdown」メソッドがコンシューマーブートステップ用に定義されています。このメソッドは、ワーカーがシャットダウンされたときに呼び出されます。
属性
- app
- 現在のアプリインスタンス。
- controller
- このコンシューマーを作成した親
@WorkController
オブジェクト。
- hostname
- ワーカーノード名(例: worker1@example.com )
- blueprint
- これはワーカー
Blueprint
です。
- hub
イベントループオブジェクト(
Hub
)。 これを使用して、イベントループにコールバックを登録できます。これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。
これを使用するには、ワーカーブートステップでハブブートステップが必要です。
- connection
現在のブローカー接続(
kombu.Connection
)。コンシューマーブートステップでは、これを使用するために「接続」ブートステップが必要です。
- event_dispatcher
イベントの送信に使用できる
@events.Dispatcher
オブジェクト。コンシューマーブートステップでは、これを使用するために Events ブートステップが必要です。
- gossip
ワーカー間ブロードキャスト通信(
Gossip
)。コンシューマーブートステップでは、これを使用するために Gossip ブートステップが必要です。
コールバック
<set> gossip.on.node_join
新しいノードがクラスターに参加するたびに呼び出され、
Worker
インスタンスを提供します。<set> gossip.on.node_leave
新しいノードがクラスターを離れる(シャットダウンする)たびに呼び出され、
Worker
インスタンスを提供します。<set> gossip.on.node_lost
クラスター内のワーカーインスタンスのハートビートが失われた場合(ハートビートが時間内に受信または処理されなかった場合)に呼び出され、
Worker
インスタンスを提供します。これは必ずしもワーカーが実際にオフラインであることを意味するわけではないため、デフォルトのハートビートタイムアウトでは不十分な場合は、タイムアウトメカニズムを使用してください。
- pool
- 現在のプロセス/イベントレット/ gevent /スレッドプール。
celery.concurrency.base.BasePool
を参照してください。
- timer
Timer <celery.utils.timer2.Schedule
は機能のスケジュールに使用されます。
- heart
ワーカーイベントのハートビート(
Heart
)の送信を担当します。これを使用するには、コンシューマーブートステップで Heart ブートステップが必要です。
- task_consumer
タスクメッセージを消費するために使用される
kombu.Consumer
オブジェクト。これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。
- strategies
登録されたすべてのタスクタイプには、このマッピングにエントリがあり、この値は、このタスクタイプの着信メッセージを実行するために使用されます(タスク実行戦略)。 このマッピングは、コンシューマーが開始したときにタスクのブートステップによって生成されます。
これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。
- task_buckets
タイプごとにタスクのレート制限を検索するために使用される
defaultdict
。 このdictのエントリは、None(制限なし)またはconsume(tokens)
およびexpected_time(tokens)
を実装するTokenBucket
インスタンスの場合があります。TokenBucketはトークンバケットアルゴリズムを実装しますが、同じインターフェースに準拠し、上記の2つの方法を定義する限り、任意のアルゴリズムを使用できます。
- qos
QoS
オブジェクトを使用して、タスクチャネルの現在のprefetch_count値を変更できます。
メソッド
- consumer.reset_rate_limits()
- 登録されているすべてのタスクタイプの
task_buckets
マッピングを更新します。
- consumer.bucket_for_task(type, Bucket=TokenBucket)
task.rate_limit
属性を使用して、タスクのレート制限バケットを作成します。
- consumer.add_task_queue(name, exchange=None, exchange_type=None,
routing_key=None, \*\*options):
- 消費する新しいキューを追加します。 これは、接続を再開しても持続します。
- consumer.cancel_task_queue(name)
- 名前によるキューからの消費を停止します。 これは、接続を再開しても持続します。
- apply_eta_task(request)
request.eta
属性に基づいて実行するETAタスクをスケジュールします。 (Request
)
ブートステップのインストール
app.steps['worker']
およびapp.steps['consumer']
を変更して、新しいブートステップを追加できます。
ステップの順序は、結果の依存関係グラフ(Step.requires
)によって決定されるため、ここでは重要ではありません。
ブートステップをインストールする方法とその動作を説明するために、これはいくつかの役に立たないデバッグ情報を出力するステップの例です。 ワーカーとコンシューマーの両方のブートステップとして追加できます。
このステップをインストールしてワーカーを起動すると、次のログが表示されます。
print
ステートメントは、ワーカーが初期化された後にロギングサブシステムにリダイレクトされるため、「開始中」の行にはタイムスタンプが付けられます。 これは、シャットダウン時に発生しなくなったことに気付くかもしれません。これは、stop
メソッドとshutdown
メソッドがシグナルハンドラー内で呼び出され、ロギングを使用するのは安全ではないためです。そのようなハンドラーの内部。 Pythonロギングモジュールを使用したロギングは reentrant ではありません。つまり、関数を中断して後で再度呼び出すことはできません。 作成するstop
およびshutdown
メソッドもリエントラントであることが重要です。
--loglevel=debug
でワーカーを起動すると、起動プロセスに関する詳細情報が表示されます。
コマンドラインプログラム
新しいコマンドラインオプションの追加
コマンド固有のオプション
アプリケーションインスタンスの@user_options
属性を変更することにより、worker
、beat
、およびevents
コマンドにコマンドラインオプションを追加できます。
Celeryコマンドは、click
モジュールを使用してコマンドライン引数を解析するため、カスタム引数を追加するには、click.Option
インスタンスを関連するセットに追加する必要があります。
celeryworker コマンドにカスタムオプションを追加する例:
すべてのブートステップは、Bootstep.__init__
へのキーワード引数としてこの引数を受け取るようになります。
プリロードオプション
celery アンブレラコマンドは、「プリロードオプション」の概念をサポートしています。 これらは、すべてのサブコマンドに渡される特別なオプションです。
たとえば、構成テンプレートを指定するために、新しいプリロードオプションを追加できます。
新しいセロリサブコマンドの追加
setuptoolsエントリポイントを使用して、 celery アンブレラコマンドに新しいコマンドを追加できます。
エントリポイントは、パッケージsetup.py
プログラムに追加できる特別なメタデータであり、インストール後、pkg_resources
モジュールを使用してシステムから読み取ることができます。
Celeryは、celery.commands
エントリポイントを認識して、追加のサブコマンドをインストールします。エントリポイントの値は、有効なクリックコマンドを指している必要があります。
これは、:pypi: `Flower` 監視拡張機能が、setup.py
にエントリポイントを追加することにより、 celery flower コマンドを追加する方法です。
コマンド定義は、等号で区切られた2つの部分に分かれています。最初の部分はサブコマンド(花)の名前で、2番目の部分はコマンドを実装する関数への完全修飾シンボルパスです。
モジュールパスと属性の名前は、上記のようにコロンで区切る必要があります。
モジュールflower/command.py
では、コマンド関数は次のように定義できます。
ワーカーAPI
Hub-ワーカー非同期イベントループ
- サポートされているトランスポート
- amqp、redis
バージョン3.0の新機能。
amqpまたはredisブローカートランスポートが使用される場合、ワーカーは非同期I / Oを使用します。 最終的な目標は、すべてのトランスポートがイベントループを使用することですが、それには時間がかかるため、他のトランスポートは引き続きスレッドベースのソリューションを使用します。
- hub.add(fd, callback, flags)
- hub.add_reader(fd, callback, \*args)
fd
が読み取り可能になったときに呼び出されるコールバックを追加します。hub.remove(fd)を使用して明示的に削除されるか、ファイル記述子が無効になったために自動的に破棄されるまで、コールバックは登録されたままになります。
特定のファイル記述子に一度に登録できるコールバックは1つだけであるため、
add
をもう一度呼び出すと、そのファイル記述子に以前に登録されたコールバックが削除されることに注意してください。ファイル記述子は、
fileno
メソッドをサポートするファイルのようなオブジェクトです。または、ファイル記述子番号(int)にすることもできます。
- hub.add_writer(fd, callback, \*args)
fd
が書き込み可能であるときに呼び出されるコールバックを追加します。 上記の hub.add_reader()に関する注記も参照してください。
- hub.remove(fd)
- ファイル記述子
fd
のすべてのコールバックをループから削除します。
タイマー-イベントのスケジュール
- timer.call_after(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_repeatedly(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_at(eta, callback, args=(), kwargs=(),
priority=0)