監視および管理ガイド—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/monitoring
移動先:案内検索

監視および管理ガイド

序章

Celeryクラスターを監視および検査するために使用できるツールがいくつかあります。

このドキュメントでは、これらのいくつかと、イベントやブロードキャストコマンドなどの監視に関連する機能について説明します。


労働者

管理コマンドラインユーティリティ(inspect / control)

celery は、ワーカーノード(およびある程度のタスク)の検査と管理にも使用できます。

使用可能なすべてのコマンドを一覧表示するには、次のようにします。

$ celery --help

または、特定のコマンドのヘルプを取得するには、次のようにします。

$ celery <command> --help

コマンド

  • shell :Pythonシェルにドロップします。

    ローカルにはcelery変数が含まれます。これは現在のアプリです。 また、すべての既知のタスクがローカルに自動的に追加されます(--without-tasksフラグが設定されている場合を除く)。

    インストールされている場合は、:pypi: `Ipython`:pypi:` bpython` 、または通常の python をこの順序で使用します。 --ipython--bpython、または--pythonを使用して実装を強制できます。

  • status :このクラスター内のアクティブなノードを一覧表示します

    $ celery -A proj status
  • 結果:タスクの結果を表示します

    $ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577

    タスクがカスタム結果バックエンドを使用しない限り、タスクの名前を省略できることに注意してください。

  • パージ:構成されているすべてのタスクキューからメッセージをパージします。

    このコマンドは、:setting: `CELERY_QUEUES` 設定で構成されたキューからすべてのメッセージを削除します。

    警告

    この操作は元に戻すことはできず、メッセージは完全に削除されます。

    $ celery -A proj purge

    -Q オプションを使用して、パージするキューを指定することもできます。

    $ celery -A proj purge -Q celery,foo,bar

    -X オプションを使用して、キューがパージされないようにします。

    $ celery -A proj purge -X celery
  • アクティブな検査:アクティブなタスクを一覧表示します

    $ celery -A proj inspect active

    これらは、現在実行されているすべてのタスクです。

  • スケジュールされた検査:スケジュールされたETAタスクを一覧表示します

    $ celery -A proj inspect scheduled

    これらは、 eta または countdown 引数が設定されている場合にワーカーによって予約されるタスクです。

  • 予約済みの検査:予約済みタスクを一覧表示します

    $ celery -A proj inspect reserved

    これにより、ワーカーによってプリフェッチされ、現在実行を待機しているすべてのタスクが一覧表示されます(ETA値が設定されているタスクは含まれません)。

  • 取り消されたタスクの検査:取り消されたタスクの履歴を一覧表示します

    $ celery -A proj inspect revoked
  • 登録済みの検査:登録済みタスクを一覧表示します

    $ celery -A proj inspect registered
  • 統計の検査:ワーカー統計を表示します(統計を参照)

    $ celery -A proj inspect stats
  • query_taskの検査:タスクに関する情報をIDで表示します。

    このIDのセットに予約済み/アクティブのタスクがあるワーカーは、ステータスと情報で応答します。

    $ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8

    複数のタスクに関する情報を照会することもできます。

    $ celery -A proj inspect query_task id1 id2 ... idN
  • control enable_events :イベントを有効にします

    $ celery -A proj control enable_events
  • control disable_events :イベントを無効にします

    $ celery -A proj control disable_events
  • mergerate :あるブローカーから別のブローカーにタスクを移行します( EXPERIMENTAL )。

    $ celery -A proj migrate redis://localhost amqp://localhost

    このコマンドは、あるブローカーのすべてのタスクを別のブローカーに移行します。 このコマンドは新しく実験的なものなので、先に進む前に必ずデータのバックアップをとっておく必要があります。

ノート

すべてのinspectおよびcontrolコマンドは、--timeout引数をサポートします。これは、応答を待機する秒数です。 待ち時間が原因で応答がない場合は、このタイムアウトを増やす必要がある場合があります。


