タスククックブック—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/tutorials/task-cookbook
移動先:案内検索

タスククックブック

タスクが一度に1つだけ実行されるようにする

これは、ロックを使用して実行できます。

この例では、キャッシュフレームワークを使用して、すべてのワーカーがアクセスできるロックを設定します。

これは、 djangofeeds と呼ばれる架空のRSSフィードインポーターの一部です。 このタスクは、フィードURLを単一の引数として受け取り、そのフィードを Feed と呼ばれるDjangoモデルにインポートします。 フィードURLのMD5チェックサムで構成されるキャッシュキーを設定することにより、2人以上のワーカーが同じフィードを同時にインポートできないようにします。

予期しないことが起こった場合に備えて、キャッシュキーはしばらくすると期限切れになり、何かが常に発生します…

このため、タスクの実行時間はタイムアウトを超えてはなりません。

ノート

これが正しく機能するためには、.add操作がアトミックであるキャッシュバックエンドを使用する必要があります。 memcachedはこの目的でうまく機能することが知られています。


import time
from celery import task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)