Celeryの最初のステップ—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/getting-started/first-steps-with-celery
移動先:案内検索

セロリの最初のステップ

セロリはバッテリーが含まれているタスクキューです。 使い方は簡単なので、解決する問題の複雑さを完全に学ぶことなく始めることができます。 製品を拡張して他の言語と統合できるように、ベストプラクティスに基づいて設計されており、このようなシステムを本番環境で実行するために必要なツールとサポートが付属しています。

このチュートリアルでは、Celeryの使用の絶対的な基本を学びます。

について学ぶ;

  • メッセージトランスポート(ブローカー)の選択とインストール。
  • Celeryをインストールし、最初のタスクを作成します。
  • ワーカーの開始とタスクの呼び出し。
  • さまざまな状態を移行するタスクを追跡し、戻り値を検査します。

セロリは最初は気が遠くなるように思えるかもしれませんが、心配しないでください。このチュートリアルでは、すぐに始めることができます。 高度な機能と混同しないように、意図的にシンプルにしています。 このチュートリアルを終了したら、残りのドキュメントを参照することをお勧めします。 たとえば、次のステップチュートリアルでは、Celeryの機能を紹介します。

ブローカーの選択

Celeryには、メッセージを送受信するためのソリューションが必要です。 通常、これはメッセージブローカーと呼ばれる別個のサービスの形式で提供されます。

次のようないくつかの選択肢があります。

RabbitMQ

RabbitMQ は、機能が完全で、安定していて、耐久性があり、インストールが簡単です。 これは、実稼働環境に最適です。 CeleryでのRabbitMQの使用に関する詳細情報:

UbuntuまたはDebianを使用している場合は、次のコマンドを実行してRabbitMQをインストールします。

$ sudo apt-get install rabbitmq-server

または、Dockerで実行する場合は、次のコマンドを実行します。

$ docker run -d -p 5672:5672 rabbitmq

コマンドが完了すると、ブローカーはすでにバックグラウンドで実行されており、メッセージを移動する準備ができています:Starting rabbitmq-server: SUCCESS

UbuntuまたはDebianを実行していなくても心配しないでください。このウェブサイトにアクセスして、MicrosoftWindowsを含む他のプラットフォームの同様に簡単なインストール手順を見つけることができます。

Redis

Redis も長編コンプリートですが、突然の終了や電源障害が発生した場合にデータが失われる可能性が高くなります。 Redisの使用に関する詳細情報:

Redisの使用

Dockerで実行する場合は、次のコマンドを実行します。

$ docker run -d -p 6379:6379 redis

他のブローカー

上記に加えて、 Amazon SQS など、他の実験的なトランスポート実装から選択できます。

完全なリストについては、ブローカーの概要を参照してください。


セロリのインストール

CeleryはPythonPackage Index(PyPI)に含まれているため、pipeasy_installなどの標準のPythonツールを使用してインストールできます。

$ pip install celery

応用

最初に必要なのはCeleryインスタンスです。 これを Celeryアプリケーションまたは単にアプリと略して呼びます。 このインスタンスは、タスクの作成やワーカーの管理など、Celeryで実行するすべてのエントリポイントとして使用されるため、他のモジュールがインスタンスをインポートできる必要があります。

このチュートリアルでは、すべてを1つのモジュールに収めていますが、大規模なプロジェクトでは、専用モジュールを作成する必要があります。

ファイルtasks.pyを作成しましょう:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Celeryの最初の引数は、現在のモジュールの名前です。 これは、タスクが __ main __ モジュールで定義されたときに名前を自動的に生成できるようにするためにのみ必要です。

2番目の引数はbrokerキーワード引数で、使用するメッセージブローカーのURLを指定します。 ここでは、RabbitMQ(これもデフォルトオプション)を使用しています。

その他の選択肢については、上記のブローカーの選択を参照してください。RabbitMQの場合はamqp://localhostを使用でき、Redisの場合はredis://localhostを使用できます。

addという単一のタスクを定義し、2つの数値の合計を返しました。


Celeryワーカーサーバーの実行

これで、worker引数を指定してプログラムを実行することにより、ワーカーを実行できます。

$ celery -A tasks worker --loglevel=INFO

ノート

ワーカーが起動しない場合は、トラブルシューティングセクションを参照してください。


