構成とデフォルト—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/configuration
移動先:案内検索

構成とデフォルト

このドキュメントでは、使用可能な構成オプションについて説明します。

デフォルトのローダーを使用している場合は、celeryconfig.pyモジュールを作成し、Pythonパスで使用できることを確認する必要があります。

設定ファイルの例

これは、開始するための構成ファイルの例です。 基本的なCeleryセットアップを実行するために必要なものがすべて含まれている必要があります。

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新しい小文字の設定

バージョン4.0では、新しい小文字の設定と設定の構成が導入されました。

以前のバージョンとの主な違いは、小文字の名前を除いて、celery_beat_からbeat_celeryd_からworker_などの一部のプレフィックスの名前が変更されていることです。また、最上位のcelery_設定のほとんどは、新しいtask_プレフィックスに移動されました。

警告

Celeryは、Celery 6.0まで、古い構成ファイルを引き続き読み取ることができます。 その後、古い構成ファイルのサポートは削除されます。 多くのケース( Django を含む)を処理する必要があるcelery upgradeコマンドを提供します。

できるだけ早く新しい構成スキームに移行してください。


設定名 と置換する
CELERY_ACCEPT_CONTENT :setting: `accept_content`
CELERY_ENABLE_UTC :setting: `enable_utc`
CELERY_IMPORTS :setting: `imports`
CELERY_INCLUDE :setting: `include`
CELERY_TIMEZONE :setting: `timezone`
CELERYBEAT_MAX_LOOP_INTERVAL :setting: `beat_max_loop_interval`
CELERYBEAT_SCHEDULE :setting: `beat_schedule`
CELERYBEAT_SCHEDULER :setting: `beat_scheduler`
CELERYBEAT_SCHEDULE_FILENAME :setting: `beat_schedule_filename`
CELERYBEAT_SYNC_EVERY :setting: `beat_sync_every`
BROKER_URL :setting: `broker_url`
BROKER_TRANSPORT :setting: `broker_transport`
BROKER_TRANSPORT_OPTIONS :setting: `broker_transport_options`
BROKER_CONNECTION_TIMEOUT :setting: `broker_connection_timeout`
BROKER_CONNECTION_RETRY :setting: `broker_connection_retry`
BROKER_CONNECTION_MAX_RETRIES :setting: `broker_connection_max_retries`
BROKER_FAILOVER_STRATEGY :setting: `broker_failover_strategy`
BROKER_HEARTBEAT :setting: `broker_heartbeat`
BROKER_LOGIN_METHOD :setting: `broker_login_method`
BROKER_POOL_LIMIT :setting: `broker_pool_limit`
BROKER_USE_SSL :setting: `broker_use_ssl`
CELERY_CACHE_BACKEND :setting: `cache_backend`
CELERY_CACHE_BACKEND_OPTIONS :setting: `cache_backend_options`
CASSANDRA_COLUMN_FAMILY :setting: `cassandra_table`
CASSANDRA_ENTRY_TTL :setting: `cassandra_entry_ttl`
CASSANDRA_KEYSPACE :setting: `cassandra_keyspace`
CASSANDRA_PORT :setting: `cassandra_port`
CASSANDRA_READ_CONSISTENCY :setting: `cassandra_read_consistency`
CASSANDRA_SERVERS :setting: `cassandra_servers`
CASSANDRA_WRITE_CONSISTENCY :setting: `cassandra_write_consistency`
CASSANDRA_OPTIONS :setting: `cassandra_options`
S3_ACCESS_KEY_ID :setting: `s3_access_key_id`
S3_SECRET_ACCESS_KEY :setting: `s3_secret_access_key`
S3_BUCKET :setting: `s3_bucket`
S3_BASE_PATH :setting: `s3_base_path`
S3_ENDPOINT_URL :setting: `s3_endpoint_url`
S3_REGION :setting: `s3_region`
CELERY_COUCHBASE_BACKEND_SETTINGS :setting: `couchbase_backend_settings`
CELERY_ARANGODB_BACKEND_SETTINGS :setting: `arangodb_backend_settings`
CELERY_MONGODB_BACKEND_SETTINGS :setting: `mongodb_backend_settings`
CELERY_EVENT_QUEUE_EXPIRES :setting: `event_queue_expires`
CELERY_EVENT_QUEUE_TTL :setting: `event_queue_ttl`
CELERY_EVENT_QUEUE_PREFIX :setting: `event_queue_prefix`
CELERY_EVENT_SERIALIZER :setting: `event_serializer`
CELERY_REDIS_DB :setting: `redis_db`
CELERY_REDIS_HOST :setting: `redis_host`
CELERY_REDIS_MAX_CONNECTIONS :setting: `redis_max_connections`
CELERY_REDIS_USERNAME :setting: `redis_username`
CELERY_REDIS_PASSWORD :setting: `redis_password`
CELERY_REDIS_PORT :setting: `redis_port`
CELERY_REDIS_BACKEND_USE_SSL :setting: `redis_backend_use_ssl`
CELERY_RESULT_BACKEND :setting: `result_backend`
CELERY_MAX_CACHED_RESULTS :setting: `result_cache_max`
CELERY_MESSAGE_COMPRESSION :setting: `result_compression`
CELERY_RESULT_EXCHANGE :setting: `result_exchange`
CELERY_RESULT_EXCHANGE_TYPE :setting: `result_exchange_type`
CELERY_RESULT_EXPIRES :setting: `result_expires`
CELERY_RESULT_PERSISTENT :setting: `result_persistent`
CELERY_RESULT_SERIALIZER :setting: `result_serializer`
CELERY_RESULT_DBURI 代わりに:setting: `result_backend` を使用してください。
CELERY_RESULT_ENGINE_OPTIONS :setting: `database_engine_options`
[...]_DB_SHORT_LIVED_SESSIONS :setting: `database_short_lived_sessions`
CELERY_RESULT_DB_TABLE_NAMES :setting: `database_db_names`
CELERY_SECURITY_CERTIFICATE :setting: `security_certificate`
CELERY_SECURITY_CERT_STORE :setting: `security_cert_store`
CELERY_SECURITY_KEY :setting: `security_key`
CELERY_ACKS_LATE :setting: `task_acks_late`
CELERY_ACKS_ON_FAILURE_OR_TIMEOUT :setting: `task_acks_on_failure_or_timeout`
CELERY_ALWAYS_EAGER :setting: `task_always_eager`
CELERY_ANNOTATIONS :setting: `task_annotations`
CELERY_COMPRESSION :setting: `task_compression`
CELERY_CREATE_MISSING_QUEUES :setting: `task_create_missing_queues`
CELERY_DEFAULT_DELIVERY_MODE :setting: `task_default_delivery_mode`
CELERY_DEFAULT_EXCHANGE :setting: `task_default_exchange`
CELERY_DEFAULT_EXCHANGE_TYPE :setting: `task_default_exchange_type`
CELERY_DEFAULT_QUEUE :setting: `task_default_queue`
CELERY_DEFAULT_RATE_LIMIT :setting: `task_default_rate_limit`
CELERY_DEFAULT_ROUTING_KEY :setting: `task_default_routing_key`
CELERY_EAGER_PROPAGATES :setting: `task_eager_propagates`
CELERY_IGNORE_RESULT :setting: `task_ignore_result`
CELERY_PUBLISH_RETRY :setting: `task_publish_retry`
CELERY_PUBLISH_RETRY_POLICY :setting: `task_publish_retry_policy`
CELERY_QUEUES :setting: `task_queues`
CELERY_ROUTES :setting: `task_routes`
CELERY_SEND_SENT_EVENT :setting: `task_send_sent_event`
CELERY_SERIALIZER :setting: `task_serializer`
CELERYD_SOFT_TIME_LIMIT :setting: `task_soft_time_limit`
CELERY_TASK_TRACK_STARTED :setting: `task_track_started`
CELERY_TASK_REJECT_ON_WORKER_LOST :setting: `task_reject_on_worker_lost`
CELERYD_TIME_LIMIT :setting: `task_time_limit`
CELERYD_AGENT :setting: `worker_agent`
CELERYD_AUTOSCALER :setting: `worker_autoscaler`
CELERYD_CONCURRENCY :setting: `worker_concurrency`
CELERYD_CONSUMER :setting: `worker_consumer`
CELERY_WORKER_DIRECT :setting: `worker_direct`
CELERY_DISABLE_RATE_LIMITS :setting: `worker_disable_rate_limits`
CELERY_ENABLE_REMOTE_CONTROL :setting: `worker_enable_remote_control`
CELERYD_HIJACK_ROOT_LOGGER :setting: `worker_hijack_root_logger`
CELERYD_LOG_COLOR :setting: `worker_log_color`
CELERYD_LOG_FORMAT :setting: `worker_log_format`
CELERYD_WORKER_LOST_WAIT :setting: `worker_lost_wait`
CELERYD_MAX_TASKS_PER_CHILD :setting: `worker_max_tasks_per_child`
CELERYD_POOL :setting: `worker_pool`
CELERYD_POOL_PUTLOCKS :setting: `worker_pool_putlocks`
CELERYD_POOL_RESTARTS :setting: `worker_pool_restarts`
CELERYD_PREFETCH_MULTIPLIER :setting: `worker_prefetch_multiplier`
CELERYD_REDIRECT_STDOUTS :setting: `worker_redirect_stdouts`
CELERYD_REDIRECT_STDOUTS_LEVEL :setting: `worker_redirect_stdouts_level`
CELERY_SEND_EVENTS :setting: `worker_send_task_events`
CELERYD_STATE_DB :setting: `worker_state_db`
CELERYD_TASK_LOG_FORMAT :setting: `worker_task_log_format`
CELERYD_TIMER :setting: `worker_timer`
CELERYD_TIMER_PRECISION :setting: `worker_timer_precision`