宛先ノードの指定

デフォルトでは、検査および制御コマンドはすべてのワーカーで機能します。 --destination引数を使用して、単一またはワーカーのリストを指定できます。

$ celery -A proj inspect -d [email protected],[email protected] reserved

$ celery -A proj control -d [email protected],[email protected] enable_events

花:リアルタイムセロリウェブモニター

Flowerは、Celery用のリアルタイムWebベースのモニターおよび管理ツールです。 活発に開発中ですが、すでに不可欠なツールです。 Celeryの推奨モニターであるため、Django-Adminモニター、celerymon、およびncursesベースのモニターは廃止されます。

花は「流れ」のように発音されますが、必要に応じて植物バージョンを使用することもできます。

特徴

  • セロリイベントを使用したリアルタイム監視

    • タスクの進行状況と履歴

    • タスクの詳細(引数、開始時間、実行時間など)を表示する機能

    • グラフと統計


  • リモコン

    • ワーカーのステータスと統計を表示する

    • ワーカーインスタンスをシャットダウンして再起動します

    • ワーカープールのサイズと自動スケール設定を制御する

    • ワーカーインスタンスが消費するキューを表示および変更する

    • 現在実行中のタスクを表示する

    • スケジュールされたタスクの表示(ETA /カウントダウン)

    • 予約済みおよび取り消されたタスクを表示する

    • 時間とレートの制限を適用する

    • 構成ビューア

    • タスクを取り消すか終了する


  • HTTP API

    • 労働者をリストする

    • ワーカーをシャットダウンします

    • ワーカーのプールを再起動します

    • 労働者のプールを育てる

    • 労働者のプールを縮小する

    • オートスケールワーカープール

    • キューから消費を開始します

    • キューからの消費を停止します

    • タスクを一覧表示する

    • (表示された)タスクタイプのリスト

    • タスク情報を取得する

    • タスクを実行する

    • 名前でタスクを実行する

    • タスクの結果を取得する

    • タスクのソフトおよびハードの時間制限を変更する

    • タスクのレート制限の変更

    • タスクを取り消す


  • OpenID認証

スクリーンショット

thumb|none

thumb|none

その他のスクリーンショット


使用法

pipを使用してFlowerをインストールできます。

$ pip install flower

flowerコマンドを実行すると、アクセスできるWebサーバーが起動します。

$ celery -A proj flower

デフォルトのポートは http:// localhost:5555 ですが、 –port 引数を使用してこれを変更できます。

$ celery -A proj flower --port=5555

ブローカーのURLは、--broker引数を介して渡すこともできます。

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

次に、Webブラウザでflowerにアクセスできます。

$ open http://localhost:5555

Flowerには、認証オプションなど、ここで詳しく説明するよりも多くの機能があります。 詳細については、公式ドキュメントをご覧ください。


セロリイベント:Curses Monitor

バージョン2.0の新機能。


セロリイベントは、タスクとワーカーの履歴を表示するシンプルな呪いモニターです。 タスクの結果とトレースバックを検査できます。また、レート制限やワーカーのシャットダウンなどのいくつかの管理コマンドもサポートしています。 このモニターは概念実証として開始されたものであり、代わりにFlowerを使用することをお勧めします。

起動:

$ celery -A proj events

次のような画面が表示されます。

thumb|none