本番環境では、ワーカーをデーモンとしてバックグラウンドで実行する必要があります。 これを行うには、プラットフォームが提供するツール、または supervisord などを使用する必要があります(詳細については、デーモン化を参照してください)。

使用可能なコマンドラインオプションの完全なリストについては、次の手順を実行してください。

$  celery worker --help

他にもいくつかのコマンドが利用可能であり、ヘルプも利用できます。

$ celery --help

タスクの呼び出し

タスクを呼び出すには、delay()メソッドを使用できます。

これは、apply_async()メソッドへの便利なショートカットであり、タスクの実行をより細かく制御できます(タスクの呼び出しを参照)。

>>> from tasks import add
>>> add.delay(4, 4)

これで、以前に開始したワーカーによってタスクが処理されました。 これは、ワーカーのコンソール出力を確認することで確認できます。

タスクを呼び出すと、@AsyncResultインスタンスが返されます。 これを使用して、タスクの状態を確認したり、タスクが終了するのを待ったり、戻り値を取得したりできます(または、タスクが失敗した場合は、例外とトレースバックを取得します)。

結果はデフォルトでは有効になっていません。 リモートプロシージャコールを実行したり、データベースでタスクの結果を追跡したりするには、結果のバックエンドを使用するようにCeleryを構成する必要があります。 これについては、次のセクションで説明します。


結果を維持する

タスクの状態を追跡したい場合、Celeryは状態をどこかに保存または送信する必要があります。 SQLAlchemy / Django ORM、 MongoDBMemcached 、 Redis [から選択できる組み込みの結果バックエンドがいくつかあります。 X147X]、 RPCRabbitMQ / AMQP)、および–または独自に定義できます。

この例では、 rpc 結果バックエンドを使用します。これは、状態を一時的なメッセージとして送り返します。 バックエンドは、@Celeryへのbackend引数を介して(または、構成モジュールを使用することを選択した場合は、:setting: `result_backend` 設定を介して)指定されます。 したがって、 tasks.py ファイルのこの行を変更して、 rpc:// バックエンドを有効にすることができます。

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

または、結果のバックエンドとしてRedisを使用したいが、メッセージブローカーとしてRabbitMQを使用したい場合(一般的な組み合わせ):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

結果バックエンドの詳細については、結果バックエンドを参照してください。

結果のバックエンドが構成されたので、現在のPythonセッションを閉じ、tasksモジュールを再度インポートして、変更を有効にします。 今回は、タスクを呼び出すときに返される@AsyncResultインスタンスを保持します。

>>> from tasks import add    # close and reopen to get updated 'app'
>>> result = add.delay(4, 4)

ready()メソッドは、タスクが処理を終了したかどうかを返します。

>>> result.ready()
False

結果が完了するのを待つことができますが、非同期呼び出しが同期呼び出しに変わるため、これが使用されることはめったにありません。

>>> result.get(timeout=1)
8

タスクが例外を発生させた場合、get()は例外を再発生させますが、propagate引数を指定することでこれをオーバーライドできます。

>>> result.get(propagate=False)

タスクで例外が発生した場合は、元のトレースバックにアクセスすることもできます。

>>> result.traceback

警告

バックエンドはリソースを使用して結果を保存および送信します。 リソースが確実に解放されるようにするには、タスクの呼び出し後に返されるすべての@AsyncResultインスタンスで最終的にget()またはforget()を呼び出す必要があります。


完全な結果オブジェクトリファレンスについては、celery.resultを参照してください。


構成

セロリは、家電製品のように、動作するために多くの構成を必要としません。 入力と出力があります。 入力はブローカーに接続する必要があり、出力はオプションで結果バックエンドに接続できます。 ただし、背面をよく見ると、スライダー、ダイヤル、ボタンの負荷を示す蓋があります。これが構成です。

ほとんどのユースケースではデフォルト構成で十分ですが、Celeryを必要に応じて正確に機能させるように構成できるオプションは多数あります。 使用可能なオプションについて読むことは、構成できるものを理解するための良いアイデアです。 オプションについては、構成とデフォルトリファレンスを参照してください。

構成は、アプリで直接設定することも、専用の構成モジュールを使用して設定することもできます。 例として、:setting: `task_serializer` 設定を変更することにより、タスクペイロードのシリアル化に使用されるデフォルトのシリアライザーを構成できます。