構成ディレクティブ

一般設定

accept_content

デフォルト:{'json'}(セット、リスト、またはタプル)。

許可するコンテンツタイプ/シリアライザーのホワイトリスト。

このリストにないメッセージを受信した場合、そのメッセージはエラーとともに破棄されます。

デフォルトではjsonのみが有効になっていますが、pickleやyamlなどの任意のコンテンツタイプを追加できます。 この場合、信頼できない当事者がブローカーにアクセスできないようにしてください。 詳細については、セキュリティを参照してください。

例:

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']

result_accept_content

デフォルト:None(設定、リスト、またはタプルが可能)。

バージョン4.3の新機能。


結果のバックエンドを可能にするコンテンツタイプ/シリアライザーのホワイトリスト。

このリストにないメッセージを受信した場合、そのメッセージはエラーとともに破棄されます。

デフォルトでは、accept_contentと同じシリアライザーです。 ただし、結果バックエンドの受け入れられたコンテンツに対して別のシリアライザーを指定できます。 通常、これは、署名付きメッセージングが使用され、結果が署名なしで結果バックエンドに保存される場合に必要です。 詳細については、セキュリティを参照してください。

例:

# using serializer name
result_accept_content = ['json']

# or the actual content-type (MIME)
result_accept_content = ['application/json']

時間と日付の設定

enable_utc

バージョン2.5の新機能。


デフォルト:バージョン3.0以降、デフォルトで有効になっています。

有効にすると、メッセージの日付と時刻がUTCタイムゾーンを使用するように変換されます。

2.5未満のCeleryバージョンを実行しているワーカーは、すべてのメッセージに対してローカルタイムゾーンを想定しているため、すべてのワーカーがアップグレードされている場合にのみ有効にすることに注意してください。


timezone

バージョン2.5の新機能。


デフォルト:"UTC"

カスタムタイムゾーンを使用するようにCeleryを構成します。 タイムゾーン値は、:pypi: `pytz` ライブラリでサポートされている任意のタイムゾーンにすることができます。

設定されていない場合、UTCタイムゾーンが使用されます。 下位互換性のために、:setting: `enable_utc` 設定もあり、これがfalseに設定されている場合は、代わりにシステムのローカルタイムゾーンが使用されます。


タスク設定

task_annotations

バージョン2.5の新機能。


デフォルト:None

この設定を使用して、構成から任意のタスク属性を書き換えることができます。 設定は、dict、またはタスクをフィルタリングして変更する属性のマップを返す注釈オブジェクトのリストにすることができます。

これにより、tasks.addタスクのrate_limit属性が変更されます。

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

または、すべてのタスクで同じように変更します。

task_annotations = {'*': {'rate_limit': '10/s'}}

on_failureハンドラーなど、メソッドを変更することもできます。

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

task_annotations = {'*': {'on_failure': my_on_failure}}

より柔軟性が必要な場合は、dictの代わりにオブジェクトを使用して、注釈を付けるタスクを選択できます。

class MyAnnotate:

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})

task_compression

デフォルト:None

タスクメッセージに使用されるデフォルトの圧縮。 gzipbzip2(利用可能な場合)、または昆布圧縮レジストリに登録されている任意のカスタム圧縮スキームにすることができます。

デフォルトでは、非圧縮メッセージを送信します。


task_protocol

デフォルト:2(4.0以降)。

タスクの送信に使用されるデフォルトのタスクメッセージプロトコルバージョンを設定します。 プロトコルをサポート:1および2。

プロトコル2は、3.1.24および4.x +でサポートされています。


task_serializer

デフォルト:"json"(4.0以降:ピクルス)。

使用するデフォルトのシリアル化方法を識別する文字列。 json (デフォルト)、 pickle 、 yaml 、 msgpack 、またはkombu.serialization.registry

も参照してください

シリアライザー


task_publish_retry

バージョン2.2の新機能。


デフォルト:有効。

接続が失われた場合やその他の接続エラーが発生した場合に、公開タスクメッセージを再試行するかどうかを決定します。 :setting: `task_publish_retry_policy` も参照してください。


task_publish_retry_policy

バージョン2.2の新機能。


デフォルト:メッセージ送信の再試行を参照してください。

接続が失われた場合やその他の接続エラーが発生した場合にタスクメッセージの公開を再試行するときのデフォルトポリシーを定義します。


タスク実行設定

task_always_eager

デフォルト:無効。

これがTrueの場合、すべてのタスクは、タスクが戻るまでブロックすることによってローカルで実行されます。 apply_async()Task.delay()は、EagerResultインスタンスを返します。これは、結果がすでに評価されていることを除いて、AsyncResultのAPIと動作をエミュレートします。

つまり、タスクはキューに送信されるのではなく、ローカルで実行されます。


task_eager_propagates

デフォルト:無効。

これがTrueの場合、熱心に実行されたタスク( task.apply()によって適用されるか、:setting: `task_always_eager` 設定が有効になっている場合)が伝播されます例外。

これは、常にapply()throw=Trueで実行するのと同じです。


task_store_eager_result

バージョン5.1の新機能。


デフォルト:無効。

これがTrueおよび:setting: `task_always_eager`Trueであり、:setting:` task_ignore_result`Falseの場合、熱心に実行されたタスクの結果はバックエンドに保存されます。

デフォルトでは、:setting: `task_always_eager`Trueに設定され、:setting:` task_ignore_result`Falseに設定されていても、結果は保存されません。


task_remote_tracebacks

デフォルト:無効。

有効にすると、タスクの結果には、タスクエラーを再発生させるときにワーカースタックが含まれます。

これには、:pypi: `tblib` ライブラリが必要です。これは、 pip を使用してインストールできます。

$ pip install celery[tblib]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


task_ignore_result

デフォルト:無効。

タスクの戻り値を保存するかどうか(トゥームストーン)。 正常な戻り値ではなく、エラーを保存したい場合は、:setting: `task_store_errors_even_if_ignored` を設定できます。


task_store_errors_even_if_ignored

デフォルト:無効。

設定されている場合、Task.ignore_resultがオンの場合でも、ワーカーはすべてのタスクエラーを結果ストアに保存します。


task_track_started

デフォルト:無効。

Trueの場合、タスクがワーカーによって実行されると、タスクはそのステータスを「開始済み」として報告します。 通常の動作ではそのレベルの粒度を報告しないため、デフォルト値はFalseです。 タスクは保留中、終了中、または再試行待ちのいずれかです。 「開始済み」状態にすることは、実行時間の長いタスクがあり、現在実行中のタスクを報告する必要がある場合に役立ちます。


task_time_limit

デフォルト:時間制限なし。

タスクのハードタイム制限(秒単位)。 これを超えると、タスクを処理しているワーカーが強制終了され、新しいワーカーに置き換えられます。


task_soft_time_limit

デフォルト:ソフト時間制限なし。

タスクのソフト制限時間(秒単位)。

これを超えると、@SoftTimeLimitExceeded例外が発生します。 たとえば、タスクはこれをキャッチして、困難な時間制限が来る前にクリーンアップすることができます。

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()

task_acks_late

デフォルト:無効。

Late ackは、タスクメッセージがの直前ではなくの実行後に確認応答されることを意味します(デフォルトの動作)。

task_acks_on_failure_or_timeout

デフォルト:有効

有効にすると、すべてのタスクのメッセージは、失敗したりタイムアウトしたりしても確認されます。

この設定の構成は、実行後に確認されたタスクにのみ適用され、:setting: `task_acks_late` が有効になっている場合にのみ適用されます。


task_reject_on_worker_lost

デフォルト:無効。

:setting: `task_acks_late` が有効になっている場合でも、タスクを実行しているワーカープロセスが突然終了するか、シグナルが送信されると、ワーカーはタスクを確認します(:sig:` KILL` / [ X186X]:sig: `INT` など)。