セロリイベントは、スナップショットカメラの起動にも使用されます(スナップショットを参照:

$ celery -A proj events --camera=<camera-class> --frequency=1.0

また、イベントをstdoutにダンプするツールが含まれています。

$ celery -A proj events --dump

オプションの完全なリストについては、--helpを使用してください。

$ celery events --help

RabbitMQ

Celeryクラスターを管理するには、RabbitMQを監視する方法を知ることが重要です。

RabbitMQには rabbitmqctl(1)コマンドが付属しています。これにより、キュー、交換、バインディング、キューの長さ、各キューのメモリ使用量を一覧表示したり、ユーザー、仮想ホスト、およびそれらの権限を管理したりできます。

ノート

これらの例では、デフォルトの仮想ホスト("/")が使用されています。カスタム仮想ホストを使用する場合は、コマンドに-p引数を追加する必要があります(例:rabbitmqctl list_queues -p my_vhost …)。


キューの検査

キュー内のタスクの数を見つける:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

ここで、 messages_ready は、配信の準備ができている(送信されたが受信されていない)メッセージの数です。 messages_unacknowledged は、ワーカーによって受信されたがまだ確認されていない(つまり、進行中、または予約済み)。 messages は、準備完了メッセージと未確認メッセージの合計です。

キューから現在消費しているワーカーの数を見つける:

$ rabbitmqctl list_queues name consumers

キューに割り当てられたメモリの量を見つける:

$ rabbitmqctl list_queues name memory
ヒント
-qオプションを rabbitmqctl(1)に追加すると、出力の解析が容易になります。


Redis

ブローカーとしてRedisを使用している場合は、 redis-cli(1)コマンドを使用してCeleryクラスターを監視し、キューの長さを一覧表示できます。

キューの検査

キュー内のタスクの数を見つける:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

デフォルトのキューの名前は celery です。 使用可能なすべてのキューを取得するには、次を呼び出します。

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

ノート

キューキーは、タスクが含まれている場合にのみ存在するため、キーが存在しない場合は、単にそのキューにメッセージがないことを意味します。 これは、Redisでは、要素が含まれていないリストが自動的に削除されるため、 keys コマンド出力に表示されず、そのリストの llen が0を返すためです。

また、Redisを他の目的で使用している場合、 keys コマンドの出力には、データベースに保存されている無関係な値が含まれます。 これを回避するための推奨される方法は、Celery専用の DATABASE_NUMBER を使用することです。データベース番号を使用して、Celeryアプリケーションを相互に分離することもできます(仮想ホスト)。ただし、これは、例Floweras Redis pub / subコマンドは、データベースベースではなくグローバルです。


ムニン

これは、Celeryクラスターを保守するときに役立つ既知のMuninプラグインのリストです。


イベント

ワーカーには、何らかのイベントが発生するたびにメッセージを送信する機能があります。 これらのイベントは、Flowerやセロリイベントなどのツールによってキャプチャされ、クラスターを監視します。

スナップショット

バージョン2.1の新機能。


1人のワーカーでも大量のイベントを生成できるため、すべてのイベントの履歴をディスクに保存すると非常にコストがかかる場合があります。

一連のイベントは、その期間のクラスター状態を記述します。この状態のスナップショットを定期的に取得することで、すべての履歴を保持できますが、それでも定期的にディスクに書き込むだけです。

スナップショットを作成するには、Cameraクラスが必要です。これにより、状態がキャプチャされるたびに何が発生するかを定義できます。 データベースに書き込んだり、電子メールなどで完全に送信したりできます。

セロリイベントは、カメラでスナップショットを撮るために使用されます。たとえば、カメラmyapp.Cameraを使用して2秒ごとに状態をキャプチャする場合は、セロリイベントを実行します。次の引数:

$ celery -A proj events -c myapp.Camera --frequency=2.0

カスタムカメラ

カメラは、イベントをキャプチャし、それらのイベントを一定の間隔で処理する必要がある場合に役立ちます。 リアルタイムイベント処理の場合は、リアルタイム処理のように、@events.Receiverを直接使用する必要があります。

スナップショットを画面にダンプするカメラの例を次に示します。

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear after flush (incl, state.event_count).

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))

状態オブジェクトの詳細については、celery.events.stateのAPIリファレンスを参照してください。

これで、-cオプションで指定することにより、セロリイベントでこのカムを使用できます。

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

または、次のようにプログラムで使用できます。

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)

リアルタイム処理

