セロリのバックグラウンドタスク
アップロードされたデータの処理やメールの送信など、アプリケーションに長時間実行されるタスクがある場合は、リクエスト中にアプリケーションが完了するのを待ちたくありません。 代わりに、タスクキューを使用して、要求がすぐに返される間、バックグラウンドでタスクを実行する別のプロセスに必要なデータを送信します。
Celeryは、単純なバックグラウンドタスクだけでなく、複雑なマルチステージプログラムやスケジュールにも使用できる強力なタスクキューです。 このガイドでは、Flaskを使用してCeleryを構成する方法を説明しますが、Celeryドキュメントの First Steps with Celery ガイドをすでに読んでいることを前提としています。
インストール
Celeryは別のPythonパッケージです。 pipを使用してPyPIからインストールします。
$ pip install celery
構成、設定
最初に必要なのはCeleryインスタンスです。これは、celeryアプリケーションと呼ばれます。 これは、Celery専用で、FlaskのFlask
オブジェクトと同じ目的を果たします。 このインスタンスは、タスクの作成やワーカーの管理など、Celeryで実行するすべてのエントリポイントとして使用されるため、他のモジュールがインスタンスをインポートできる必要があります。
たとえば、これをtasks
モジュールに配置できます。 CeleryはFlaskで再構成せずに使用できますが、タスクをサブクラス化し、Flaskのアプリケーションコンテキストのサポートを追加し、Flask構成に接続することで、少し便利になります。
CeleryをFlaskと適切に統合するために必要なのはこれだけです。
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
この関数は、新しいCeleryオブジェクトを作成し、アプリケーション構成からブローカーを使用して構成し、Flask構成から残りのCelery構成を更新してから、アプリケーションコンテキストでタスクの実行をラップするタスクのサブクラスを作成します。
タスクの例
2つの数値を足し合わせて結果を返すタスクを書いてみましょう。 Redisを使用するようにCeleryのブローカーとバックエンドを構成し、上記の係数を使用してcelery
アプリケーションを作成し、それを使用してタスクを定義します。
from flask import Flask
flask_app = Flask(__name__)
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
@celery.task()
def add_together(a, b):
return a + b
このタスクは、バックグラウンドで呼び出すことができます。
result = add_together.delay(23, 42)
result.wait() # 65
ワーカーを実行する
上記のコードに飛び込んですでに実行した場合、.wait()
が実際には戻らないことを知ってがっかりするでしょう。 これは、タスクを受け取って実行するためにCeleryワーカーも実行する必要があるためです。
$ celery -A your_application.celery worker
your_application
文字列は、celery
オブジェクトを作成するアプリケーションのパッケージまたはモジュールを指している必要があります。
ワーカーが実行されているので、タスクが終了すると、wait
は結果を返します。