これをtrueに設定すると、代わりにメッセージを再キューイングできるため、タスクは同じワーカーまたは別のワーカーによって再度実行されます。

警告

これを有効にすると、メッセージループが発生する可能性があります。 自分が何をしているのかを確認してください。


task_default_rate_limit

デフォルト:レート制限なし。

タスクのグローバルなデフォルトのレート制限。

この値は、カスタムレート制限がないタスクに使用されます

も参照してください

設定: worker_disable_rate_limits 設定は、すべてのレート制限を無効にすることができます。


タスク結果のバックエンド設定

result_backend

デフォルト:デフォルトで有効になっている結果バックエンドはありません。

タスクの結果(トゥームストーン)を格納するために使用されるバックエンド。 次のいずれかになります。


result_backend_always_retry

デフォルト:False

有効にすると、バックエンドは、例外を伝播する代わりに、回復可能な例外のイベントで再試行を試みます。 2回の再試行の間に指数バックオフスリープ時間を使用します。


result_backend_max_sleep_between_retries_ms

デフォルト:10000

これは、2回のバックエンド操作の再試行間の最大スリープ時間を指定します。


result_backend_base_sleep_between_retries_ms

デフォルト:10

これは、2回のバックエンド操作の再試行間の基本スリープ時間を指定します。


result_backend_max_retries

デフォルト:Inf

これは、回復可能な例外の場合の再試行の最大数です。


result_backend_transport_options

デフォルト:{}(空のマッピング)。

基礎となるトランスポートに渡される追加オプションの指示。

サポートされているオプション(ある場合)については、トランスポートのユーザーマニュアルを参照してください。

可視性タイムアウトの設定例(RedisおよびSQSトランスポートでサポート):

result_backend_transport_options = {'visibility_timeout': 18000}  # 5 hours

result_serializer

デフォルト:4.0以降のjson(以前:pickle)。

結果のシリアル化形式。

サポートされているシリアル化形式については、 Serializers を参照してください。


result_compression

デフォルト:圧縮なし。

タスクの結果に使用されるオプションの圧縮方法。 :setting: `task_compression` 設定と同じオプションをサポートします。


result_extended

デフォルト:False

拡張タスク結果属性(name、args、kwargs、worker、retries、queue、delivery_info)をバックエンドに書き込むことができるようにします。


result_expires

デフォルト:1日後に期限切れになります。

保存されたタスクトゥームストーンが削除されるまでの時間(秒単位、またはtimedeltaオブジェクト)。

celery beatが有効になっていると仮定すると、組み込みの定期タスクは、この時間(celery.backend_cleanup)以降に結果を削除します。 タスクは毎日午前4時に実行されます。

Noneまたは0の値は、結果が期限切れにならないことを意味します(バックエンドの仕様によって異なります)。

ノート

現時点では、これはAMQP、データベース、キャッシュ、Couchbase、およびRedisバックエンドでのみ機能します。

データベースバックエンドを使用する場合、結果の有効期限が切れるには、celery beatが実行されている必要があります。


result_cache_max

デフォルト:デフォルトでは無効になっています。

結果のクライアントキャッシュを有効にします。

これは、1つの結果インスタンスが結果を消費するとすぐに結果が利用できなくなる古い非推奨の「amqp」バックエンドに役立ちます。

これは、古い結果が削除される前にキャッシュする結果の総数です。 0またはNoneの値は制限がないことを意味し、-1の値はキャッシュを無効にします。

デフォルトでは無効になっています。


result_chord_join_timeout

デフォルト:3.0。

コード内でグループの結果に参加するときの秒単位のタイムアウト(int / float)。


result_chord_retry_interval

デフォルト:1.0。

コードタスクを再試行するためのデフォルトの間隔。


override_backends

デフォルト:デフォルトでは無効になっています。

バックエンドを実装するクラスへのパス。

バックエンドの実装をオーバーライドできます。 これは、実行されたタスクに関する追加のメタデータを保存したり、再試行ポリシーを上書きしたりする必要がある場合に役立ちます。

例:

override_backends = {"db": "custom_module.backend.class"}

データベースバックエンド設定

データベースURLの例

データベースバックエンドを使用するには、接続URLとdb+プレフィックスを使用して:setting: `result_backend` 設定を構成する必要があります。

result_backend = 'db+scheme://user:password@host:port/dbname'

例:

# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:[email protected]:1521/sidname'

サポートされているデータベースの表についてはサポートされているデータベースを、接続文字列の詳細については接続文字列を参照してください(これはdb+の後に続くURIの一部です。プレフィックス)。


database_engine_options

デフォルト:{}(空のマッピング)。

追加のSQLAlchemyデータベースエンジンオプションを指定するには、:setting: `database_engine_options` 設定を使用できます。

# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}

database_short_lived_sessions

デフォルト:デフォルトでは無効になっています。

短期間のセッションはデフォルトで無効になっています。 有効にすると、特に多くのタスクを処理するシステムで、パフォーマンスが大幅に低下する可能性があります。 このオプションは、キャッシュされたデータベース接続が非アクティブで古くなった結果としてエラーが発生するトラフィックの少ないワーカーで役立ちます。 たとえば、(OperationalError)(2006、 'MySQLサーバーがなくなりました')のような断続的なエラーは、短期間のセッションを有効にすることで修正できます。 このオプションは、データベースのバックエンドにのみ影響します。


database_table_schemas

デフォルト:{}(空のマッピング)。

SQLAlchemyが結果バックエンドとして構成されている場合、Celeryはタスクの結果メタデータを格納するために2つのテーブルを自動的に作成します。 この設定により、テーブルのスキーマをカスタマイズできます。

# use custom schema for the database result backend.
database_table_schemas = {
    'task': 'celery',
    'group': 'celery',
}

database_table_names

デフォルト:{}(空のマッピング)。

SQLAlchemyが結果バックエンドとして構成されている場合、Celeryはタスクの結果メタデータを格納するために2つのテーブルを自動的に作成します。 この設定により、テーブル名をカスタマイズできます。

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPCバックエンド設定

result_persistent

デフォルト:デフォルトで無効になっています(一時的なメッセージ)。

Trueに設定すると、結果メッセージは永続的になります。 これは、ブローカーの再始動後にメッセージが失われないことを意味します。


構成例

result_backend = 'rpc://'
result_persistent = False

注意:タスクトゥームストーンが古すぎるの場合、このバックエンドを使用するとcelery.backends.rpc.BacklogLimitExceededのレイズがトリガーされる可能性があります。

例えば

for i in range(10000):
    r = debug_task.delay()

print(r.state)  # this would raise celery.backends.rpc.BacklogLimitExceeded

バックエンド設定をキャッシュする

ノート

キャッシュバックエンドは、:pypi: `pylibmc` および:pypi:` python-memcached` ライブラリをサポートします。 後者は、:pypi: `pylibmc` がインストールされていない場合にのみ使用されます。


単一のMemcachedサーバーを使用する:

result_backend = 'cache+memcached://127.0.0.1:11211/'

複数のMemcachedサーバーの使用:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

「メモリ」バックエンドは、キャッシュをメモリにのみ保存します。

result_backend = 'cache'
cache_backend = 'memory'

cache_backend_options

デフォルト:{}(空のマッピング)。

:pypi: `pylibmc` オプションは、:setting:` cache_backend_options` 設定を使用して設定できます。

cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}

cache_backend

:setting: `result_backend` 設定でキャッシュバックエンドを直接指定できるようになったため、この設定はceleryの組み込みバックエンドでは使用されなくなりました。

ノート

django-celery-results-結果としてDjangoORM / Cacheを使用するバックエンドライブラリは、cache_backendを使用してdjangoキャッシュを選択します。


MongoDBバックエンド設定

ノート

MongoDBバックエンドにはpymongoライブラリが必要です: http://github.com/mongodb/mongo-python-driver/tree/master


mongodb_backend_settings

これは、次のキーをサポートするdictです。

  • データベース

    接続するデータベース名。 デフォルトはceleryです。

  • taskmeta_collection

    タスクのメタデータを格納するコレクション名。 デフォルトはcelery_taskmetaです。

  • max_pool_size

    max_pool_sizeとしてPyMongoのConnectionまたはMongoClientコンストラクターに渡されます。 これは、特定の時間にMongoDBに対して開いたままにするTCP接続の最大数です。 max_pool_sizeよりも多くの開いている接続がある場合、ソケットは解放されたときに閉じられます。 デフォルトは10です。

  • オプション

    mongodb接続コンストラクターに渡す追加のキーワード引数。 サポートされている引数のリストについては、pymongoのドキュメントを参照してください。


構成例

result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
    'database': 'mydb',
    'taskmeta_collection': 'my_taskmeta_collection',
}