app.conf.task_serializer = 'json'

一度に多くの設定を構成する場合は、updateを使用できます。

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

大規模なプロジェクトの場合は、専用の構成モジュールをお勧めします。 定期的なタスク間隔とタスクルーティングオプションをハードコーディングすることはお勧めしません。 これらを一元化された場所に保管することをお勧めします。 これは、ユーザーがタスクの動作を制御できるため、特にライブラリに当てはまります。 一元化された構成により、システムに問題が発生した場合にSysAdminが簡単な変更を加えることもできます。

@config_from_object()メソッドを呼び出すことにより、Celeryインスタンスに構成モジュールを使用するように指示できます。

app.config_from_object('celeryconfig')

このモジュールは「celeryconfig」と呼ばれることがよくありますが、任意のモジュール名を使用できます。

上記の場合、celeryconfig.pyという名前のモジュールが、現在のディレクトリまたはPythonパスからロードできるようになっている必要があります。 次のようになります。

celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

構成ファイルが正しく機能し、構文エラーが含まれていないことを確認するには、次のようにインポートしてみてください。

$ python -m celeryconfig

構成オプションの完全なリファレンスについては、構成とデフォルトを参照してください。

構成ファイルの能力を示すために、これは、動作に問題のあるタスクを専用キューにルーティングする方法です。

celeryconfig.py

task_routes = {
    'tasks.add': 'low-priority',
}

または、ルーティングする代わりに、タスクをレート制限して、このタイプの10個のタスクのみを1分(10 / m)で処理できるようにすることもできます。

celeryconfig.py

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

ブローカーとしてRabbitMQまたはRedisを使用している場合は、実行時にタスクに新しいレート制限を設定するようにワーカーに指示することもできます。

$ celery -A tasks control rate_limit tasks.add 10/m
[email protected]: OK
    new rate limit set successfully

タスクルーティングの詳細についてはルーティングタスクを、注釈の詳細については:setting: `task_annotations` 設定を、詳細については監視および管理ガイドを参照してください。リモートコントロールコマンドと、ワーカーの行動を監視する方法。


ここからどこへ行くか

詳細については、次のステップチュートリアルに進む必要があります。その後、ユーザーガイドを読むことができます。


トラブルシューティング

よくある質問にもトラブルシューティングのセクションがあります。

ワーカーが起動しない:許可エラー

  • Debian、Ubuntu、またはその他のDebianベースのディストリビューションを使用している場合:

    Debianは最近、/dev/shm特殊ファイルの名前を/run/shmに変更しました。

    簡単な回避策は、シンボリックリンクを作成することです。

    # ln -s /run/shm /dev/shm
  • その他:

    --pidfile--logfile、または--statedb引数のいずれかを指定する場合は、それらが、ユーザーが書き込み可能で読み取り可能なファイルまたはディレクトリを指していることを確認する必要があります。ワーカー。


結果のバックエンドが機能しないか、タスクが常にPENDING状態にある

デフォルトでは、すべてのタスクは:state: `PENDING` であるため、状態には「不明」という名前を付ける方が適切です。 Celeryは、タスクが送信されたときに状態を更新せず、履歴のないタスクは保留中であると見なされます(結局のところ、タスクIDはわかっています)。

  1. タスクでignore_resultが有効になっていないことを確認してください。

    このオプションを有効にすると、ワーカーは状態の更新をスキップします。

  2. :setting: `task_ignore_result` 設定が有効になっていないことを確認してください。

  3. 古いワーカーがまだ実行されていないことを確認してください。

    誤って複数のワーカーを開始するのは簡単なので、新しいワーカーを開始する前に、前のワーカーが適切にシャットダウンされていることを確認してください。

    期待される結果のバックエンドで構成されていない古いワーカーが実行されている可能性があり、タスクを乗っ取っています。

    --pidfile引数を絶対パスに設定して、これが発生しないようにすることができます。

  4. クライアントが正しいバックエンドで構成されていることを確認してください。

    何らかの理由で、クライアントがワーカーとは異なるバックエンドを使用するように構成されている場合、結果を受け取ることができません。 バックエンドが正しく構成されていることを確認します。

    >>> result = task.delay()
    >>> print(result.backend)