監視および管理ガイド—Pythonドキュメント
監視および管理ガイド
序章
Celeryクラスターを監視および検査するために使用できるツールがいくつかあります。
このドキュメントでは、これらのいくつかと、イベントやブロードキャストコマンドなどの監視に関連する機能について説明します。
労働者
管理コマンドラインユーティリティ(inspect / control)
celery は、ワーカーノード(およびある程度のタスク)の検査と管理にも使用できます。
使用可能なすべてのコマンドを一覧表示するには、次のようにします。
$ celery --help
または、特定のコマンドのヘルプを取得するには、次のようにします。
$ celery <command> --help
コマンド
shell :Pythonシェルにドロップします。
ローカルには
celery
変数が含まれます。これは現在のアプリです。 また、すべての既知のタスクがローカルに自動的に追加されます(--without-tasks
フラグが設定されている場合を除く)。インストールされている場合は、:pypi: `Ipython` 、:pypi:` bpython` 、または通常の python をこの順序で使用します。
--ipython
、--bpython
、または--python
を使用して実装を強制できます。status :このクラスター内のアクティブなノードを一覧表示します
$ celery -A proj status
結果:タスクの結果を表示します
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
タスクがカスタム結果バックエンドを使用しない限り、タスクの名前を省略できることに注意してください。
パージ:構成されているすべてのタスクキューからメッセージをパージします。
このコマンドは、:setting: `CELERY_QUEUES` 設定で構成されたキューからすべてのメッセージを削除します。
警告
この操作は元に戻すことはできず、メッセージは完全に削除されます。
$ celery -A proj purge
-Q オプションを使用して、パージするキューを指定することもできます。
$ celery -A proj purge -Q celery,foo,bar
-X オプションを使用して、キューがパージされないようにします。
$ celery -A proj purge -X celery
アクティブな検査:アクティブなタスクを一覧表示します
$ celery -A proj inspect active
これらは、現在実行されているすべてのタスクです。
スケジュールされた検査:スケジュールされたETAタスクを一覧表示します
$ celery -A proj inspect scheduled
これらは、 eta または countdown 引数が設定されている場合にワーカーによって予約されるタスクです。
予約済みの検査:予約済みタスクを一覧表示します
$ celery -A proj inspect reserved
これにより、ワーカーによってプリフェッチされ、現在実行を待機しているすべてのタスクが一覧表示されます(ETA値が設定されているタスクは含まれません)。
取り消されたタスクの検査:取り消されたタスクの履歴を一覧表示します
$ celery -A proj inspect revoked
登録済みの検査:登録済みタスクを一覧表示します
$ celery -A proj inspect registered
統計の検査:ワーカー統計を表示します(統計を参照)
$ celery -A proj inspect stats
query_taskの検査:タスクに関する情報をIDで表示します。
このIDのセットに予約済み/アクティブのタスクがあるワーカーは、ステータスと情報で応答します。
$ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
複数のタスクに関する情報を照会することもできます。
$ celery -A proj inspect query_task id1 id2 ... idN
control enable_events :イベントを有効にします
$ celery -A proj control enable_events
control disable_events :イベントを無効にします
$ celery -A proj control disable_events
mergerate :あるブローカーから別のブローカーにタスクを移行します( EXPERIMENTAL )。
$ celery -A proj migrate redis://localhost amqp://localhost
このコマンドは、あるブローカーのすべてのタスクを別のブローカーに移行します。 このコマンドは新しく実験的なものなので、先に進む前に必ずデータのバックアップをとっておく必要があります。
ノート
すべてのinspect
およびcontrol
コマンドは、--timeout
引数をサポートします。これは、応答を待機する秒数です。 待ち時間が原因で応答がない場合は、このタイムアウトを増やす必要がある場合があります。
宛先ノードの指定
デフォルトでは、検査および制御コマンドはすべてのワーカーで機能します。 --destination
引数を使用して、単一またはワーカーのリストを指定できます。
$ celery -A proj inspect -d [email protected],[email protected] reserved
$ celery -A proj control -d [email protected],[email protected] enable_events
花:リアルタイムセロリウェブモニター
Flowerは、Celery用のリアルタイムWebベースのモニターおよび管理ツールです。 活発に開発中ですが、すでに不可欠なツールです。 Celeryの推奨モニターであるため、Django-Adminモニター、celerymon
、およびncurses
ベースのモニターは廃止されます。
花は「流れ」のように発音されますが、必要に応じて植物バージョンを使用することもできます。
特徴
セロリイベントを使用したリアルタイム監視
タスクの進行状況と履歴
タスクの詳細(引数、開始時間、実行時間など)を表示する機能
グラフと統計
リモコン
ワーカーのステータスと統計を表示する
ワーカーインスタンスをシャットダウンして再起動します
ワーカープールのサイズと自動スケール設定を制御する
ワーカーインスタンスが消費するキューを表示および変更する
現在実行中のタスクを表示する
スケジュールされたタスクの表示(ETA /カウントダウン)
予約済みおよび取り消されたタスクを表示する
時間とレートの制限を適用する
構成ビューア
タスクを取り消すか終了する
HTTP API
労働者をリストする
ワーカーをシャットダウンします
ワーカーのプールを再起動します
労働者のプールを育てる
労働者のプールを縮小する
オートスケールワーカープール
キューから消費を開始します
キューからの消費を停止します
タスクを一覧表示する
(表示された)タスクタイプのリスト
タスク情報を取得する
タスクを実行する
名前でタスクを実行する
タスクの結果を取得する
タスクのソフトおよびハードの時間制限を変更する
タスクのレート制限の変更
タスクを取り消す
OpenID認証
スクリーンショット
その他のスクリーンショット:
使用法
pipを使用してFlowerをインストールできます。
$ pip install flower
flowerコマンドを実行すると、アクセスできるWebサーバーが起動します。
$ celery -A proj flower
デフォルトのポートは http:// localhost:5555 ですが、 –port 引数を使用してこれを変更できます。
$ celery -A proj flower --port=5555
ブローカーのURLは、--broker
引数を介して渡すこともできます。
$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0
次に、Webブラウザでflowerにアクセスできます。
$ open http://localhost:5555
Flowerには、認証オプションなど、ここで詳しく説明するよりも多くの機能があります。 詳細については、公式ドキュメントをご覧ください。
セロリイベント:Curses Monitor
バージョン2.0の新機能。
セロリイベントは、タスクとワーカーの履歴を表示するシンプルな呪いモニターです。 タスクの結果とトレースバックを検査できます。また、レート制限やワーカーのシャットダウンなどのいくつかの管理コマンドもサポートしています。 このモニターは概念実証として開始されたものであり、代わりにFlowerを使用することをお勧めします。
起動:
$ celery -A proj events
次のような画面が表示されます。
セロリイベントは、スナップショットカメラの起動にも使用されます(スナップショットを参照:
$ celery -A proj events --camera=<camera-class> --frequency=1.0
また、イベントをstdout
にダンプするツールが含まれています。
$ celery -A proj events --dump
オプションの完全なリストについては、--help
を使用してください。
$ celery events --help
RabbitMQ
Celeryクラスターを管理するには、RabbitMQを監視する方法を知ることが重要です。
RabbitMQには rabbitmqctl(1)コマンドが付属しています。これにより、キュー、交換、バインディング、キューの長さ、各キューのメモリ使用量を一覧表示したり、ユーザー、仮想ホスト、およびそれらの権限を管理したりできます。
ノート
これらの例では、デフォルトの仮想ホスト("/"
)が使用されています。カスタム仮想ホストを使用する場合は、コマンドに-p
引数を追加する必要があります(例:rabbitmqctl list_queues -p my_vhost …
)。
キューの検査
キュー内のタスクの数を見つける:
$ rabbitmqctl list_queues name messages messages_ready \
messages_unacknowledged
ここで、 messages_ready は、配信の準備ができている(送信されたが受信されていない)メッセージの数です。 messages_unacknowledged は、ワーカーによって受信されたがまだ確認されていない(つまり、進行中、または予約済み)。 messages は、準備完了メッセージと未確認メッセージの合計です。
キューから現在消費しているワーカーの数を見つける:
$ rabbitmqctl list_queues name consumers
キューに割り当てられたメモリの量を見つける:
$ rabbitmqctl list_queues name memory
- ヒント
-q
オプションを rabbitmqctl(1)に追加すると、出力の解析が容易になります。
Redis
ブローカーとしてRedisを使用している場合は、 redis-cli(1)コマンドを使用してCeleryクラスターを監視し、キューの長さを一覧表示できます。
キューの検査
キュー内のタスクの数を見つける:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
デフォルトのキューの名前は celery です。 使用可能なすべてのキューを取得するには、次を呼び出します。
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
ノート
キューキーは、タスクが含まれている場合にのみ存在するため、キーが存在しない場合は、単にそのキューにメッセージがないことを意味します。 これは、Redisでは、要素が含まれていないリストが自動的に削除されるため、 keys コマンド出力に表示されず、そのリストの llen が0を返すためです。
また、Redisを他の目的で使用している場合、 keys コマンドの出力には、データベースに保存されている無関係な値が含まれます。 これを回避するための推奨される方法は、Celery専用の DATABASE_NUMBER を使用することです。データベース番号を使用して、Celeryアプリケーションを相互に分離することもできます(仮想ホスト)。ただし、これは、例Floweras Redis pub / subコマンドは、データベースベースではなくグローバルです。
ムニン
これは、Celeryクラスターを保守するときに役立つ既知のMuninプラグインのリストです。
rabbitmq-munin
:RabbitMQ用のMuninプラグイン。celery_tasks
:各タスクタイプが実行された回数を監視します( celerymon が必要です)。celery_tasks_states
:各状態のタスクの数を監視します( celerymon が必要です)。
イベント
ワーカーには、何らかのイベントが発生するたびにメッセージを送信する機能があります。 これらのイベントは、Flowerやセロリイベントなどのツールによってキャプチャされ、クラスターを監視します。
スナップショット
バージョン2.1の新機能。
1人のワーカーでも大量のイベントを生成できるため、すべてのイベントの履歴をディスクに保存すると非常にコストがかかる場合があります。
一連のイベントは、その期間のクラスター状態を記述します。この状態のスナップショットを定期的に取得することで、すべての履歴を保持できますが、それでも定期的にディスクに書き込むだけです。
スナップショットを作成するには、Cameraクラスが必要です。これにより、状態がキャプチャされるたびに何が発生するかを定義できます。 データベースに書き込んだり、電子メールなどで完全に送信したりできます。
セロリイベントは、カメラでスナップショットを撮るために使用されます。たとえば、カメラmyapp.Camera
を使用して2秒ごとに状態をキャプチャする場合は、セロリイベントを実行します。次の引数:
$ celery -A proj events -c myapp.Camera --frequency=2.0
カスタムカメラ
カメラは、イベントをキャプチャし、それらのイベントを一定の間隔で処理する必要がある場合に役立ちます。 リアルタイムイベント処理の場合は、リアルタイム処理のように、@events.Receiver
を直接使用する必要があります。
スナップショットを画面にダンプするカメラの例を次に示します。
from pprint import pformat
from celery.events.snapshot import Polaroid
class DumpCam(Polaroid):
clear_after = True # clear after flush (incl, state.event_count).
def on_shutter(self, state):
if not state.event_count:
# No new events since last snapshot.
return
print('Workers: {0}'.format(pformat(state.workers, indent=4)))
print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
print('Total: {0.event_count} events, {0.task_count} tasks'.format(
state))
状態オブジェクトの詳細については、celery.events.state
のAPIリファレンスを参照してください。
これで、-c
オプションで指定することにより、セロリイベントでこのカムを使用できます。
$ celery -A proj events -c myapp.DumpCam --frequency=2.0
または、次のようにプログラムで使用できます。
from celery import Celery
from myapp import DumpCam
def main(app, freq=1.0):
state = app.events.State()
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={'*': state.event})
with DumpCam(state, freq=freq):
recv.capture(limit=None, timeout=None)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
main(app)
リアルタイム処理
イベントをリアルタイムで処理するには、次のものが必要です。
イベントコンシューマー(これは
Receiver
です)イベントが発生したときに呼び出されるハンドラーのセット。
イベントタイプごとに異なるハンドラーを使用することも、キャッチオールハンドラーを使用することもできます( '*')
状態(オプション)
@events.State
は、クラスター内のタスクとワーカーの便利なメモリ内表現であり、イベントが発生すると更新されます。ワーカーがまだ生きているかどうかを確認する(ハートビートを確認する)、イベントが発生したときにイベントフィールドをマージする、タイムスタンプが同期していることを確認するなど、多くの一般的なソリューションをカプセル化します。
これらを組み合わせると、イベントをリアルタイムで簡単に処理できます。
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
ノート
capture
のwakeup
引数は、すべてのワーカーに信号を送信して、ハートビートを送信するように強制します。 このようにして、モニターの起動時にワーカーをすぐに確認できます。
ハンドラーを指定することにより、特定のイベントをリッスンできます。
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
イベントリファレンス
このリストには、ワーカーによって送信されたイベントとその引数が含まれています。
タスクイベント
タスク送信
- サイン
task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)
タスクメッセージが公開され、:setting: `task_send_sent_event` 設定が有効になっている場合に送信されます。
タスク受信
- サイン
task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)
ワーカーがタスクを受け取ったときに送信されます。
タスク開始
- サイン
task-started(uuid, hostname, timestamp, pid)
ワーカーがタスクを実行する直前に送信されます。
タスク-成功
- サイン
task-succeeded(uuid, result, runtime, hostname, timestamp)
タスクが正常に実行された場合に送信されます。
実行時は、プールを使用してタスクを実行するのにかかった時間です。 (タスクから開始してワーカープールに送信され、プール結果ハンドラーのコールバックが呼び出されたときに終了します)。
タスク失敗
- サイン
task-failed(uuid, exception, traceback, hostname, timestamp)
タスクの実行が失敗した場合に送信されます。
タスクが拒否されました
- サイン
task-rejected(uuid, requeued)
タスクはワーカーによって拒否されました。おそらく、再キューイングされるか、デッドレターキューに移動される可能性があります。
タスクが取り消されました
- サイン
task-revoked(uuid, terminated, signum, expired)
タスクが取り消された場合に送信されます(これは複数のワーカーによって送信される可能性があることに注意してください)。
- *; タスクプロセスが終了した場合、
terminated
はtrueに設定されます。signum
フィールドは使用される信号に設定されます。
- タスクの有効期限が切れた場合、
expired
はtrueに設定されます。
タスク-再試行
- サイン
task-retried(uuid, exception, traceback, hostname, timestamp)
タスクが失敗した場合に送信されますが、将来再試行されます。
労働者のイベント
労働者-オンライン
- サイン
worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
ワーカーはブローカーに接続し、オンラインになっています。
- hostname :ワーカーのノード名。
- タイムスタンプ:イベントのタイムスタンプ。
- freq :秒単位のハートビート周波数(フロート)。
- sw_ident :ワーカーソフトウェアの名前(例:
py-celery
)。 - sw_ver :ソフトウェアバージョン(例:2.2.0)。
- sw_sys :オペレーティングシステム(例:Linux / Darwin)。
労働者の鼓動
- サイン
worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
毎分送信され、ワーカーが2分以内にハートビートを送信しなかった場合、オフラインであると見なされます。
- hostname :ワーカーのノード名。
- タイムスタンプ:イベントのタイムスタンプ。
- freq :秒単位のハートビート周波数(フロート)。
- sw_ident :ワーカーソフトウェアの名前(例:
py-celery
)。 - sw_ver :ソフトウェアバージョン(例:2.2.0)。
- sw_sys :オペレーティングシステム(例:Linux / Darwin)。
- active :現在実行中のタスクの数。
- 処理済み:このワーカーによって処理されたタスクの総数。
労働者-オフライン
- サイン
worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
ワーカーがブローカーから切断されました。