Redisバックエンド設定

バックエンドURLの構成

ノート

Redisバックエンドには、:pypi: `redis` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install celery[redis]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドでは、:setting: `result_backend` 設定をRedisまたは Redis over TLS URLに設定する必要があります。

result_backend = 'redis://username:password@host:port/db'

例えば:

result_backend = 'redis://localhost/0'

と同じです:

result_backend = 'redis://'

rediss://プロトコルを使用して、TLSを介してredisに接続します。

result_backend = 'rediss://username:password@host:port/db?ssl_cert_reqs=required'

ssl_cert_reqs文字列はrequiredoptional、またはnoneのいずれかである必要があることに注意してください(ただし、下位互換性のために、文字列は[ X151X] 、CERT_OPTIONALCERT_NONE)。

Unixソケット接続を使用する必要がある場合、URLは次の形式である必要があります::

result_backend = 'socket:///path/to/redis.sock'

URLのフィールドは次のように定義されています。

  1. username

    バージョン5.1.0の新機能。

    データベースへの接続に使用されるユーザー名。

    これは、Redis> = 6.0で、py-redis> = 3.4.0がインストールされている場合にのみサポートされることに注意してください。

    古いデータベースバージョンまたは古いクライアントバージョンを使用している場合は、ユーザー名を省略できます。

    result_backend = 'redis://:password@host:port/db'
  2. password

    データベースへの接続に使用されるパスワード。

  3. host

    Redisサーバーのホスト名またはIPアドレス(例: localhost )。

  4. port

    Redisサーバーに移植します。 デフォルトは6379です。

  5. db

    使用するデータベース番号。 デフォルトは0です。 dbには、オプションの先頭のスラッシュを含めることができます。

TLS接続(プロトコルはrediss://)を使用する場合、:setting: `broker_use_ssl` のすべての値をクエリパラメーターとして渡すことができます。 証明書へのパスはURLエンコードする必要があり、ssl_cert_reqsが必要です。 例:

result_backend = 'rediss://:password@host:port/db?\
    ssl_cert_reqs=required\
    &ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem\                  # /var/ssl/myca.pem
    &ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem\     # /var/ssl/redis-server-cert.pem
    &ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem'   # /var/ssl/private/worker-key.pem

ssl_cert_reqs文字列はrequiredoptional、またはnoneのいずれかである必要があることに注意してください(ただし、下位互換性のために、文字列は[ X151X] 、CERT_OPTIONALCERT_NONE)。

バージョン5.1.0の新機能。


redis_backend_health_check_interval

デフォルト:未構成

Redisバックエンドはヘルスチェックをサポートしています。 この値は、ヘルスチェック間の秒数を値とする整数として設定する必要があります。 ヘルスチェック中にConnectionErrorまたはTimeoutErrorが発生した場合、接続が再確立され、コマンドが1回だけ再試行されます。


redis_backend_use_ssl

デフォルト:無効。

RedisバックエンドはSSLをサポートしています。 この値は、辞書の形式で設定する必要があります。 有効なキーと値のペアは、:setting: `broker_use_ssl`redisサブセクションに記載されているものと同じです。


redis_max_connections

デフォルト:制限なし。

結果の送信と取得に使用されるRedis接続プールで使用可能な接続の最大数。

警告

同時接続の数が最大値を超えると、Redisは ConnectionError を発生させます。


redis_socket_connect_timeout

バージョン4.0.1の新機能。


デフォルト:None

結果バックエンドからRedisへの接続のソケットタイムアウト(秒単位)(int / float)


redis_socket_timeout

デフォルト:120.0秒。

Redisサーバーへの読み取り/書き込み操作のソケットタイムアウト(秒単位)(int / float)。redis結果バックエンドで使用されます。


redis_retry_on_timeout

バージョン4.4.1の新機能。


デフォルト:False

Redis結果バックエンドによって使用されるRedisサーバーへのTimeoutErrorの読み取り/書き込み操作を再試行します。 UNIXソケットによるRedis接続を使用している場合は、この変数を設定しないでください。


redis_socket_keepalive

バージョン4.4.1の新機能。


デフォルト:False

Redisサーバーへの接続を正常に保つためのソケットTCPキープアライブ。Redis結果バックエンドで使用されます。


Cassandraバックエンド設定

ノート

このCassandraバックエンドドライバーには:pypi: `cassandra-driver` が必要です。

インストールするには、 pip を使用します。

$ pip install celery[cassandra]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドでは、次の構成ディレクティブを設定する必要があります。

cassandra_servers

デフォルト:[](空のリスト)。

host Cassandraサーバーのリスト。 例えば:

cassandra_servers = ['localhost']

cassandra_port

デフォルト:9042。

Cassandraサーバーに接続するためのポート。


cassandra_keyspace

デフォルト:なし。

結果を保存するためのキースペース。 例えば:

cassandra_keyspace = 'tasks_keyspace'

cassandra_table

デフォルト:なし。

結果を格納するテーブル(列ファミリー)。 例えば:

cassandra_table = 'tasks'

cassandra_read_consistency

デフォルト:なし。

使用される読み取り整合性。 値には、ONETWOTHREEQUORUMALLLOCAL_QUORUMEACH_QUORUMがあります。 ]、LOCAL_ONE


cassandra_write_consistency

デフォルト:なし。

使用される書き込みの一貫性。 値には、ONETWOTHREEQUORUMALLLOCAL_QUORUMEACH_QUORUMがあります。 ]、LOCAL_ONE


cassandra_entry_ttl

デフォルト:なし。

ステータスエントリの存続時間。 それらは期限切れになり、追加してから数秒後に削除されます。 None(デフォルト)の値は、期限切れにならないことを意味します。


cassandra_auth_provider

デフォルト:None

使用するcassandra.authモジュール内のAuthProviderクラス。 値はPlainTextAuthProviderまたはSaslAuthProviderです。


cassandra_auth_kwargs

デフォルト:{}(空のマッピング)。

認証プロバイダーに渡す名前付き引数。 例えば:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}

cassandra_options

デフォルト:{}(空のマッピング)。

cassandra.clusterクラスに渡す名前付き引数。

cassandra_options = {
    'cql_version': '3.2.1'
    'protocol_version': 3
}

構成例

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400

S3バックエンド設定

ノート

このs3バックエンドドライバーには:pypi: `s3` が必要です。

インストールするには、 s3 を使用します。

$ pip install celery[s3]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドでは、次の構成ディレクティブを設定する必要があります。

s3_access_key_id

デフォルト:なし。

s3アクセスキーID。 例えば:

s3_access_key_id = 'acces_key_id'

s3_secret_access_key

デフォルト:なし。

s3シークレットアクセスキー。 例えば:

s3_secret_access_key = 'acces_secret_access_key'

s3_bucket

デフォルト:なし。

s3バケット名。 例えば:

s3_bucket = 'bucket_name'

s3_base_path

デフォルト:なし。

結果キーを保存するために使用するs3バケットのベースパス。 例えば:

s3_base_path = '/prefix'

s3_endpoint_url

デフォルト:なし。

カスタムのs3エンドポイントURL。 これを使用して、カスタムのセルフホストs3互換バックエンド(Ceph、Scality…)に接続します。 例えば:

s3_endpoint_url = 'https://.s3.custom.url'

s3_region

デフォルト:なし。

s3awsリージョン。 例えば:

s3_region = 'us-east-1'

構成例

s3_access_key_id = 's3-access-key-id'
s3_secret_access_key = 's3-secret-access-key'
s3_bucket = 'mybucket'
s3_base_path = '/celery_result_backend'
s3_endpoint_url = 'https://endpoint_url'

Azure BlockBlobバックエンド設定

結果のバックエンドとして AzureBlockBlob を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。

必要なURL形式は、azureblockblob://の後にストレージ接続文字列が続きます。 ストレージ接続文字列は、AzureポータルのストレージアカウントリソースのAccess Keysペインにあります。

構成例

result_backend = 'azureblockblob://DefaultEndpointsProtocol=https;AccountName=somename;AccountKey=Lou...bzg==;EndpointSuffix=core.windows.net'

azureblockblob_container_name

デフォルト:セロリ。

結果を保存するストレージコンテナの名前。


azureblockblob_base_path

バージョン5.1の新機能。


デフォルト:なし。

結果キーを格納するために使用するストレージコンテナのベースパス。 例えば:

azureblockblob_base_path = 'prefix/'

azureblockblob_retry_initial_backoff_sec

デフォルト:2。

最初の再試行の初期バックオフ間隔(秒単位)。 その後の再試行は、指数戦略を使用して試行されます。


azureblockblob_retry_increment_base

デフォルト:2。


azureblockblob_retry_max_attempts

デフォルト:3。

再試行の最大回数。


azureblockblob_connection_timeout

デフォルト:20。