イベントをリアルタイムで処理するには、次のものが必要です。

  • イベントコンシューマー(これはReceiverです)

  • イベントが発生したときに呼び出されるハンドラーのセット。

    イベントタイプごとに異なるハンドラーを使用することも、キャッチオールハンドラーを使用することもできます( '*')

  • 状態(オプション)

    @events.Stateは、クラスター内のタスクとワーカーの便利なメモリ内表現であり、イベントが発生すると更新されます。

    ワーカーがまだ生きているかどうかを確認する(ハートビートを確認する)、イベントが発生したときにイベントフィールドをマージする、タイムスタンプが同期していることを確認するなど、多くの一般的なソリューションをカプセル化します。

これらを組み合わせると、イベントをリアルタイムで簡単に処理できます。

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

ノート

capturewakeup引数は、すべてのワーカーに信号を送信して、ハートビートを送信するように強制します。 このようにして、モニターの起動時にワーカーをすぐに確認できます。


ハンドラーを指定することにより、特定のイベントをリッスンできます。

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

イベントリファレンス

このリストには、ワーカーによって送信されたイベントとその引数が含まれています。

タスクイベント

タスク送信

サイン
task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)

タスクメッセージが公開され、:setting: `task_send_sent_event` 設定が有効になっている場合に送信されます。


タスク受信

サイン
task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)

ワーカーがタスクを受け取ったときに送信されます。


タスク開始

サイン
task-started(uuid, hostname, timestamp, pid)

ワーカーがタスクを実行する直前に送信されます。


タスク-成功

サイン
task-succeeded(uuid, result, runtime, hostname, timestamp)

タスクが正常に実行された場合に送信されます。

実行時は、プールを使用してタスクを実行するのにかかった時間です。 (タスクから開始してワーカープールに送信され、プール結果ハンドラーのコールバックが呼び出されたときに終了します)。


タスク失敗

サイン
task-failed(uuid, exception, traceback, hostname, timestamp)

タスクの実行が失敗した場合に送信されます。


タスクが拒否されました

サイン
task-rejected(uuid, requeued)

タスクはワーカーによって拒否されました。おそらく、再キューイングされるか、デッドレターキューに移動される可能性があります。


タスクが取り消されました

サイン
task-revoked(uuid, terminated, signum, expired)

タスクが取り消された場合に送信されます(これは複数のワーカーによって送信される可能性があることに注意してください)。

  • *; タスクプロセスが終了した場合、terminatedはtrueに設定されます。
    signumフィールドは使用される信号に設定されます。
  • タスクの有効期限が切れた場合、expiredはtrueに設定されます。


タスク-再試行

サイン
task-retried(uuid, exception, traceback, hostname, timestamp)

タスクが失敗した場合に送信されますが、将来再試行されます。


労働者のイベント

労働者-オンライン

サイン
worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

ワーカーはブローカーに接続し、オンラインになっています。

  • hostname :ワーカーのノード名。
  • タイムスタンプ:イベントのタイムスタンプ。
  • freq :秒単位のハートビート周波数(フロート)。
  • sw_ident :ワーカーソフトウェアの名前(例:py-celery)。
  • sw_ver :ソフトウェアバージョン(例:2.2.0)。
  • sw_sys :オペレーティングシステム(例:Linux / Darwin)。


労働者の鼓動

サイン
worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)

毎分送信され、ワーカーが2分以内にハートビートを送信しなかった場合、オフラインであると見なされます。

  • hostname :ワーカーのノード名。
  • タイムスタンプ:イベントのタイムスタンプ。
  • freq :秒単位のハートビート周波数(フロート)。
  • sw_ident :ワーカーソフトウェアの名前(例:py-celery)。
  • sw_ver :ソフトウェアバージョン(例:2.2.0)。
  • sw_sys :オペレーティングシステム(例:Linux / Darwin)。
  • active :現在実行中のタスクの数。
  • 処理済み:このワーカーによって処理されたタスクの総数。


労働者-オフライン

サイン
worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

ワーカーがブローカーから切断されました。