タスク—Pythonドキュメント
タスク
タスクは、Celeryアプリケーションの構成要素です。
タスクは、任意の呼び出し可能オブジェクトから作成できるクラスです。 タスクが呼び出されたとき(メッセージを送信したとき)と、ワーカーがそのメッセージを受信したときに発生することの両方を定義するという点で、2つの役割を果たします。
すべてのタスククラスには一意の名前があり、この名前はメッセージで参照されるため、ワーカーは実行する適切な関数を見つけることができます。
タスクメッセージは、そのメッセージがワーカーによって確認されるまでキューから削除されません。 ワーカーは事前に多くのメッセージを予約でき、停電やその他の理由でワーカーが殺された場合でも、メッセージは別のワーカーに再配信されます。
理想的には、タスク関数はべき等である必要があります。つまり、同じ引数で複数回呼び出されても、関数が意図しない効果を引き起こさないことを意味します。 ワーカーはタスクがべき等であるかどうかを検出できないため、デフォルトの動作では、メッセージが実行される直前にメッセージを事前に確認し、すでに開始されているタスク呼び出しが再度実行されることはありません。
タスクがべき等である場合は、 acks_late オプションを設定して、代わりにタスクが返すメッセージ after をワーカーに確認させることができます。 FAQエントリ retryまたはacks_lateを使用する必要がありますか?も参照してください。
acks_late が有効になっている場合でも、タスクを実行している子プロセスが(sys.exit()
を呼び出すタスクによって、またはシグナルによって)終了すると、ワーカーはメッセージを確認することに注意してください。 この動作は意図的なものです…
- カーネルに:sig: `SIGSEGV` (セグメンテーション違反)または同様のシグナルをプロセスに送信させるタスクを再実行したくありません。
- システム管理者が意図的にタスクを強制終了しても、タスクが自動的に再起動することを望まないと想定しています。
- あまりにも多くのメモリを割り当てるタスクは、カーネルOOMキラーをトリガーする危険があります。同じことが再び発生する可能性があります。
- 再配信時に常に失敗するタスクは、システムをダウンさせる高頻度のメッセージループを引き起こす可能性があります。
これらのシナリオでタスクを本当に再配信する必要がある場合は、:setting: `task_reject_on_worker_lost` 設定を有効にすることを検討する必要があります。
警告
無期限にブロックするタスクは、最終的にワーカーインスタンスが他の作業を実行できなくなる可能性があります。
タスクがI / Oを実行する場合は、:pypi: `requests` ライブラリを使用してWebリクエストにタイムアウトを追加するなど、これらの操作にタイムアウトを追加してください。
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
時間制限は、すべてのタスクがタイムリーに戻るようにするのに便利ですが、時間制限イベントは実際にはプロセスを強制的に強制終了するため、手動タイムアウトをまだ使用していない場合にのみ使用してください。
以前のバージョンでは、デフォルトのプリフォークプールスケジューラは長時間実行されるタスクに対応していなかったため、数分/時間実行されるタスクがある場合は、の-Ofair
コマンドライン引数を有効にすることをお勧めしました。 ]セロリワーカー。 ただし、バージョン4.0以降、-Ofairがデフォルトのスケジューリング戦略になりました。 詳細、および専用ワーカーへの長時間実行タスクと短期実行タスクの最適なパフォーマンスルート(自動ルーティング)については、プリフェッチ制限を参照してください。
ワーカーがハングした場合は、問題を送信する前に、実行中のタスクを調査してください。ハングは、ネットワーク操作でハングしている1つ以上のタスクが原因である可能性があります。
–
この章では、タスクの定義についてすべて学習します。これは、目次です。
基本
@task()
デコレータを使用すると、任意の呼び出し可能オブジェクトからタスクを簡単に作成できます。
from .models import User
@app.task
def create_user(username, password):
User.objects.create(username=username, password=password)
タスクに設定できるオプションも多数あります。これらはデコレータへの引数として指定できます。
@app.task(serializer='json')
def create_user(username, password):
User.objects.create(username=username, password=password)
タスクデコレータをインポートするにはどうすればよいですか? そして、「アプリ」とは何ですか?
タスクデコレータは@Celery
アプリケーションインスタンスで利用できます。これが何であるかわからない場合は、 Celeryの最初のステップをお読みください。
Djangoを使用している場合( Django の最初のステップを参照)、またはライブラリの作成者である場合は、おそらく@shared_task()
デコレータを使用することをお勧めします。
from celery import shared_task
@shared_task
def add(x, y):
return x + y
複数のデコレータ
複数のデコレータをタスクデコレータと組み合わせて使用する場合は、 task デコレータが最後に適用されていることを確認する必要があります(奇妙なことに、Pythonではこれはリストの最初にある必要があります)。
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
バインドされたタスク
バインドされているタスクは、Pythonのバインドされたメソッドと同様に、タスクの最初の引数が常にタスクインスタンス(self
)になることを意味します。
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
バインドされたタスクは、再試行(Task.retry()
を使用)、現在のタスク要求に関する情報へのアクセス、およびカスタムタスク基本クラスに追加する追加機能に必要です。
タスクの継承
タスクデコレータのbase
引数は、タスクの基本クラスを指定します。
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask)
def add(x, y):
raise KeyError()
名前
すべてのタスクには一意の名前を付ける必要があります。
明示的な名前が指定されていない場合、タスクデコレータは名前を生成します。この名前は、1)タスクが定義されているモジュール、および2)タスク関数の名前に基づいています。
明示的な名前の設定例:
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'
ベストプラクティスは、モジュール名を名前空間として使用することです。これにより、別のモジュールでその名前が定義されたタスクがすでに存在する場合に、名前が衝突しなくなります。
>>> @app.task(name='tasks.add')
>>> def add(x, y):
... return x + y
.name
属性を調べることで、タスクの名前を知ることができます。
>>> add.name
'tasks.add'
ここで指定した名前(tasks.add
)は、タスクがtasks.py
という名前のモジュールで定義された場合に自動的に生成される名前とまったく同じです。
tasks.py
:
@app.task
def add(x, y):
return x + y
>>> from tasks import add
>>> add.name
'tasks.add'
自動命名動作の変更
バージョン4.0の新機能。
デフォルトの自動命名が適切でない場合があります。 多くの異なるモジュール内に多くのタスクを含めることを検討してください。
project/
/__init__.py
/celery.py
/moduleA/
/__init__.py
/tasks.py
/moduleB/
/__init__.py
/tasks.py
デフォルトの自動命名を使用すると、各タスクには、 moduleA.tasks.taskA 、 moduleA.tasks.taskB 、 moduleB.tasks.test 、等々。 すべてのタスク名にタスクが含まれていないようにすることをお勧めします。 上で指摘したように、すべてのタスクに明示的に名前を付けることも、@gen_task_name()
をオーバーライドして自動命名動作を変更することもできます。 例を続けると、 celery.py には次のものが含まれている可能性があります。
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super().gen_task_name(name, module)
app = MyCelery('main')
したがって、各タスクには、 moduleA.taskA 、 moduleA.taskB 、 moduleB.test のような名前が付けられます。
警告
@gen_task_name()
が純粋関数であることを確認してください。つまり、同じ入力に対して、常に同じ出力を返す必要があります。
タスクリクエスト
Task.request
には、現在実行中のタスクに関連する情報と状態が含まれています。
リクエストは次の属性を定義します。
- id
- 実行中のタスクの一意のID。
- グループ
- このタスクがメンバーである場合、タスクのグループの一意のID。
- コード
- このタスクが属するコードの一意のID(タスクがヘッダーの一部である場合)。
- correlation_id
- 重複排除などに使用されるカスタムID。
- 引数
- 位置引数。
- kwargs
- キーワード引数。
- 元
- このタスクを送信したホストの名前。
- 再試行
- 現在のタスクが再試行された回数。 0 で始まる整数。
- is_eager
- タスクがワーカーではなくクライアントでローカルに実行される場合は、
True
に設定します。 - エタ
- タスクの元のETA(ある場合)。 これはUTC時間です(:setting: `enable_utc` 設定によって異なります)。
- 有効期限が切れます
- タスクの元の有効期限(ある場合)。 これはUTC時間です(:setting: `enable_utc` 設定によって異なります)。
- ホスト名
- タスクを実行するワーカーインスタンスのノード名。
- delivery_info
- 追加のメッセージ配信情報。 これは、このタスクを実行するために使用される交換キーとルーティングキーを含むマッピングです。 たとえば、
Task.retry()
によって使用され、タスクを同じ宛先キューに再送信します。 このdictでのキーの可用性は、使用されるメッセージブローカーによって異なります。 - に返信
- 返信を送り返すキューの名前(たとえば、RPC結果バックエンドで使用されます)。
- called_directly
- タスクがワーカーによって実行されなかった場合、このフラグはtrueに設定されます。
- 制限時間
- このタスクに対してアクティブな現在の
(soft, hard)
時間制限のタプル(存在する場合)。 - コールバック
- このタスクが正常に戻った場合に呼び出されるシグニチャのリスト。
- エラーバック
- このタスクが失敗した場合に呼び出されるシグニチャのリスト。
- UTC
- trueに設定すると、発信者はUTCを有効にします(:setting: `enable_utc` )。
バージョン3.1の新機能。
- ヘッダー
- このタスクメッセージとともに送信されるメッセージヘッダーのマッピング(
None
の場合があります)。 - に返信
- 返信先(キュー名)。
- correlation_id
- 通常はタスクIDと同じで、応答の目的を追跡するためにamqpでよく使用されます。
バージョン4.0の新機能。
- root_id
- このタスクが含まれるワークフローの最初のタスクの一意のID(存在する場合)。
- parent_id
- このタスクを呼び出したタスクの一意のID(存在する場合)。
- 鎖
- チェーンを形成するタスクの逆リスト(存在する場合)。 このリストの最後の項目は、現在のタスクを引き継ぐ次のタスクになります。 タスクプロトコルのバージョン1を使用している場合、チェーンタスクは代わりに
request.callbacks
になります。
バージョン5.2の新機能。
- プロパティ
- このタスクメッセージで受信したメッセージプロパティのマッピング(
None
または{}
の場合があります) - replace_task_nesting
- タスクが置き換えられた場合の回数。 (
0
の場合があります)
例
コンテキスト内の情報にアクセスするタスクの例は次のとおりです。
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
bind
引数は、関数が「バインドされたメソッド」になることを意味します。これにより、タスクタイプインスタンスの属性とメソッドにアクセスできます。
ロギング
ワーカーが自動的にログを設定します。または、手動でログを構成することもできます。
「celery.task」という名前の特別なロガーが利用可能です。このロガーから継承して、ログの一部としてタスク名と一意のIDを自動的に取得できます。
ベストプラクティスは、モジュールの上部にあるすべてのタスクに共通のロガーを作成することです。
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Celeryは標準のPythonロガーライブラリを使用しており、ドキュメントはhere
にあります。
標準のout / -errに書き込まれたものはすべてログシステムにリダイレクトされるため、print()
を使用することもできます(これを無効にできます。:setting: `worker_redirect_stdouts` を参照)。
ノート
タスクまたはタスクモジュールのどこかにロガーインスタンスを作成した場合、ワーカーはリダイレクトを更新しません。
sys.stdout
およびsys.stderr
をカスタムロガーにリダイレクトする場合は、次のように手動で有効にする必要があります。
import sys
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
old_outs = sys.stdout, sys.stderr
rlevel = self.app.conf.worker_redirect_stdouts_level
try:
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
print('Adding {0} + {1}'.format(x, y))
return x + y
finally:
sys.stdout, sys.stderr = old_outs
ノート
必要な特定のCeleryロガーがログを発行していない場合は、ロガーが適切に伝播していることを確認する必要があります。 この例では、「celery.app.trace」が有効になっているため、「succeedin」ログが出力されます。
import celery
import logging
@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
logger = logging.getLogger('celery')
logger.propagate = True
logger = logging.getLogger('celery.app.trace')
logger.propagate = True
ノート
Celeryロギング構成を完全に無効にする場合は、:signal: `setup_logging` シグナルを使用します。
import celery
@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
pass
引数チェック
バージョン4.0の新機能。
Celeryは、Pythonが通常の関数を呼び出すときに行うのと同じように、タスクを呼び出すときに渡された引数を検証します。
>>> @app.task
... def add(x, y):
... return x + y
# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
typing
属性をFalse
に設定することにより、任意のタスクの引数チェックを無効にできます。
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
引数に機密情報を隠す
バージョン4.0の新機能。
:setting: `task_protocol` 2以降(4.0以降のデフォルト)を使用する場合、argsrepr
およびkwargsrepr
呼び出し引数:
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> charge.s(account, card='1234 5678 1234 5678').set(
... kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()
警告
機密情報は、ブローカーからのタスクメッセージを読んだり、傍受したりできる人なら誰でもアクセスできます。
このため、機密情報が含まれている場合はメッセージを暗号化する必要があります。この例では、クレジットカード番号を使用して、タスク自体で取得および復号化する安全なストアに実際の番号を暗号化して保存できます。
再試行
Task.retry()
は、たとえば回復可能なエラーが発生した場合に、タスクを再実行するために使用できます。
retry
を呼び出すと、同じタスクIDを使用して新しいメッセージが送信され、メッセージが元のタスクと同じキューに配信されるように注意が払われます。
タスクが再試行されると、これもタスク状態として記録されるため、結果インスタンスを使用してタスクの進行状況を追跡できます(状態を参照)。
retry
を使用した例を次に示します。
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
ノート
Task.retry()
呼び出しは例外を発生させるため、再試行後のコードには到達しません。 これは@Retry
例外であり、エラーとしてではなく、タスクが再試行されることをワーカーに示すための半述語として処理されるため、結果が発生したときに正しい状態を格納できます。バックエンドが有効になっています。
これは通常の操作であり、再試行するthrow
引数がFalse
に設定されていない限り常に発生します。
タスクデコレータへのbind引数は、self
(タスクタイプインスタンス)へのアクセスを提供します。
exc
引数は、ログで使用される例外情報を渡すため、およびタスクの結果を格納するときに使用されます。 例外とトレースバックの両方がタスク状態で使用可能になります(結果バックエンドが有効になっている場合)。
タスクの値がmax_retries
の場合、再試行の最大数を超えると現在の例外が再発生しますが、次の場合は発生しません。
exc
引数が指定されていません。この場合、
@MaxRetriesExceededError
例外が発生します。現在の例外はありません
再発生する元の例外がない場合は、代わりに
exc
引数が使用されるため、次のようになります。self.retry(exc=Twitter.LoginError())
与えられた
exc
引数を上げます。
カスタム再試行遅延の使用
タスクを再試行する場合、タスクは一定時間待機してから再試行できます。デフォルトの遅延は、default_retry_delay
属性によって定義されます。 デフォルトでは、これは3分に設定されています。 遅延を設定する単位は秒単位(intまたはfloat)であることに注意してください。
countdown 引数をretry()
に指定して、このデフォルトをオーバーライドすることもできます。
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
既知の例外の自動再試行
バージョン4.0の新機能。
特定の例外が発生するたびに、タスクを再試行したい場合があります。
幸い、@task()
デコレータの autoretry_for 引数を使用して、タスクを自動的に再試行するようにCeleryに指示できます。
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
内部retry()
呼び出しにカスタム引数を指定する場合は、 retry_kwargs 引数を@task()
デコレータに渡します。
@app.task(autoretry_for=(FailWhaleError,),
retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
これは、例外を手動で処理する代わりに提供されており、上記の例は、タスク本体をtry
…except
ステートメントでラップするのと同じことを行います。
@app.task
def refresh_timeline(user):
try:
twitter.refresh_timeline(user)
except FailWhaleError as exc:
raise div.retry(exc=exc, max_retries=5)
エラーが発生したときに自動的に再試行する場合は、次を使用します。
@app.task(autoretry_for=(Exception,))
def x():
...
バージョン4.2の新機能。
APIへのリクエストなど、タスクが別のサービスに依存している場合は、指数バックオフを使用して、リクエストでサービスが圧倒されないようにすることをお勧めします。 幸い、Celeryの自動再試行サポートにより簡単になります。 次のように、 retry_backoff 引数を指定するだけです。
from requests.exceptions import RequestException
@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
...
デフォルトでは、この指数バックオフにより、ランダムなジッターも導入され、すべてのタスクが同時に実行されるのを回避します。 また、最大バックオフ遅延を10分に制限します。 これらの設定はすべて、以下に記載されているオプションを使用してカスタマイズできます。
バージョン4.4の新機能。
クラスベースのタスクで autoretry_for 、 max_retries 、 retry_backoff 、 retry_backoff_max 、 retry_jitter オプションを設定することもできます。
class BaseTaskWithRetry(Task):
autoretry_for = (TypeError,)
max_retries = 5
retry_backoff = True
retry_backoff_max = 700
retry_jitter = False
- Task.autoretry_for
- 例外クラスのリスト/タプル。 タスクの実行中にこれらの例外のいずれかが発生した場合、タスクは自動的に再試行されます。 デフォルトでは、例外は自動再試行されません。
- Task.max_retries
- 数。 諦めるまでの最大再試行回数。
None
の値は、タスクが永久に再試行されることを意味します。 デフォルトでは、このオプションは3
に設定されています。
- Task.retry_backoff
- ブール値、または数値。 このオプションが
True
に設定されている場合、自動再試行は指数バックオフのルールに従って遅延されます。 最初の再試行には1秒の遅延があり、2回目の再試行には2秒の遅延があり、3回目は4秒の遅延があり、4回目は8秒の遅延があります。 (ただし、この遅延値が有効になっている場合は、 retry_jitter によって変更されます。)このオプションが数値に設定されている場合、遅延係数として使用されます。 たとえば、このオプションが3
に設定されている場合、最初の再試行は3秒遅れ、2回目は6秒遅れ、3回目は12秒遅れ、4回目は24秒遅れます。 デフォルトでは、このオプションはFalse
に設定されており、自動再試行は遅延されません。
- Task.retry_backoff_max
- 数。
retry_backoff
が有効になっている場合、このオプションはタスクの自動再試行間の最大遅延を秒単位で設定します。 デフォルトでは、このオプションは600
に設定されています。これは10分です。
- Task.retry_jitter
- ブール値。 Jitter は、指数バックオフ遅延にランダム性を導入して、キュー内のすべてのタスクが同時に実行されないようにするために使用されます。 このオプションが
True
に設定されている場合、 retry_backoff によって計算された遅延値は最大値として扱われ、実際の遅延値はゼロからその最大値までの乱数になります。 デフォルトでは、このオプションはTrue
に設定されています。
オプションのリスト
タスクデコレータは、タスクの動作を変更するいくつかのオプションを選択できます。たとえば、rate_limit
オプションを使用してタスクのレート制限を設定できます。
タスクデコレータに渡されるキーワード引数は、実際には結果のタスククラスの属性として設定されます。これは、組み込み属性のリストです。
全般的
- Task.name
タスクが登録されている名前。
この名前は手動で設定できます。または、モジュール名とクラス名を使用して名前が自動的に生成されます。
名前も参照してください。
- Task.request
タスクが実行されている場合、これには現在のリクエストに関する情報が含まれます。 スレッドローカルストレージが使用されます。
タスクリクエストを参照してください。
- Task.max_retries
タスクが
self.retry
を呼び出す場合、またはタスクが autoretry_for 引数で装飾されている場合にのみ適用されます。諦める前に試行された再試行の最大数。 再試行回数がこの値を超えると、
@MaxRetriesExceededError
例外が発生します。ノート
retry()
は例外時に自動的に再試行されないため、手動で呼び出す必要があります。デフォルトは
3
です。None
の値は再試行制限を無効にし、タスクは成功するまで永久に再試行します。
- Task.throws
実際のエラーと見なされるべきではない、予想されるエラークラスのオプションのタプル。
このリストのエラーは結果バックエンドへの失敗として報告されますが、ワーカーはイベントをエラーとしてログに記録せず、トレースバックは含まれません。
例:
@task(throws=(KeyError, HttpNotFound)): def get_foo(): something()
エラーの種類:
予期されるエラー(
Task.throws
内)重大度
INFO
でログに記録され、トレースバックは除外されました。予期しないエラー
重大度
ERROR
でログに記録され、トレースバックが含まれています。
- Task.default_retry_delay
- タスクの再試行が実行されるまでのデフォルトの秒数。
int
またはfloat
のいずれかになります。 デフォルトは3分の遅延です。
- Task.rate_limit
このタスクタイプのレート制限を設定します(特定の時間枠で実行できるタスクの数を制限します)。 レート制限が有効な場合でもタスクは完了しますが、開始が許可されるまでに時間がかかる場合があります。
これが
None
の場合、レート制限は有効ではありません。 整数または浮動小数点数の場合、「1秒あたりのタスク数」として解釈されます。レート制限は、値に“ / s” 、“ / m” 、または“ / h” を追加することにより、秒、分、または時間で指定できます。 タスクは、指定された時間枠に均等に分散されます。
例:「100 / m」(1分間に100タスク)。 これにより、同じワーカーインスタンスで2つのタスクを開始する間に600msの最小遅延が適用されます。
デフォルトは:setting: `task_default_rate_limit` 設定です。指定されていない場合、タスクのレート制限がデフォルトで無効になっていることを意味します。
これはワーカーインスタンスごとのレート制限であり、グローバルレート制限ではないことに注意してください。 グローバルレート制限を適用するには(たとえば、1秒あたりのリクエスト数が最大のAPIの場合)、特定のキューに制限する必要があります。
- Task.time_limit
- このタスクのハードタイム制限(秒単位)。 設定されていない場合、ワーカーのデフォルトが使用されます。
- Task.soft_time_limit
- このタスクのソフト制限時間。 設定されていない場合、ワーカーのデフォルトが使用されます。
- Task.ignore_result
- タスクの状態を保存しないでください。 これは、
AsyncResult
を使用して、タスクの準備ができているかどうかを確認したり、その戻り値を取得したりできないことを意味します。
- Task.store_errors_even_if_ignored
True
の場合、タスクが結果を無視するように構成されていても、エラーが保存されます。
- Task.serializer
使用するデフォルトのシリアル化方法を識別する文字列。 デフォルトは:setting: `task_serializer` 設定です。 pickle 、 json 、 yaml 、または
kombu.serialization.registry
に登録されている任意のカスタムシリアル化方法を使用できます。詳細については、シリアライザーを参照してください。
- Task.compression
使用するデフォルトの圧縮スキームを識別する文字列。
デフォルトは:setting: `task_compression` 設定です。 gzip 、 bzip2 、または
kombu.compression
レジストリに登録されている任意のカスタム圧縮スキームにすることができます。詳細については、圧縮を参照してください。
- Task.backend
- このタスクに使用する結果ストアバックエンド。 celery.backends のバックエンドクラスの1つのインスタンス。 デフォルトは app.backend で、:setting: `result_backend` 設定で定義されています。
- Task.acks_late
このタスクのメッセージが
True
に設定されている場合、[X116X]の直前ではなく、の実行後に確認応答されます(デフォルトの動作)。注:これは、実行中にワーカーがクラッシュした場合に、タスクが複数回実行される可能性があることを意味します。 タスクがべき等であることを確認してください。
グローバルデフォルトは、:setting: `task_acks_late` 設定で上書きできます。
- Task.track_started
True
の場合、タスクがワーカーによって実行されると、タスクはそのステータスを「開始済み」として報告します。 通常の動作ではそのレベルの粒度を報告しないため、デフォルト値はFalse
です。 タスクは保留中、終了中、または再試行待ちのいずれかです。 「開始済み」ステータスを持つことは、長時間実行されているタスクがあり、現在実行中のタスクを報告する必要がある場合に役立ちます。タスクを実行しているワーカーのホスト名とプロセスIDは、状態メタデータで利用できます(例: result.info ['pid'] )
グローバルデフォルトは、:setting: `task_track_started` 設定で上書きできます。
も参照してください
@Task
のAPIリファレンス。
州
Celeryは、タスクの現在の状態を追跡できます。 状態には、成功したタスクの結果、または失敗したタスクの例外とトレースバック情報も含まれます。
選択できる結果バックエンドはいくつかあり、それらはすべて異なる長所と短所があります(結果バックエンドを参照)。
タスクはその存続期間中にいくつかの可能な状態を経て遷移し、各状態には任意のメタデータが添付される場合があります。 タスクが新しい状態に移行すると、前の状態は忘れられますが、いくつかの遷移を推測できます(たとえば、現在:state: `FAILED` 状態にあるタスクは、 :state: `STARTED` 状態のある時点)。
:state: `FAILURE_STATES` のセット、:state:` READY_STATES` のセットなど、状態のセットもあります。
クライアントは、これらのセットのメンバーシップを使用して、例外を再発生させる必要があるか(:state: `PROPAGATE_STATES` )、または状態をキャッシュできるか(タスクの準備ができている場合は可能)を決定します。
カスタム状態を定義することもできます。
結果のバックエンド
タスクを追跡したい場合、または戻り値が必要な場合、Celeryは状態をどこかに保存または送信して、後で取得できるようにする必要があります。 SQLAlchemy / Django ORM、Memcached、RabbitMQ / QPid(rpc
)、Redisなど、いくつかの組み込みの結果バックエンドから選択できます。または、独自に定義することもできます。
すべてのユースケースでうまく機能するバックエンドはありません。 各バックエンドの長所と短所について読み、ニーズに最も適したものを選択する必要があります。
警告
バックエンドはリソースを使用して結果を保存および送信します。 リソースが確実に解放されるようにするには、タスクの呼び出し後に返されるすべての@AsyncResult
インスタンスで最終的にget()
またはforget()
を呼び出す必要があります。
RPC結果バックエンド(RabbitMQ / QPid)
RPC結果バックエンド( rpc:// )は、実際には状態を保存せず、メッセージとして送信するため、特別です。 これは重要な違いです。結果は一度だけ取得でき、はタスクを開始したクライアントだけが取得できることを意味します。 2つの異なるプロセスが同じ結果を待つことはできません。
その制限があっても、状態の変化をリアルタイムで受信する必要がある場合は、優れた選択肢です。 メッセージングを使用すると、クライアントは新しい状態をポーリングする必要がなくなります。
メッセージはデフォルトで一時的(非永続的)であるため、ブローカーを再起動すると結果は表示されなくなります。 :setting: `result_persistent` 設定を使用して、永続メッセージを送信するように結果バックエンドを構成できます。
データベース結果バックエンド
データベースに状態を保持することは、多くの人にとって、特にデータベースがすでに配置されているWebアプリケーションにとっては便利ですが、制限もあります。
データベースの新しい状態のポーリングにはコストがかかるため、 result.get()などの操作のポーリング間隔を増やす必要があります。
一部のデータベースは、変更のためにテーブルをポーリングするのに適していないデフォルトのトランザクション分離レベルを使用します。
MySQLでは、デフォルトのトランザクション分離レベルは REPEATABLE-READ です。つまり、現在のトランザクションがコミットされるまで、トランザクションは他のトランザクションによって行われた変更を認識しません。
これを READ-COMMITTED 分離レベルに変更することをお勧めします。
ビルトインステート
保留中
タスクは実行を待機しているか、不明です。 不明なタスクIDは、保留状態にあることを意味します。
開始しました
タスクが開始されました。 デフォルトでは報告されません。有効にするには、@Task.track_started
を参照してください。
- メタデータ
- タスクを実行しているワーカープロセスの pid および hostname 。
成功
タスクは正常に実行されました。
- メタデータ
- result には、タスクの戻り値が含まれています。
- 伝播する
- はい
- 準備
- はい
失敗
タスクの実行に失敗しました。
- メタデータ
- result には発生した例外が含まれ、 traceback には例外が発生した時点でのスタックのバックトレースが含まれます。
- 伝播する
- はい
リトライ
タスクは再試行されています。
- メタデータ
- result には、再試行の原因となった例外が含まれ、 traceback には、例外が発生した時点でのスタックのバックトレースが含まれます。
- 伝播する
- 番号
取り消されました
タスクが取り消されました。
- 伝播する
- はい
カスタム状態
独自の状態を簡単に定義できます。必要なのは一意の名前だけです。 状態の名前は通常、大文字の文字列です。 例として、カスタム:state: `ABORTED` 状態を定義するabortable tasks
を見ることができます。
update_state()
を使用して、タスクの状態を更新します。
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
ここで、状態“ PROGRESS” を作成し、この状態を認識しているすべてのアプリケーションに、タスクが現在進行中であり、 current と[X197X ] total は、状態メタデータの一部としてカウントされます。 これを使用して、たとえばプログレスバーを作成できます。
ピクルス可能な例外の作成
まれにしか知られていないPythonの事実は、pickleモジュールによるシリアル化をサポートするには、例外がいくつかの単純なルールに準拠している必要があるということです。
Pickleがシリアライザーとして使用されている場合、pickle化できない例外を発生させるタスクは正しく機能しません。
例外が選択可能であることを確認するために、例外 Must は、インスタンス化された元の引数を.args
属性で提供します。 これを確認する最も簡単な方法は、例外呼び出しException.__init__
を使用することです。
うまくいく例とうまくいかない例を見てみましょう。
# OK:
class HttpError(Exception):
pass
# BAD:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
# OK:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
Exception.__init__(self, status_code) # <-- REQUIRED
したがって、ルールは次のとおりです。カスタム引数*args
をサポートする例外については、Exception.__init__(self, *args)
を使用する必要があります。
キーワード引数は特別にサポートされていないため、例外が選択されていないときにキーワード引数を保持する場合は、通常の引数として渡す必要があります。
class HttpError(Exception):
def __init__(self, status_code, headers=None, body=None):
self.status_code = status_code
self.headers = headers
self.body = body
super(HttpError, self).__init__(status_code, headers, body)
半述語
ワーカーは、タスクの最終状態を記録するトレース関数でタスクをラップします。 この関数に信号を送って、タスクの戻りの処理方法を変更するために使用できる例外がいくつかあります。
無視
タスクは@Ignore
を発生させて、ワーカーにタスクを無視させる場合があります。 これは、タスクの状態が記録されないことを意味しますが、メッセージは引き続き確認されます(キューから削除されます)。
これは、カスタムの取り消しのような機能を実装する場合、またはタスクの結果を手動で保存する場合に使用できます。
取り消されたタスクをRedisセットに保持する例:
from celery.exceptions import Ignore
@app.task(bind=True)
def some_task(self):
if redis.ismember('tasks.revoked', self.request.id):
raise Ignore()
結果を手動で保存する例:
from celery import states
from celery.exceptions import Ignore
@app.task(bind=True)
def get_tweets(self, user):
timeline = twitter.get_timeline(user)
if not self.request.called_directly:
self.update_state(state=states.SUCCESS, meta=timeline)
raise Ignore()
拒絶
タスクは@Reject
を発生させ、AMQP basic_reject
メソッドを使用してタスクメッセージを拒否する場合があります。 Task.acks_late が有効になっていない限り、これは効果がありません。
メッセージを拒否することは、メッセージを確認することと同じ効果がありますが、一部のブローカーは、使用できる追加機能を実装する場合があります。 たとえば、RabbitMQは Dead Letter Exchanges の概念をサポートしており、拒否されたメッセージが再配信されるデッドレター交換を使用するようにキューを構成できます。
リジェクトを使用してメッセージを再キューイングすることもできますが、これを使用する場合は、メッセージループが無限に発生する可能性があるため、十分に注意してください。
タスクがメモリ不足状態を引き起こしたときにrejectを使用する例:
import errno
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)
# if the file is too big to fit in memory
# we reject it so that it's redelivered to the dead letter exchange
# and we can manually inspect the situation.
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)
# For any other error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
メッセージを再キューイングする例:
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def requeues(self):
if not self.request.delivery_info['redelivered']:
raise Reject('no reason', requeue=True)
print('received two times')
basic_reject
メソッドの詳細については、ブローカーのドキュメントを参照してください。
リトライ
@Retry
例外は、Task.retry
メソッドによって発生し、タスクが再試行されていることをワーカーに通知します。
カスタムタスククラス
すべてのタスクは@Task
クラスから継承します。 run()
メソッドがタスク本体になります。
例として、次のコードは、
@app.task
def add(x, y):
return x + y
舞台裏でこれを大まかに行います:
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
インスタンス化
タスクは、すべての要求に対してインスタンス化されませんですが、グローバルインスタンスとしてタスクレジストリに登録されます。
これは、__init__
コンストラクターがプロセスごとに1回だけ呼び出され、タスククラスが意味的にアクターに近いことを意味します。
タスクがある場合は、
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
そして、すべてのリクエストを同じプロセスにルーティングすると、リクエスト間の状態が維持されます。
これは、リソースをキャッシュする場合にも役立ちます。たとえば、データベース接続をキャッシュする基本タスククラスは次のとおりです。
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
タスクごとの使用法
上記は、次のように各タスクに追加できます。
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
process_rows
タスクのdb
属性は、各プロセスで常に同じままになります。
アプリ全体の使用法
アプリをインスタンス化するときにtask_cls
引数として渡すことで、Celeryアプリ全体でカスタムクラスを使用することもできます。 この引数は、タスククラスへのPythonパスを指定する文字列またはクラス自体のいずれかである必要があります。
from celery import Celery
app = Celery('tasks', task_cls='your.module.path:DatabaseTask')
これにより、アプリ内でデコレータ構文を使用して宣言されたすべてのタスクがDatabaseTask
クラスを使用するようになり、すべてdb
属性が割り当てられます。
デフォルト値は、Celeryが提供するクラス'celery.app.task:Task'
です。
ハンドラー
- before_start(self, task_id, args, kwargs)
タスクの実行を開始する前にワーカーによって実行されます。
バージョン5.2の新機能。
- パラメーター
task_id –実行するタスクの一意のID。
args –実行するタスクの元の引数。
kwargs –実行するタスクの元のキーワード引数。
このハンドラーの戻り値は無視されます。
- after_return(self, status, retval, task_id, args, kwargs, einfo)
タスクが戻った後に呼び出されるハンドラー。
- パラメーター
status –現在のタスクの状態。
retval –タスクの戻り値/例外。
task_id –タスクの一意のID。
args –返されたタスクの元の引数。
kwargs –返されたタスクの元のキーワード引数。
einfo –
ExceptionInfo
インスタンス。トレースバック(存在する場合)が含まれます。
このハンドラーの戻り値は無視されます。
- on_failure(self, exc, task_id, args, kwargs, einfo)
これは、タスクが失敗したときにワーカーによって実行されます。
- パラメーター
exc –タスクによって発生した例外。
task_id –失敗したタスクの一意のID。
args –失敗したタスクの元の引数。
kwargs –失敗したタスクの元のキーワード引数。
einfo –
ExceptionInfo
インスタンス、トレースバックを含みます。
このハンドラーの戻り値は無視されます。
- on_retry(self, exc, task_id, args, kwargs, einfo)
これは、タスクが再試行されるときにワーカーによって実行されます。
- パラメーター
exc –
retry()
に送信された例外。task_id –再試行されたタスクの一意のID。
args –再試行されたタスクの元の引数。
kwargs –再試行されたタスクの元のキーワード引数。
einfo –
ExceptionInfo
インスタンス、トレースバックを含みます。
このハンドラーの戻り値は無視されます。
- on_success(self, retval, task_id, args, kwargs)
タスクが正常に実行された場合、ワーカーによって実行されます。
- パラメーター
retval –タスクの戻り値。
task_id –実行されたタスクの一意のID。
args –実行されたタスクの元の引数。
kwargs –実行されたタスクの元のキーワード引数。
このハンドラーの戻り値は無視されます。
リクエストとカスタムリクエスト
タスクを実行するためのメッセージを受信すると、ワーカーはそのような要求を表すrequest
を作成します。
カスタムタスククラスは、属性celery.app.task.Task.Request
を変更することにより、使用する要求クラスをオーバーライドできます。 カスタムリクエストクラス自体、またはその完全修飾名のいずれかを割り当てることができます。
リクエストにはいくつかの責任があります。 カスタムリクエストクラスはそれらすべてをカバーする必要があります-それらは実際にタスクを実行して追跡する責任があります。 celery.worker.request.Request
から継承することを強くお勧めします。
プリフォークワーカーを使用する場合、メソッドon_timeout()
およびon_failure()
はメインワーカープロセスで実行されます。 アプリケーションは、このような機能を利用して、celery.app.task.Task.on_failure()
を使用して検出されない障害を検出できます。
例として、次のカスタムリクエストは、ハードタイム制限やその他の障害を検出してログに記録します。
import logging
from celery import Task
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'A minimal custom request to log failures and hard time limits.'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'Failure detected for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# use your imagination
使い方
ここに技術的な詳細があります。 この部分はあなたが知る必要があるものではありませんが、あなたは興味があるかもしれません。
定義されたすべてのタスクがレジストリに一覧表示されます。 レジストリには、タスク名とそのタスククラスのリストが含まれています。 このレジストリは自分で調べることができます。
>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
これは、Celeryに組み込まれているタスクのリストです。 タスクは、タスクが定義されているモジュールがインポートされたときにのみ登録されることに注意してください。
デフォルトのローダーは、:setting: `imports` 設定にリストされているすべてのモジュールをインポートします。
@task()
デコレータは、アプリケーションのタスクレジストリにタスクを登録する役割を果たします。
タスクが送信されると、実際の機能コードは送信されず、実行するタスクの名前だけが送信されます。 その後、ワーカーはメッセージを受信すると、タスクレジストリで名前を検索して、実行コードを見つけることができます。
つまり、ワーカーは常にクライアントと同じソフトウェアで更新する必要があります。 これは欠点ですが、代替手段はまだ解決されていない技術的な課題です。
ヒントとベストプラクティス
不要な結果は無視してください
タスクの結果を気にしない場合は、必ずignore_result
オプションを設定してください。結果を保存すると、時間とリソースが無駄になります。
@app.task(ignore_result=True)
def mytask():
something()
:setting: `task_ignore_result` 設定を使用して、結果をグローバルに無効にすることもできます。
apply_async
またはdelay
を呼び出すときに、ignore_result
ブール値パラメーターを渡すことにより、実行ごとに結果を有効/無効にできます。
@app.task
def mytask(x, y):
return x + y
# No result will be stored
result = mytask.apply_async(1, 2, ignore_result=True)
print result.get() # -> None
# Result will be stored
result = mytask.apply_async(1, 2, ignore_result=False)
print result.get() # -> 3
デフォルトでは、結果バックエンドが構成されている場合、タスクは結果(ignore_result=False
)を無視しません。
オプションの優先順位は次のとおりです。
- グローバル:setting: `task_ignore_result`
ignore_result
オプション- タスク実行オプション
ignore_result
同期サブタスクの起動を回避する
タスクに別のタスクの結果を待機させることは実際には非効率的であり、ワーカープールが使い果たされた場合にデッドロックを引き起こす可能性さえあります。
代わりに、たとえばコールバックを使用して、デザインを非同期にします。
悪い:
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
良い:
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
ここでは、代わりに、さまざまなsignature()
をリンクして一連のタスクを作成しました。 Canvas:Designing Work-flows で、チェーンやその他の強力な構成要素について読むことができます。
デフォルトでは、Celeryではタスク内でサブタスクを同期的に実行できませんが、まれに、または極端な場合に実行する必要があります。 警告:サブタスクを同期的に実行できるようにすることはお勧めしません!
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get(disable_sync_subtasks=False)
info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
パフォーマンスと戦略
粒度
タスクの粒度は、各サブタスクに必要な計算量です。 一般に、長時間実行されるタスクをいくつか持つよりも、問題を多くの小さなタスクに分割する方が適切です。
小さいタスクを使用すると、より多くのタスクを並行して処理でき、タスクは、ワーカーが他の待機中のタスクを処理するのをブロックするのに十分な時間実行されません。
ただし、タスクの実行にはオーバーヘッドがあります。 メッセージを送信する必要がある、データがローカルでない可能性があるなど。 したがって、タスクがきめ細かくなりすぎると、追加されたオーバーヘッドによってメリットが失われる可能性があります。
- AOC1
- Breshears、粘土。 セクション2.2.1「並行性の芸術」。 O'Reilly Media、Inc。 2009年5月15日。 ISBN-13978-0-596-52153-0。
データの局所性
タスクを処理するワーカーは、データにできるだけ近づける必要があります。 最良の方法はメモリにコピーを置くことであり、最悪の場合は別の大陸からの完全な転送です。
データが遠くにある場合は、その場所で別のワーカーを実行しようとするか、それが不可能な場合は、頻繁に使用されるデータをキャッシュするか、使用されることがわかっているデータをプリロードします。
ワーカー間でデータを共有する最も簡単な方法は、 memcached などの分散キャッシュシステムを使用することです。
州
Celeryは分散システムであるため、どのプロセス、またはどのマシンでタスクが実行されるかを知ることはできません。 タスクがタイムリーに実行されるかどうかさえわかりません。
古代のことわざは、「世界を主張することは仕事の責任である」と私たちに告げています。 これが意味するのは、タスクが要求されてから世界観が変更された可能性があるため、タスクは世界が本来あるべき姿であることを確認する責任があります。 検索エンジンのインデックスを再作成するタスクがあり、検索エンジンのインデックスを最大で5分ごとに再インデックスする必要がある場合は、呼び出し元ではなく、それを表明するのはタスクの責任である必要があります。
もう1つの落とし穴は、Djangoモデルオブジェクトです。 それらはタスクへの引数として渡されるべきではありません。 古いデータを使用すると競合状態が発生する可能性があるため、ほとんどの場合、タスクの実行中にデータベースからオブジェクトを再フェッチすることをお勧めします。
いくつかの略語を自動的に展開する記事とタスクがある次のシナリオを想像してみてください。
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('MyCorp', 'My Corporation')
article.save()
最初に、作成者が記事を作成して保存し、次に作成者が略語タスクを開始するボタンをクリックします。
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
現在、キューは非常にビジーであるため、タスクはさらに2分間実行されません。 その間に別の作成者が記事に変更を加えたため、タスクが最終的に実行されると、タスクの引数に古い本文が含まれていたため、記事の本文は古いバージョンに戻されます。
競合状態の修正は簡単です。代わりに記事IDを使用し、タスク本体で記事を再フェッチしてください。
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
>>> expand_abbreviations.delay(article_id)
大きなメッセージの送信にはコストがかかる可能性があるため、このアプローチにはパフォーマンス上の利点さえある可能性があります。
データベーストランザクション
別の例を見てみましょう:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
return HttpResponseRedirect('/articles/')
これは、データベースに記事オブジェクトを作成し、主キーをタスクに渡すDjangoビューです。 transaction.atomic デコレータを使用します。これは、ビューが戻ったときにトランザクションをコミットするか、ビューで例外が発生した場合にロールバックします。
トランザクションがコミットされる前にタスクの実行が開始されると、競合状態が発生します。 データベースオブジェクトはまだ存在していません!
解決策は、すべてのトランザクションが正常にコミットされたら、on_commit
コールバックを使用してCeleryタスクを起動することです。
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))
例
実際の例を見てみましょう。投稿されたコメントをスパム用にフィルタリングする必要があるブログです。 コメントが作成されると、スパムフィルターがバックグラウンドで実行されるため、ユーザーはコメントが終了するのを待つ必要がありません。
ブログ投稿へのコメントを許可するDjangoブログアプリケーションがあります。 このアプリケーションのモデル/ビューとタスクの一部について説明します。
blog/models.py
コメントモデルは次のようになります。
from django.db import models
from django.utils.translation import ugettext_lazy as _
class Comment(models.Model):
name = models.CharField(_('name'), max_length=64)
email_address = models.EmailField(_('email address'))
homepage = models.URLField(_('home page'),
blank=True, verify_exists=False)
comment = models.TextField(_('comment'))
pub_date = models.DateTimeField(_('Published date'),
editable=False, auto_add_now=True)
is_spam = models.BooleanField(_('spam?'),
default=False, editable=False)
class Meta:
verbose_name = _('comment')
verbose_name_plural = _('comments')
コメントが投稿されたビューで、最初にコメントをデータベースに書き込み、次にバックグラウンドでスパムフィルタータスクを起動します。
blog/views.py
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
if request.method == 'post':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# Check spam asynchronously.
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
コメント内のスパムをフィルタリングするには、 Akismet を使用します。これは、無料のブログプラットフォーム Wordpress に投稿されたコメント内のスパムをフィルタリングするために使用されるサービスです。 Akismet は個人使用は無料ですが、商用使用は有料です。 APIキーを取得するには、サービスにサインアップする必要があります。
Akismet へのAPI呼び出しを行うには、 Michael Foord によって作成された akismet.py ライブラリを使用します。
blog/tasks.py
from celery import Celery
from akismet import Akismet
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site
from blog.models import Comment
app = Celery(broker='amqp://')
@app.task
def spam_filter(comment_id, remote_addr=None):
logger = spam_filter.get_logger()
logger.info('Running spam filter for comment %s', comment_id)
comment = Comment.objects.get(pk=comment_id)
current_domain = Site.objects.get_current().domain
akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
if not akismet.verify_key():
raise ImproperlyConfigured('Invalid AKISMET_KEY')
is_spam = akismet.comment_check(user_ip=remote_addr,
comment_content=comment.comment,
comment_author=comment.name,
comment_author_email=comment.email_address)
if is_spam:
comment.is_spam = True
comment.save()
return is_spam