AzureブロックBLOB接続を確立するための秒単位のタイムアウト。


azureblockblob_read_timeout

デフォルト:120。

紺碧のブロックブロブを読み取るための秒単位のタイムアウト。


Elasticsearchバックエンド設定

結果のバックエンドとして Elasticsearch を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。

構成例

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'

elasticsearch_retry_on_timeout

デフォルト:False

タイムアウトは別のノードで再試行をトリガーする必要がありますか?


elasticsearch_max_retries

デフォルト:3。

例外が伝播されるまでの最大再試行回数。


elasticsearch_timeout

デフォルト:10.0秒。

Elasticsearch結果のバックエンドで使用されるグローバルタイムアウト。


elasticsearch_save_meta_as_text

デフォルト:True

メタはテキストまたはネイティブjsonとして保存する必要があります。 結果は常にテキストとしてシリアル化されます。


AWSDynamoDBバックエンド設定

ノート

Dynamodbバックエンドには、:pypi: `boto3` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install celery[dynamodb]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


警告

Dynamodbバックエンドは、ソートキーが定義されているテーブルと互換性がありません。

パーティションキー以外のものに基づいて結果テーブルをクエリする場合は、代わりにグローバルセカンダリインデックス(GSI)を定義してください。


このバックエンドでは、:setting: `result_backend` 設定をDynamoDBURLに設定する必要があります。

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'

たとえば、AWSリージョンとテーブル名を指定します。

result_backend = 'dynamodb://@us-east-1/celery_results'

または、デフォルトのテーブル名(celery)を使用し、読み取りと書き込みのプロビジョニングされたスループットを指定して、環境からAWS構成パラメーターを取得します。

result_backend = 'dynamodb://@/?read=5&write=5'

または、ダウンロード可能なバージョンのDynamoDB ローカルを使用します。

result_backend = 'dynamodb://@localhost:8000'

または、ダウンロード可能なバージョンまたは他のサービスを使用して、任意のホストに準拠するAPIをデプロイします。

result_backend = 'dynamodb://@us-east-1'
dynamodb_endpoint_url = 'http://192.168.0.40:8000'

result_backendのDynamoDBURLのフィールドは次のように定義されています。

  1. aws_access_key_id & aws_secret_access_key

    AWSAPIリソースにアクセスするためのクレデンシャル。 これらは、ここで説明されているように、さまざまなソースからの:pypi: `boto3` ライブラリによって解決することもできます。

  2. region

    AWSリージョン、例: ダウンロード可能バージョンの場合はus-east-1またはlocalhost。 定義オプションについては、:pypi: `boto3` ライブラリドキュメントを参照してください。

  3. port

    ダウンロード可能なバージョンを使用している場合は、ローカルDynamoDBインスタンスのリスニングポート。 regionパラメーターをlocalhostとして指定していない場合、このパラメーターを設定しても効果はありません

  4. table

    使用するテーブル名。 デフォルトはceleryです。 許可される文字と長さについては、 DynamoDB命名規則を参照してください。

  5. read & write

    作成されたDynamoDBテーブルの読み取りおよび書き込み容量単位。 デフォルトは、読み取りと書き込みの両方で1です。 詳細については、プロビジョニングされたスループットのドキュメントを参照してください。

  6. ttl_seconds

    有効期限が切れる前の結果の存続時間(秒単位)。 デフォルトでは、結果を期限切れにせず、DynamoDBテーブルの存続可能時間の設定は変更されません。 ttl_secondsが正の値に設定されている場合、結果は指定された秒数後に期限切れになります。 ttl_secondsを負の値に設定すると、結果が期限切れにならず、DynamoDBテーブルの存続可能時間設定がアクティブに無効になります。 テーブルのTimeto Live設定をすばやく連続して複数回変更しようとすると、スロットリングエラーが発生することに注意してください。 詳細については、 DynamoDBTTLドキュメントを参照してください。


IronCacheバックエンド設定

ノート

IronCacheバックエンドには、:pypi: `iron_celery` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install iron_celery

IronCacheは、:setting: `result_backend` で提供されるURLを介して構成されます。例:

result_backend = 'ironcache://project_id:token@'

または、キャッシュ名を変更するには:

ironcache:://project_id:token@/awesomecache

詳細については、 https://github.com/iron-io/iron_celeryを参照してください。


Couchbaseバックエンド設定

ノート

Couchbaseバックエンドには、:pypi: `couchbase` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install celery[couchbase]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドは、Couchbase URLに設定された:setting: `result_backend` を介して構成できます。

result_backend = 'couchbase://username:password@host:port/bucket'

couchbase_backend_settings

デフォルト:{}(空のマッピング)。

これは、次のキーをサポートするdictです。

  • host

    Couchbaseサーバーのホスト名。 デフォルトはlocalhostです。

  • port

    Couchbaseサーバーがリッスンしているポート。 デフォルトは8091です。

  • bucket

    Couchbaseサーバーが書き込んでいるデフォルトのバケット。 デフォルトはdefaultです。

  • username

    Couchbaseサーバーに対して認証するためのユーザー名(オプション)。

  • password

    Couchbaseサーバーに対して認証するためのパスワード(オプション)。


ArangoDBバックエンド設定

ノート

ArangoDBバックエンドには、:pypi: `pyArango` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install celery[arangodb]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドは、ArangoDB URLに設定された:setting: `result_backend` を介して構成できます。

result_backend = 'arangodb://username:password@host:port/database/collection'

arangodb_backend_settings

デフォルト:{}(空のマッピング)。

これは、次のキーをサポートするdictです。

  • host

    ArangoDBサーバーのホスト名。 デフォルトはlocalhostです。

  • port

    ArangoDBサーバーがリッスンしているポート。 デフォルトは8529です。

  • database

    ArangoDBサーバーのデフォルトデータベースが書き込みを行っています。 デフォルトはceleryです。

  • collection

    ArangoDBサーバーデータベースのデフォルトのコレクションが書き込みを行っています。 デフォルトはceleryです。

  • username

    ArangoDBサーバーに対して認証するためのユーザー名(オプション)。

  • password

    ArangoDBサーバーに対して認証するためのパスワード(オプション)。

  • http_protocol

    ArangoDBサーバー接続のHTTPプロトコル。 デフォルトはhttpです。

  • verify

    ArangoDB接続の作成中のHTTPS検証チェック。 デフォルトはFalseです。


CosmosDBバックエンド設定(実験的)

結果のバックエンドとして CosmosDB を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。

構成例

result_backend = 'cosmosdbsql://:{InsertAccountPrimaryKeyHere}@{InsertAccountNameHere}.documents.azure.com'

cosmosdbsql_database_name

デフォルト:celerydb。

結果を保存するデータベースの名前。


cosmosdbsql_collection_name

デフォルト:celerycol。

結果を保存するコレクションの名前。


cosmosdbsql_consistency_level

デフォルト:セッション。

Azure CosmosDBクライアント操作でサポートされる整合性レベルを表します。

強度の順序による整合性レベルは、Strong、BoundedStaleness、Session、ConsistentPrefix、およびEventualです。


cosmosdbsql_max_retry_attempts

デフォルト:9。

リクエストに対して実行される再試行の最大数。


cosmosdbsql_max_retry_wait_time

デフォルト:30。

再試行が行われている間、要求を待機するための最大待機時間(秒単位)。


CouchDBバックエンド設定

ノート

CouchDBバックエンドには:pypi: `pycouchdb` ライブラリが必要です。

このCouchbaseパッケージをインストールするには、 pip を使用します。

$ pip install celery[couchdb]

複数の拡張要件を組み合わせる方法については、バンドルを参照してください。


このバックエンドは、CouchDB URLに設定された:setting: `result_backend` を介して構成できます。

result_backend = 'couchdb://username:password@host:port/container'

URLは、次の部分で構成されています。

  • username

    CouchDBサーバーに対して認証するためのユーザー名(オプション)。

  • password

    CouchDBサーバーに対して認証するためのパスワード(オプション)。

  • host

    CouchDBサーバーのホスト名。 デフォルトはlocalhostです。

  • port

    CouchDBサーバーがリッスンしているポート。 デフォルトは8091です。

  • container

    CouchDBサーバーが書き込んでいるデフォルトのコンテナー。 デフォルトはdefaultです。


ファイルシステムのバックエンド設定

このバックエンドは、ファイルURLを使用して構成できます。次に例を示します。

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

構成されたディレクトリは、バックエンドを使用するすべてのサーバーで共有および書き込み可能である必要があります。

単一のシステムでCeleryを試している場合は、追加の構成なしでバックエンドを使用できます。 大規模なクラスターの場合は、NFS、 GlusterFS 、CIFS、 HDFS (FUSEを使用)、またはその他のファイルシステムを使用できます。


領事K / Vストアのバックエンド設定

ノート

Consulバックエンドには:pypi: `python-consul2` ライブラリが必要です。

このパッケージをインストールするには、 pip を使用します。

$ pip install python-consul2

Consulバックエンドは、次のようなURLを使用して構成できます。

CELERY_RESULT_BACKEND = 'consul://localhost:8500/'

また:

result_backend = 'consul://localhost:8500/'

バックエンドは、結果をConsulのK / Vストアに個別のキーとして保存します。 バックエンドは、ConsulでTTLを使用した結果の自動期限切れをサポートします。 URLの完全な構文は次のとおりです。

consul://host:port[?one_client=1]

URLは、次の部分で構成されています。

  • host

    領事サーバーのホスト名。

  • port

    領事サーバーがリッスンしているポート。

  • one_client

    デフォルトでは、正確さのために、バックエンドは操作ごとに個別のクライアント接続を使用します。 極端な負荷の場合、新しい接続の作成率により、負荷がかかっているときにConsulサーバーからHTTP429の「接続が多すぎます」というエラー応答が発生する可能性があります。 これを処理するための推奨される方法は、 https://github.com/poppyred/python-consul2/pull/31のパッチを使用してpython-consul2で再試行を有効にすることです。

    または、one_clientが設定されている場合は、代わりに単一のクライアント接続がすべての操作に使用されます。 これにより、HTTP 429エラーは解消されますが、結果のバックエンドへの保存は信頼できなくなる可能性があります。


メッセージルーティング

task_queues

デフォルト:None(デフォルトのキュー設定から取得されたキュー)。

ほとんどのユーザーはこの設定を指定したくないので、自動ルーティング機能を使用する必要があります。

高度なルーティングを本当に構成する場合、この設定は、ワーカーが使用するkombu.Queueオブジェクトのリストである必要があります。

-Qオプションを使用してワーカーをこの設定でオーバーライドしたり、-Xオプションを使用してこのリストから(名前で)個々のキューを除外したりできることに注意してください。

詳細については、基本も参照してください。

デフォルトは、celeryのキュー/交換/バインディングキーで、交換タイプはdirectです。

:setting: `task_routes` も参照してください。


task_routes

デフォルト:None

ルーターのリスト、またはタスクをキューにルーティングするために使用される単一のルーター。 タスクの最終的な宛先を決定するときは、ルーターが順番に参照されます。

ルーターは次のいずれかとして指定できます。

  • 署名付きの関数(name, args, kwargs, options, task=None, **kwargs)
  • ルーター機能へのパスを提供する文字列。
  • *; ルーターの仕様を含むdict:
    celery.routes.MapRouteインスタンスに変換されます。
  • *; (pattern, route)タプルのリスト:
    celery.routes.MapRouteインスタンスに変換されます。

例:

task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media',
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})

myapp.tasks.route_taskは次のようになります:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}

route_taskは、文字列またはdictを返す場合があります。 文字列は、:setting: `task_queues` のキュー名であることを意味し、dictはカスタムルートであることを意味します。

タスクを送信するときは、ルーターが順番に参照されます。 Noneを返さない最初のルーターが使用するルートです。 次に、メッセージオプションは、タスクの設定が優先される、見つかったルート設定とマージされます。

apply_async()に次の引数がある場合の例:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

そしてルーターは以下を返します:

{'immediate': True, 'exchange': 'urgent'}

最終的なメッセージオプションは次のようになります。

immediate=False, exchange='video', routing_key='video.compress'

(およびTaskクラスで定義されているデフォルトのメッセージオプション)

:setting: `task_routes` で定義された値は、2つをマージするときに:setting:` task_queues` で定義された値よりも優先されます。

次の設定で:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

tasks.addの最終的なルーティングオプションは次のようになります。

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

その他の例については、ルーターを参照してください。


task_queue_max_priority

ブローカー
RabbitMQ

デフォルト:None

RabbitMQメッセージの優先順位を参照してください。


task_default_priority

ブローカー
RabbitMQ、Redis

デフォルト:None

RabbitMQメッセージの優先順位を参照してください。


task_inherit_parent_priority

ブローカー
RabbitMQ

デフォルト:False

有効にすると、子タスクは親タスクの優先度を継承します。

# The last task in chain will also have priority set to 5.
chain = celery.chain(add.s(2) | add.s(2).set(priority=5) | add.s(3))

優先度継承は、 delay または apply_async を使用して親タスクから子タスクを呼び出すときにも機能します。

RabbitMQメッセージの優先順位を参照してください。


worker_direct

デフォルト:無効。

このオプションを有効にすると、すべてのワーカーに専用のキューがあり、タスクを特定のワーカーにルーティングできます。

各ワーカーのキュー名は、C.dq交換を使用して、ワーカーのホスト名と.dqサフィックスに基づいて自動的に生成されます。

たとえば、ノード名が[email protected]のワーカーのキュー名は次のようになります。

次に、ルーティングキーとしてホスト名を指定し、C.dq交換を行うことで、タスクをタスクにルーティングできます。

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': '[email protected]'}
}

task_create_missing_queues

デフォルト:有効。

有効(デフォルト)の場合、:setting: `task_queues` で定義されていない指定されたキューは自動的に作成されます。 自動ルーティングを参照してください。


task_default_queue

デフォルト:"celery"

メッセージにルートがないか、カスタムキューが指定されていない場合に、 .apply_async によって使用されるデフォルトキューの名前。

このキューは、:setting: `task_queues` にリストされている必要があります。 :setting: `task_queues` が指定されていない場合、1つのキューエントリを含むように自動的に作成され、この名前がそのキューの名前として使用されます。

task_default_exchange

デフォルト::setting: `task_default_queue` に設定された値を使用します。

:setting: `task_queues` 設定でキーにカスタム交換が指定されていない場合に使用するデフォルトの交換の名前。


task_default_exchange_type

デフォルト:"direct"

:setting: `task_queues` 設定でキーにカスタム交換タイプが指定されていない場合に使用されるデフォルトの交換タイプ。


task_default_routing_key

デフォルト::setting: `task_default_queue` に設定された値を使用します。

:setting: `task_queues` 設定のキーにカスタムルーティングキーが指定されていない場合に使用されるデフォルトのルーティングキー。


task_default_delivery_mode

デフォルト:"persistent"

過渡(メッセージがディスクに書き込まれない)または永続的(ディスクに書き込まれる)にすることができます。


ブローカー設定

broker_url

デフォルト:"amqp://"

デフォルトのブローカーURL。 これは、次の形式のURLである必要があります。

transport://userid:password@hostname:port/virtual_host

スキーム部分(transport://)のみが必要であり、残りはオプションであり、デフォルトで特定のトランスポートのデフォルト値になります。

トランスポート部分は使用するブローカー実装であり、デフォルトはamqpです(インストールされている場合はlibrabbitmqを使用するか、pyamqpにフォールバックします)。 以下を含む他の選択肢もあります。 redis://sqs://、およびqpid://

このスキームは、独自のトランスポート実装への完全に修飾されたパスにすることもできます。

broker_url = 'proj.transports.MyTransport://localhost'

同じトランスポートの複数のブローカーURLを指定することもできます。 ブローカーのURLは、セミコロンで区切られた単一の文字列として渡すことができます。

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

またはリストとして:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

その後、ブローカーは:setting: `broker_failover_strategy` で使用されます。

詳細については、昆布のドキュメントの kombu:connection-urls を参照してください。


broker_read_url / broker_write_url

デフォルト::setting: `broker_url` から取得。

:setting: `broker_url` の代わりにこれらの設定を構成して、消費と生成に使用されるブローカー接続にさまざまな接続パラメーターを指定できます。

例:

broker_read_url = 'amqp://user:[email protected]:56721'
broker_write_url = 'amqp://user:[email protected]:56722'

両方のオプションは、フェイルオーバーの代替のリストとして指定することもできます。詳細については、:setting: `broker_url` を参照してください。


broker_failover_strategy

デフォルト:"round-robin"

ブローカー接続オブジェクトのデフォルトのフェイルオーバー戦略。 提供されている場合は、「kombu.connection.failover_strategies」のキーにマップするか、提供されたリストから単一のアイテムを生成する任意のメソッドへの参照にすることができます。

例:

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy

broker_heartbeat

サポートされているトランスポート
pyamqp

デフォルト:120.0(サーバーによってネゴシエートされます)。

注:この値はワーカーによってのみ使用され、クライアントは現時点ではハートビートを使用しません。

TCP / IPのみを使用して接続損失をタイムリーに検出できるとは限らないため、AMQPは、接続が閉じられたかどうかを検出するためにクライアントとブローカーの両方で使用されるハートビートと呼ばれるものを定義します。

ハートビート値が10秒の場合、ハートビートは:setting: `broker_heartbeat_checkrate` 設定で指定された間隔で監視されます(デフォルトでは、これはハートビート値の2倍のレートに設定されているため、 10秒、ハートビートは5秒ごとにチェックされます)。


broker_heartbeat_checkrate

サポートされているトランスポート
pyamqp

デフォルト:2.0。

ワーカーは定期的に、ブローカーがあまりにも多くのハートビートを見逃していないことを監視します。 これがチェックされるレートは、:setting: `broker_heartbeat` の値をこの値で割ることによって計算されるため、ハートビートが10.0で、レートがデフォルトの2.0の場合、チェックは5ごとに実行されます。秒(ハートビート送信レートの2倍)。


broker_use_ssl

サポートされているトランスポート
pyamqpredis

デフォルト:無効。

ブローカー接続とSSL設定でSSLの使用を切り替えます。

このオプションの有効な値は、トランスポートによって異なります。

pyamqp

Trueの場合、接続はデフォルトのSSL設定でSSLを使用します。 dictに設定されている場合、指定されたポリシーに従ってSSL接続を構成します。 使用される形式は、Pythonのssl.wrap_socket()オプションです。

SSLソケットは通常、ブローカーによって別のポートで提供されることに注意してください。

クライアント証明書を提供し、カスタム認証局に対してサーバー証明書を検証する例:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

警告

broker_use_ssl=Trueの使用には注意してください。 デフォルトの構成では、サーバー証明書がまったく検証されない可能性があります。 Python sslモジュールのセキュリティに関する考慮事項をお読みください。


redis

設定は、次のキーを使用したdictである必要があります。

  • *; ssl_cert_reqs(必須):SSLContext.verify_mode値の1つ:
    *;* ssl.CERT_NONE
    • ssl.CERT_OPTIONAL
    • ssl.CERT_REQUIRED
  • ssl_ca_certs(オプション):CA証明書へのパス
  • ssl_certfile(オプション):クライアント証明書へのパス
  • ssl_keyfile(オプション):クライアントキーへのパス


broker_pool_limit

バージョン2.3の新機能。


デフォルト:10。

接続プールで開くことができる接続の最大数。

プールはバージョン2.5以降、デフォルトで有効になっており、デフォルトの制限は10接続です。 この数は、接続を使用するスレッド/グリーンスレッド(イベントレット/イベント)の数に応じて調整できます。 たとえば、ブローカーへの接続を使用する1000個のグリーンレットでイベントレットを実行すると、競合が発生する可能性があるため、制限を増やすことを検討する必要があります。

Noneまたは0に設定すると、接続プールが無効になり、使用するたびに接続が確立されて閉じられます。


broker_connection_timeout

デフォルト:4.0。

AMQPサーバーへの接続の確立を断念するまでのデフォルトのタイムアウト(秒単位)。 geventを使用する場合、この設定は無効になります。

ノート

ブローカー接続タイムアウトは、ブローカーに接続しようとしているワーカーにのみ適用されます。 タスクを送信するプロデューサーには適用されません。その状況にタイムアウトを提供する方法については、:setting: `broker_transport_options` を参照してください。


broker_connection_retry

デフォルト:有効。

失われた場合、AMQPブローカーへの接続を自動的に再確立しようとします。

再試行間の時間は再試行ごとに増加し、:setting: `broker_connection_max_retries` を超える前に使い果たされません。


broker_connection_max_retries

デフォルト:100。

AMQPブローカーへの接続の再確立を断念するまでの最大再試行回数。

これが0またはNoneに設定されている場合、永久に再試行します。


broker_login_method

デフォルト:"AMQPLAIN"

カスタムamqpログイン方法を設定します。


broker_transport_options

バージョン2.2の新機能。


デフォルト:{}(空のマッピング)。

基礎となるトランスポートに渡される追加オプションの指示。

サポートされているオプション(ある場合)については、トランスポートのユーザーマニュアルを参照してください。

可視性タイムアウトの設定例(RedisおよびSQSトランスポートでサポート):

broker_transport_options = {'visibility_timeout': 18000}  # 5 hours

プロデューサー接続の最大再試行回数を設定する例(最初のタスク実行時にブローカーが使用できない場合、プロデューサーが永久に再試行しないようにするため):

broker_transport_options = {'max_retries': 5}

ワーカー

imports

デフォルト:[](空のリスト)。

ワーカーの起動時にインポートする一連のモジュール。

これは、インポートするタスクモジュールを指定するためだけでなく、シグナルハンドラーや追加のリモートコントロールコマンドなどをインポートするためにも使用されます。

モジュールは元の順序でインポートされます。


include

デフォルト:[](空のリスト)。

:setting: `imports` とまったく同じセマンティクスですが、異なるインポートカテゴリを持つ手段として使用できます。

この設定のモジュールは、:setting: `imports` のモジュールの後にインポートされます。


worker_deduplicate_successful_tasks

バージョン5.1の新機能。


デフォルト:False

各タスクを実行する前に、このタスクが重複メッセージであるかどうかを確認するようにワーカーに指示してください。

重複排除は、同じ識別子を持ち、遅延確認が有効になっていて、メッセージブローカーによって再配信され、結果バックエンドでの状態がSUCCESSであるタスクでのみ発生します。

結果バックエンドがクエリでオーバーフローするのを防ぐために、タスクを受け取った同じワーカーによってタスクがすでに正常に実行されている場合は、結果バックエンドにクエリを実行する前に、正常に実行されたタスクのローカルキャッシュがチェックされます。

このキャッシュは、:setting: `worker_state_db` 設定を設定することで永続化できます。

結果のバックエンドが永続的でない場合(RPCバックエンドなど)、この設定は無視されます。


worker_concurrency

デフォルト:CPUコアの数。

タスクを実行している同時ワーカープロセス/スレッド/グリーンスレッドの数。

主にI / Oを実行している場合は、より多くのプロセスを使用できますが、主にCPUにバインドされている場合は、マシン上のCPUの数に近づけるようにしてください。 設定されていない場合、ホスト上のCPU /コアの数が使用されます。


worker_prefetch_multiplier

デフォルト:4。

一度にプリフェッチするメッセージの数に並行プロセスの数を掛けたもの。 デフォルトは4です(プロセスごとに4つのメッセージ)。 通常はデフォルト設定が適切です。ただし、キューで待機している非常に長時間実行されるタスクがあり、ワーカーを開始する必要がある場合、最初に開始するワーカーは最初に4倍の数のメッセージを受信することに注意してください。 したがって、タスクがワーカーに公平に分散されない場合があります。

プリフェッチを無効にするには、:setting: `worker_prefetch_multiplier` を1に設定します。 この設定を0に変更すると、ワーカーは必要な数のメッセージを消費し続けることができます。

プリフェッチの詳細については、プリフェッチ制限を参照してください。

ノート

ETA /カウントダウンのあるタスクは、プリフェッチ制限の影響を受けません。


worker_lost_wait

デフォルト:10.0秒。

場合によっては、適切なクリーンアップなしでワーカーが殺され、ワーカーが終了する前に結果を公開している可能性があります。 この値は、@WorkerLostError例外を発生させる前に欠落した結果を待機する時間を指定します。


worker_max_tasks_per_child

プールワーカープロセスが新しいタスクに置き換えられる前に実行できるタスクの最大数。 デフォルトは制限なしです。


worker_max_memory_per_child

デフォルト:制限なし。 タイプ:int(キロバイト)

新しいワーカーに置き換えられる前にワーカーが消費する可能性のある常駐メモリーの最大量(キロバイト単位)。 単一のタスクによってワーカーがこの制限を超えた場合、タスクは完了し、その後ワーカーが置き換えられます。

例:

worker_max_memory_per_child = 12000  # 12MB

worker_disable_rate_limits

デフォルト:無効(レート制限が有効)。

タスクに明示的なレート制限が設定されている場合でも、すべてのレート制限を無効にします。


worker_state_db

デフォルト:None

永続的なワーカー状態(取り消されたタスクなど)を格納するために使用されるファイルの名前。 相対パスまたは絶対パスにすることができますが、ファイル名に接尾辞 .db が追加される場合があることに注意してください(Pythonのバージョンによって異なります)。

celery worker --statedb引数を使用して設定することもできます。


worker_timer_precision

デフォルト:1.0秒。

ETAスケジューラがスケジュールを再確認する間にスリープできる最大時間を秒単位で設定します。

この値を1秒に設定すると、スケジューラーの精度は1秒になります。 ミリ秒近くの精度が必要な場合は、これを0.1に設定できます。


worker_enable_remote_control

デフォルト:デフォルトで有効になっています。

ワーカーのリモートコントロールを有効にするかどうかを指定します。


worker_proc_alive_timeout

デフォルト:4.0。

新しいワーカープロセスの起動を待機するときの秒単位のタイムアウト(int / float)。


worker_cancel_long_running_tasks_on_connection_loss

バージョン5.1の新機能。


デフォルト:デフォルトでは無効になっています。

接続が失われたときに遅延確認応答を有効にして、実行時間の長いタスクをすべて強制終了します。

接続が失われる前に確認応答されていなかったタスクは、チャネルがなくなり、タスクがキューに再配信されるため、確認応答できなくなります。 これが、遅延確認が有効になっているタスクが複数回実行される可能性があるため、べき等でなければならない理由です。 この場合、タスクは接続損失ごとに2回実行されます(他のワーカーでは並行して実行されることもあります)。

このオプションをオンにすると、完了していないタスクはキャンセルされ、実行が終了します。 :setting: `task_ignore_result` が有効になっていない限り、接続が失われる前に何らかの方法で完了したタスクは、結果バックエンドにそのまま記録されます。

警告

この機能は、将来の重大な変更として導入されました。 オフにすると、Celeryは警告メッセージを発します。

Celery 6.0では、:setting: `worker_cancel_long_running_tasks_on_connection_loss` はデフォルトでTrueに設定されます。これは、現在の動作が解決するよりも多くの問題を引き起こすためです。


イベント

worker_send_task_events

デフォルト:デフォルトでは無効になっています。

flower などのツールを使用してタスクを監視できるように、タスク関連のイベントを送信します。 ワーカー-E引数のデフォルト値を設定します。


task_send_sent_event

バージョン2.2の新機能。


デフォルト:デフォルトでは無効になっています。

有効にすると、:event: `task-sent` イベントがすべてのタスクに送信されるため、タスクがワーカーによって消費される前に追跡できます。


event_queue_ttl

サポートされているトランスポート
amqp

デフォルト:5.0秒。

モニタークライアントのイベントキューに送信されたメッセージが削除されたときのメッセージの有効期限(秒単位)(int / float)(x-message-ttl

たとえば、この値が10に設定されている場合、このキューに配信されたメッセージは10秒後に削除されます。


event_queue_expires

サポートされているトランスポート
amqp

デフォルト:60.0秒。

モニタークライアントのイベントキューが削除された後の有効期限(秒(int / float))(x-expires)。


event_queue_prefix

デフォルト:"celeryev"

イベントレシーバーキュー名に使用するプレフィックス。


event_exchange

デフォルト:"celeryev"

イベント交換の名前。

警告

このオプションは実験段階ですので、ご注意ください。


event_serializer

デフォルト:"json"

イベントメッセージの送信時に使用されるメッセージシリアル化形式。

も参照してください

シリアライザー


リモートコントロールコマンド

ノート

リモートコントロールコマンドを無効にするには、:setting: `worker_enable_remote_control` 設定を参照してください。


control_queue_ttl

デフォルト:300.0

リモートコントロールコマンドキュー内のメッセージが期限切れになるまでの秒単位の時間。

デフォルトの300秒を使用している場合、これは、リモート制御コマンドが送信され、300秒以内にワーカーがそれを取得しなかった場合、コマンドが破棄されることを意味します。

この設定は、リモートコントロールの応答キューにも適用されます。


control_queue_expires

デフォルト:10.0

未使用のリモート制御コマンドキューがブローカーから削除されるまでの秒単位の時間。

この設定は、リモートコントロールの応答キューにも適用されます。


control_exchange

デフォルト:"celery"

制御コマンド交換の名前。

警告

このオプションは実験段階ですので、ご注意ください。


ロギング

worker_hijack_root_logger

バージョン2.2の新機能。


デフォルト:デフォルトで有効になっています(ルートロガーをハイジャックします)。

デフォルトでは、ルートロガーで以前に構成されたハンドラーはすべて削除されます。 独自のロギングハンドラーをカスタマイズする場合は、 worker_hijack_root_logger = False を設定することで、この動作を無効にできます。

ノート

:signal: `celery.signals.setup_logging` シグナルに接続することで、ロギングをカスタマイズすることもできます。


worker_log_color

デフォルト:アプリがターミナルにログを記録している場合に有効になります。

Celeryアプリによるログ出力の色を有効/無効にします。


worker_log_format

ディフォルト:

"[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"

ログメッセージに使用する形式。

ログ形式の詳細については、Python loggingモジュールを参照してください。


worker_task_log_format

ディフォルト:

"[%(asctime)s: %(levelname)s/%(processName)s]
    %(task_name)s[%(task_id)s]: %(message)s"

ログインしたタスクのログメッセージに使用する形式。

ログ形式の詳細については、Python loggingモジュールを参照してください。


worker_redirect_stdouts

デフォルト:デフォルトで有効になっています。

有効にすると、 stdout および stderr が現在のロガーにリダイレクトされます。

セロリワーカーセロリビートが使用します。


worker_redirect_stdouts_level

デフォルト:WARNING

stdout および stderr へのログレベル出力はとしてログに記録されます。 DEBUGINFOWARNINGERROR、またはCRITICALのいずれかになります。


安全

security_key

デフォルト:None

バージョン2.5の新機能。


メッセージ署名が使用されている場合にメッセージの署名に使用される秘密鍵を含むファイルへの相対パスまたは絶対パス。


security_certificate

デフォルト:None

バージョン2.5の新機能。


メッセージ署名が使用されている場合にメッセージの署名に使用されるX.509証明書ファイルへの相対パスまたは絶対パス。


security_cert_store

デフォルト:None

バージョン2.5の新機能。


メッセージ署名に使用されるX.509証明書を含むディレクトリ。 ワイルドカードを使用したグロブにすることができます(たとえば、/etc/certs/*.pem)。


security_digest

デフォルト:sha256

バージョン4.3の新機能。


メッセージ署名が使用されている場合にメッセージに署名するために使用される暗号ダイジェスト。 https://cryptography.io/en/latest/hazmat/primitives/cryptographic-hashes/#module-cryptography.hazmat.primitives.hashes


カスタムコンポーネントクラス(高度)

worker_pool

デフォルト:"prefork"celery.concurrency.prefork:TaskPool)。

ワーカーが使用するプールクラスの名前。

イベントレット/イベント

このオプションを使用して、イベントレットまたはgeventプールを選択しないでください。 代わりに、セロリワーカーに対して-Pオプションを使用して、モンキーパッチの適用が遅すぎて、奇妙な方法で問題が発生しないようにする必要があります。


worker_pool_restarts

デフォルト:デフォルトでは無効になっています。

有効にすると、:control: `pool_restart` リモートコントロールコマンドを使用してワーカープールを再起動できます。


worker_autoscaler

バージョン2.2の新機能。


デフォルト:"celery.worker.autoscale:Autoscaler"

使用するオートスケーラークラスの名前。


worker_consumer

デフォルト:"celery.worker.consumer:Consumer"

ワーカーが使用するコンシューマークラスの名前。


worker_timer

デフォルト:"kombu.asynchronous.hub.timer:Timer"

ワーカーが使用するETAスケジューラークラスの名前。 デフォルトは、プールの実装によって設定されます。


ビート設定(セロリビート)

beat_schedule

デフォルト:{}(空のマッピング)。

beatで使用される定期的なタスクスケジュール。 エントリを参照してください。


beat_scheduler

デフォルト:"celery.beat:PersistentScheduler"

デフォルトのスケジューラクラス。 たとえば、:pypi: `django-celery-beat` 拡張子と一緒に使用する場合は、"django_celery_beat.schedulers:DatabaseScheduler"に設定できます。

celery beat -S引数を使用して設定することもできます。


beat_schedule_filename

デフォルト:"celerybeat-schedule"

PersistentScheduler が定期的なタスクの最終実行時間を保存するために使用するファイルの名前。 相対パスまたは絶対パスにすることができますが、ファイル名に接尾辞 .db が追加される場合があることに注意してください(Pythonのバージョンによって異なります)。

celery beat --schedule引数を使用して設定することもできます。


beat_sync_every

デフォルト:0。

別のデータベース同期が発行される前に呼び出すことができる定期的なタスクの数。 値0(デフォルト)は、タイミングに基づく同期を意味します-scheduler.sync_everyによって決定されるデフォルトの3分。 1に設定すると、beatはすべてのタスクメッセージが送信された後にsyncを呼び出します。


beat_max_loop_interval

デフォルト:0。

スケジュールを確認するまでの最大スリープ秒数beat

この値のデフォルトはスケジューラー固有です。 デフォルトのCeleryビートスケジューラの場合、値は300(5分)ですが、:pypi: `django-celery-beat` データベーススケジューラの場合、スケジュールが外部で変更される可能性があるため、5秒です。スケジュールの変更を考慮に入れる必要があります。

また、JythonでCelery beat Embedded(-B)をスレッドとして実行すると、最大間隔がオーバーライドされて1に設定されるため、タイムリーにシャットダウンできます。