構成とデフォルト—Pythonドキュメント
構成とデフォルト
このドキュメントでは、使用可能な構成オプションについて説明します。
デフォルトのローダーを使用している場合は、celeryconfig.py
モジュールを作成し、Pythonパスで使用できることを確認する必要があります。
- 設定ファイルの例
- 新しい小文字の設定
- 構成ディレクティブ
- 一般設定
- 時間と日付の設定
- タスク設定
- タスク実行設定
- タスク結果のバックエンド設定
- データベースバックエンド設定
- RPCバックエンド設定
- バックエンド設定をキャッシュする
- MongoDBバックエンド設定
- Redisバックエンド設定
- Cassandraバックエンド設定
- S3バックエンド設定
- Azure BlockBlobバックエンド設定
- Elasticsearchバックエンド設定
- AWSDynamoDBバックエンド設定
- IronCacheバックエンド設定
- Couchbaseバックエンド設定
- ArangoDBバックエンド設定
- CosmosDBバックエンド設定(実験的)
- CouchDBバックエンド設定
- ファイルシステムのバックエンド設定
- 領事K / Vストアのバックエンド設定
- メッセージルーティング
- ブローカー設定
- ワーカー
- イベント
- リモートコントロールコマンド
- ロギング
- 安全
- カスタムコンポーネントクラス(高度)
- ビート設定(セロリビート)
設定ファイルの例
これは、開始するための構成ファイルの例です。 基本的なCeleryセットアップを実行するために必要なものがすべて含まれている必要があります。
## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'
# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)
## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
新しい小文字の設定
バージョン4.0では、新しい小文字の設定と設定の構成が導入されました。
以前のバージョンとの主な違いは、小文字の名前を除いて、celery_beat_
からbeat_
、celeryd_
からworker_
などの一部のプレフィックスの名前が変更されていることです。また、最上位のcelery_
設定のほとんどは、新しいtask_
プレフィックスに移動されました。
警告
Celeryは、Celery 6.0まで、古い構成ファイルを引き続き読み取ることができます。 その後、古い構成ファイルのサポートは削除されます。 多くのケース( Django を含む)を処理する必要があるcelery upgrade
コマンドを提供します。
できるだけ早く新しい構成スキームに移行してください。
構成ディレクティブ
一般設定
accept_content
デフォルト:{'json'}
(セット、リスト、またはタプル)。
許可するコンテンツタイプ/シリアライザーのホワイトリスト。
このリストにないメッセージを受信した場合、そのメッセージはエラーとともに破棄されます。
デフォルトではjsonのみが有効になっていますが、pickleやyamlなどの任意のコンテンツタイプを追加できます。 この場合、信頼できない当事者がブローカーにアクセスできないようにしてください。 詳細については、セキュリティを参照してください。
例:
# using serializer name
accept_content = ['json']
# or the actual content-type (MIME)
accept_content = ['application/json']
result_accept_content
デフォルト:None
(設定、リスト、またはタプルが可能)。
バージョン4.3の新機能。
結果のバックエンドを可能にするコンテンツタイプ/シリアライザーのホワイトリスト。
このリストにないメッセージを受信した場合、そのメッセージはエラーとともに破棄されます。
デフォルトでは、accept_content
と同じシリアライザーです。 ただし、結果バックエンドの受け入れられたコンテンツに対して別のシリアライザーを指定できます。 通常、これは、署名付きメッセージングが使用され、結果が署名なしで結果バックエンドに保存される場合に必要です。 詳細については、セキュリティを参照してください。
例:
# using serializer name
result_accept_content = ['json']
# or the actual content-type (MIME)
result_accept_content = ['application/json']
時間と日付の設定
enable_utc
バージョン2.5の新機能。
デフォルト:バージョン3.0以降、デフォルトで有効になっています。
有効にすると、メッセージの日付と時刻がUTCタイムゾーンを使用するように変換されます。
2.5未満のCeleryバージョンを実行しているワーカーは、すべてのメッセージに対してローカルタイムゾーンを想定しているため、すべてのワーカーがアップグレードされている場合にのみ有効にすることに注意してください。
timezone
バージョン2.5の新機能。
デフォルト:"UTC"
。
カスタムタイムゾーンを使用するようにCeleryを構成します。 タイムゾーン値は、:pypi: `pytz` ライブラリでサポートされている任意のタイムゾーンにすることができます。
設定されていない場合、UTCタイムゾーンが使用されます。 下位互換性のために、:setting: `enable_utc` 設定もあり、これがfalseに設定されている場合は、代わりにシステムのローカルタイムゾーンが使用されます。
タスク設定
task_annotations
バージョン2.5の新機能。
デフォルト:None
。
この設定を使用して、構成から任意のタスク属性を書き換えることができます。 設定は、dict、またはタスクをフィルタリングして変更する属性のマップを返す注釈オブジェクトのリストにすることができます。
これにより、tasks.add
タスクのrate_limit
属性が変更されます。
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
または、すべてのタスクで同じように変更します。
task_annotations = {'*': {'rate_limit': '10/s'}}
on_failure
ハンドラーなど、メソッドを変更することもできます。
def my_on_failure(self, exc, task_id, args, kwargs, einfo):
print('Oh no! Task failed: {0!r}'.format(exc))
task_annotations = {'*': {'on_failure': my_on_failure}}
より柔軟性が必要な場合は、dictの代わりにオブジェクトを使用して、注釈を付けるタスクを選択できます。
class MyAnnotate:
def annotate(self, task):
if task.name.startswith('tasks.'):
return {'rate_limit': '10/s'}
task_annotations = (MyAnnotate(), {other,})
task_compression
デフォルト:None
タスクメッセージに使用されるデフォルトの圧縮。 gzip
、bzip2
(利用可能な場合)、または昆布圧縮レジストリに登録されている任意のカスタム圧縮スキームにすることができます。
デフォルトでは、非圧縮メッセージを送信します。
task_protocol
デフォルト:2(4.0以降)。
タスクの送信に使用されるデフォルトのタスクメッセージプロトコルバージョンを設定します。 プロトコルをサポート:1および2。
プロトコル2は、3.1.24および4.x +でサポートされています。
task_serializer
デフォルト:"json"
(4.0以降:ピクルス)。
使用するデフォルトのシリアル化方法を識別する文字列。 json (デフォルト)、 pickle 、 yaml 、 msgpack 、またはkombu.serialization.registry
。
task_publish_retry
バージョン2.2の新機能。
デフォルト:有効。
接続が失われた場合やその他の接続エラーが発生した場合に、公開タスクメッセージを再試行するかどうかを決定します。 :setting: `task_publish_retry_policy` も参照してください。
task_publish_retry_policy
バージョン2.2の新機能。
デフォルト:メッセージ送信の再試行を参照してください。
接続が失われた場合やその他の接続エラーが発生した場合にタスクメッセージの公開を再試行するときのデフォルトポリシーを定義します。
タスク実行設定
task_always_eager
デフォルト:無効。
これがTrue
の場合、すべてのタスクは、タスクが戻るまでブロックすることによってローカルで実行されます。 apply_async()
とTask.delay()
は、EagerResult
インスタンスを返します。これは、結果がすでに評価されていることを除いて、AsyncResult
のAPIと動作をエミュレートします。
つまり、タスクはキューに送信されるのではなく、ローカルで実行されます。
task_eager_propagates
デフォルト:無効。
これがTrue
の場合、熱心に実行されたタスク( task.apply()によって適用されるか、:setting: `task_always_eager` 設定が有効になっている場合)が伝播されます例外。
これは、常にapply()
をthrow=True
で実行するのと同じです。
task_store_eager_result
バージョン5.1の新機能。
デフォルト:無効。
これがTrue
および:setting: `task_always_eager` がTrue
であり、:setting:` task_ignore_result` がFalse
の場合、熱心に実行されたタスクの結果はバックエンドに保存されます。
デフォルトでは、:setting: `task_always_eager` がTrue
に設定され、:setting:` task_ignore_result` がFalse
に設定されていても、結果は保存されません。
task_remote_tracebacks
デフォルト:無効。
有効にすると、タスクの結果には、タスクエラーを再発生させるときにワーカースタックが含まれます。
これには、:pypi: `tblib` ライブラリが必要です。これは、 pip を使用してインストールできます。
$ pip install celery[tblib]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
task_ignore_result
デフォルト:無効。
タスクの戻り値を保存するかどうか(トゥームストーン)。 正常な戻り値ではなく、エラーを保存したい場合は、:setting: `task_store_errors_even_if_ignored` を設定できます。
task_store_errors_even_if_ignored
デフォルト:無効。
設定されている場合、Task.ignore_result
がオンの場合でも、ワーカーはすべてのタスクエラーを結果ストアに保存します。
task_track_started
デフォルト:無効。
True
の場合、タスクがワーカーによって実行されると、タスクはそのステータスを「開始済み」として報告します。 通常の動作ではそのレベルの粒度を報告しないため、デフォルト値はFalse
です。 タスクは保留中、終了中、または再試行待ちのいずれかです。 「開始済み」状態にすることは、実行時間の長いタスクがあり、現在実行中のタスクを報告する必要がある場合に役立ちます。
task_time_limit
デフォルト:時間制限なし。
タスクのハードタイム制限(秒単位)。 これを超えると、タスクを処理しているワーカーが強制終了され、新しいワーカーに置き換えられます。
task_soft_time_limit
デフォルト:ソフト時間制限なし。
タスクのソフト制限時間(秒単位)。
これを超えると、@SoftTimeLimitExceeded
例外が発生します。 たとえば、タスクはこれをキャッチして、困難な時間制限が来る前にクリーンアップすることができます。
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
task_acks_late
デフォルト:無効。
Late ackは、タスクメッセージがの直前ではなくの実行後に確認応答されることを意味します(デフォルトの動作)。
task_acks_on_failure_or_timeout
デフォルト:有効
有効にすると、すべてのタスクのメッセージは、失敗したりタイムアウトしたりしても確認されます。
この設定の構成は、実行後に確認されたタスクにのみ適用され、:setting: `task_acks_late` が有効になっている場合にのみ適用されます。
task_reject_on_worker_lost
デフォルト:無効。
:setting: `task_acks_late` が有効になっている場合でも、タスクを実行しているワーカープロセスが突然終了するか、シグナルが送信されると、ワーカーはタスクを確認します(:sig:` KILL` / [ X186X]:sig: `INT` など)。
これをtrueに設定すると、代わりにメッセージを再キューイングできるため、タスクは同じワーカーまたは別のワーカーによって再度実行されます。
警告
これを有効にすると、メッセージループが発生する可能性があります。 自分が何をしているのかを確認してください。
task_default_rate_limit
デフォルト:レート制限なし。
タスクのグローバルなデフォルトのレート制限。
この値は、カスタムレート制限がないタスクに使用されます
も参照してください
設定: worker_disable_rate_limits 設定は、すべてのレート制限を無効にすることができます。
タスク結果のバックエンド設定
result_backend
デフォルト:デフォルトで有効になっている結果バックエンドはありません。
タスクの結果(トゥームストーン)を格納するために使用されるバックエンド。 次のいずれかになります。
- *;
rpc
- 結果をAMQPメッセージとして送り返します RPCバックエンド設定を参照してください。
- *;
database
- SQLAlchemy でサポートされているリレーショナルデータベースを使用してください。 conf-database-result-backend を参照してください。
- *;
redis
- Redis を使用して結果を保存します。 Redisバックエンド設定を参照してください。
- *;
cache
- Memcached を使用して結果を保存します。 キャッシュバックエンド設定を参照してください。
- *; mongodb
- MongoDB を使用して結果を保存します。 MongoDBバックエンド設定を参照してください。
- *;
cassandra
- Cassandra を使用して結果を保存します。 Cassandraバックエンド設定を参照してください。
- *;
elasticsearch
- Elasticsearch を使用して結果を保存します。 Elasticsearchバックエンド設定を参照してください。
- *;
ironcache
- IronCache を使用して結果を保存します。 IronCacheバックエンド設定を参照してください。
- *;
couchbase
- Couchbase を使用して結果を保存します。 Couchbaseバックエンド設定を参照してください。
- *;
arangodb
- ArangoDB を使用して結果を保存します。 ArangoDBバックエンド設定を参照してください。
- *;
couchdb
- CouchDB を使用して結果を保存します。 CouchDBバックエンド設定を参照してください。
- *;
cosmosdbsql (experimental)
- CosmosDB PaaSを使用して結果を保存します。 CosmosDBバックエンド設定(実験的)を参照してください。
- *;
filesystem
- 共有ディレクトリを使用して結果を保存します。 ファイルシステムバックエンド設定を参照してください。
- *;
consul
- Consul K / Vストアを使用して結果を保存します。 ConsulK / Vストアのバックエンド設定を参照してください。
- *;
azureblockblob
- AzureBlockBlob PaaSストアを使用して、結果を保存します。 Azure BlockBlobバックエンド設定を参照してください。
- *;
s3
- S3 を使用して結果を保存します S3バックエンド設定を参照してください。
result_backend_always_retry
デフォルト:False
有効にすると、バックエンドは、例外を伝播する代わりに、回復可能な例外のイベントで再試行を試みます。 2回の再試行の間に指数バックオフスリープ時間を使用します。
result_backend_max_sleep_between_retries_ms
デフォルト:10000
これは、2回のバックエンド操作の再試行間の最大スリープ時間を指定します。
result_backend_base_sleep_between_retries_ms
デフォルト:10
これは、2回のバックエンド操作の再試行間の基本スリープ時間を指定します。
result_backend_max_retries
デフォルト:Inf
これは、回復可能な例外の場合の再試行の最大数です。
result_backend_transport_options
デフォルト:{}
(空のマッピング)。
基礎となるトランスポートに渡される追加オプションの指示。
サポートされているオプション(ある場合)については、トランスポートのユーザーマニュアルを参照してください。
可視性タイムアウトの設定例(RedisおよびSQSトランスポートでサポート):
result_backend_transport_options = {'visibility_timeout': 18000} # 5 hours
result_serializer
デフォルト:4.0以降のjson
(以前:pickle)。
結果のシリアル化形式。
サポートされているシリアル化形式については、 Serializers を参照してください。
result_compression
デフォルト:圧縮なし。
タスクの結果に使用されるオプションの圧縮方法。 :setting: `task_compression` 設定と同じオプションをサポートします。
result_extended
デフォルト:False
拡張タスク結果属性(name、args、kwargs、worker、retries、queue、delivery_info)をバックエンドに書き込むことができるようにします。
result_expires
デフォルト:1日後に期限切れになります。
保存されたタスクトゥームストーンが削除されるまでの時間(秒単位、またはtimedelta
オブジェクト)。
celery beat
が有効になっていると仮定すると、組み込みの定期タスクは、この時間(celery.backend_cleanup
)以降に結果を削除します。 タスクは毎日午前4時に実行されます。
None
または0の値は、結果が期限切れにならないことを意味します(バックエンドの仕様によって異なります)。
ノート
現時点では、これはAMQP、データベース、キャッシュ、Couchbase、およびRedisバックエンドでのみ機能します。
データベースバックエンドを使用する場合、結果の有効期限が切れるには、celery beat
が実行されている必要があります。
result_cache_max
デフォルト:デフォルトでは無効になっています。
結果のクライアントキャッシュを有効にします。
これは、1つの結果インスタンスが結果を消費するとすぐに結果が利用できなくなる古い非推奨の「amqp」バックエンドに役立ちます。
これは、古い結果が削除される前にキャッシュする結果の総数です。 0またはNoneの値は制限がないことを意味し、-1
の値はキャッシュを無効にします。
デフォルトでは無効になっています。
result_chord_join_timeout
デフォルト:3.0。
コード内でグループの結果に参加するときの秒単位のタイムアウト(int / float)。
result_chord_retry_interval
デフォルト:1.0。
コードタスクを再試行するためのデフォルトの間隔。
override_backends
デフォルト:デフォルトでは無効になっています。
バックエンドを実装するクラスへのパス。
バックエンドの実装をオーバーライドできます。 これは、実行されたタスクに関する追加のメタデータを保存したり、再試行ポリシーを上書きしたりする必要がある場合に役立ちます。
例:
override_backends = {"db": "custom_module.backend.class"}
データベースバックエンド設定
データベースURLの例
データベースバックエンドを使用するには、接続URLとdb+
プレフィックスを使用して:setting: `result_backend` 設定を構成する必要があります。
result_backend = 'db+scheme://user:password@host:port/dbname'
例:
# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'
# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'
# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'
# oracle
result_backend = 'db+oracle://scott:[email protected]:1521/sidname'
サポートされているデータベースの表についてはサポートされているデータベースを、接続文字列の詳細については接続文字列を参照してください(これはdb+
の後に続くURIの一部です。プレフィックス)。
database_engine_options
デフォルト:{}
(空のマッピング)。
追加のSQLAlchemyデータベースエンジンオプションを指定するには、:setting: `database_engine_options` 設定を使用できます。
# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
database_short_lived_sessions
デフォルト:デフォルトでは無効になっています。
短期間のセッションはデフォルトで無効になっています。 有効にすると、特に多くのタスクを処理するシステムで、パフォーマンスが大幅に低下する可能性があります。 このオプションは、キャッシュされたデータベース接続が非アクティブで古くなった結果としてエラーが発生するトラフィックの少ないワーカーで役立ちます。 たとえば、(OperationalError)(2006、 'MySQLサーバーがなくなりました')のような断続的なエラーは、短期間のセッションを有効にすることで修正できます。 このオプションは、データベースのバックエンドにのみ影響します。
database_table_schemas
デフォルト:{}
(空のマッピング)。
SQLAlchemyが結果バックエンドとして構成されている場合、Celeryはタスクの結果メタデータを格納するために2つのテーブルを自動的に作成します。 この設定により、テーブルのスキーマをカスタマイズできます。
# use custom schema for the database result backend.
database_table_schemas = {
'task': 'celery',
'group': 'celery',
}
database_table_names
デフォルト:{}
(空のマッピング)。
SQLAlchemyが結果バックエンドとして構成されている場合、Celeryはタスクの結果メタデータを格納するために2つのテーブルを自動的に作成します。 この設定により、テーブル名をカスタマイズできます。
# use custom table names for the database result backend.
database_table_names = {
'task': 'myapp_taskmeta',
'group': 'myapp_groupmeta',
}
RPCバックエンド設定
result_persistent
デフォルト:デフォルトで無効になっています(一時的なメッセージ)。
True
に設定すると、結果メッセージは永続的になります。 これは、ブローカーの再始動後にメッセージが失われないことを意味します。
構成例
result_backend = 'rpc://'
result_persistent = False
注意:タスクトゥームストーンが古すぎるの場合、このバックエンドを使用するとcelery.backends.rpc.BacklogLimitExceeded
のレイズがトリガーされる可能性があります。
例えば
for i in range(10000):
r = debug_task.delay()
print(r.state) # this would raise celery.backends.rpc.BacklogLimitExceeded
バックエンド設定をキャッシュする
ノート
キャッシュバックエンドは、:pypi: `pylibmc` および:pypi:` python-memcached` ライブラリをサポートします。 後者は、:pypi: `pylibmc` がインストールされていない場合にのみ使用されます。
単一のMemcachedサーバーを使用する:
result_backend = 'cache+memcached://127.0.0.1:11211/'
複数のMemcachedサーバーの使用:
result_backend = """
cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()
「メモリ」バックエンドは、キャッシュをメモリにのみ保存します。
result_backend = 'cache'
cache_backend = 'memory'
cache_backend_options
デフォルト:{}
(空のマッピング)。
:pypi: `pylibmc` オプションは、:setting:` cache_backend_options` 設定を使用して設定できます。
cache_backend_options = {
'binary': True,
'behaviors': {'tcp_nodelay': True},
}
cache_backend
:setting: `result_backend` 設定でキャッシュバックエンドを直接指定できるようになったため、この設定はceleryの組み込みバックエンドでは使用されなくなりました。
ノート
django-celery-results-結果としてDjangoORM / Cacheを使用するバックエンドライブラリは、cache_backend
を使用してdjangoキャッシュを選択します。
MongoDBバックエンド設定
mongodb_backend_settings
これは、次のキーをサポートするdictです。
- データベース
接続するデータベース名。 デフォルトは
celery
です。
- taskmeta_collection
タスクのメタデータを格納するコレクション名。 デフォルトは
celery_taskmeta
です。
- max_pool_size
max_pool_sizeとしてPyMongoのConnectionまたはMongoClientコンストラクターに渡されます。 これは、特定の時間にMongoDBに対して開いたままにするTCP接続の最大数です。 max_pool_sizeよりも多くの開いている接続がある場合、ソケットは解放されたときに閉じられます。 デフォルトは10です。
オプション
mongodb接続コンストラクターに渡す追加のキーワード引数。 サポートされている引数のリストについては、
pymongo
のドキュメントを参照してください。
構成例
result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
'database': 'mydb',
'taskmeta_collection': 'my_taskmeta_collection',
}
Redisバックエンド設定
バックエンドURLの構成
ノート
Redisバックエンドには、:pypi: `redis` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install celery[redis]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドでは、:setting: `result_backend` 設定をRedisまたは Redis over TLS URLに設定する必要があります。
result_backend = 'redis://username:password@host:port/db'
例えば:
result_backend = 'redis://localhost/0'
と同じです:
result_backend = 'redis://'
rediss://
プロトコルを使用して、TLSを介してredisに接続します。
result_backend = 'rediss://username:password@host:port/db?ssl_cert_reqs=required'
ssl_cert_reqs
文字列はrequired
、optional
、またはnone
のいずれかである必要があることに注意してください(ただし、下位互換性のために、文字列は[ X151X] 、CERT_OPTIONAL
、CERT_NONE
)。
Unixソケット接続を使用する必要がある場合、URLは次の形式である必要があります::
result_backend = 'socket:///path/to/redis.sock'
URLのフィールドは次のように定義されています。
username
バージョン5.1.0の新機能。
データベースへの接続に使用されるユーザー名。
これは、Redis> = 6.0で、py-redis> = 3.4.0がインストールされている場合にのみサポートされることに注意してください。
古いデータベースバージョンまたは古いクライアントバージョンを使用している場合は、ユーザー名を省略できます。
result_backend = 'redis://:password@host:port/db'
password
データベースへの接続に使用されるパスワード。
host
Redisサーバーのホスト名またはIPアドレス(例: localhost )。
port
Redisサーバーに移植します。 デフォルトは6379です。
db
使用するデータベース番号。 デフォルトは0です。 dbには、オプションの先頭のスラッシュを含めることができます。
TLS接続(プロトコルはrediss://
)を使用する場合、:setting: `broker_use_ssl` のすべての値をクエリパラメーターとして渡すことができます。 証明書へのパスはURLエンコードする必要があり、ssl_cert_reqs
が必要です。 例:
result_backend = 'rediss://:password@host:port/db?\
ssl_cert_reqs=required\
&ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem\ # /var/ssl/myca.pem
&ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem\ # /var/ssl/redis-server-cert.pem
&ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem' # /var/ssl/private/worker-key.pem
ssl_cert_reqs
文字列はrequired
、optional
、またはnone
のいずれかである必要があることに注意してください(ただし、下位互換性のために、文字列は[ X151X] 、CERT_OPTIONAL
、CERT_NONE
)。
バージョン5.1.0の新機能。
redis_backend_health_check_interval
デフォルト:未構成
Redisバックエンドはヘルスチェックをサポートしています。 この値は、ヘルスチェック間の秒数を値とする整数として設定する必要があります。 ヘルスチェック中にConnectionErrorまたはTimeoutErrorが発生した場合、接続が再確立され、コマンドが1回だけ再試行されます。
redis_backend_use_ssl
デフォルト:無効。
RedisバックエンドはSSLをサポートしています。 この値は、辞書の形式で設定する必要があります。 有効なキーと値のペアは、:setting: `broker_use_ssl` のredis
サブセクションに記載されているものと同じです。
redis_max_connections
デフォルト:制限なし。
結果の送信と取得に使用されるRedis接続プールで使用可能な接続の最大数。
警告
同時接続の数が最大値を超えると、Redisは ConnectionError を発生させます。
redis_socket_connect_timeout
バージョン4.0.1の新機能。
デフォルト:None
結果バックエンドからRedisへの接続のソケットタイムアウト(秒単位)(int / float)
redis_socket_timeout
デフォルト:120.0秒。
Redisサーバーへの読み取り/書き込み操作のソケットタイムアウト(秒単位)(int / float)。redis結果バックエンドで使用されます。
redis_retry_on_timeout
バージョン4.4.1の新機能。
デフォルト:False
Redis結果バックエンドによって使用されるRedisサーバーへのTimeoutErrorの読み取り/書き込み操作を再試行します。 UNIXソケットによるRedis接続を使用している場合は、この変数を設定しないでください。
redis_socket_keepalive
バージョン4.4.1の新機能。
デフォルト:False
Redisサーバーへの接続を正常に保つためのソケットTCPキープアライブ。Redis結果バックエンドで使用されます。
Cassandraバックエンド設定
ノート
このCassandraバックエンドドライバーには:pypi: `cassandra-driver` が必要です。
インストールするには、 pip を使用します。
$ pip install celery[cassandra]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドでは、次の構成ディレクティブを設定する必要があります。
cassandra_servers
デフォルト:[]
(空のリスト)。
host
Cassandraサーバーのリスト。 例えば:
cassandra_servers = ['localhost']
cassandra_port
デフォルト:9042。
Cassandraサーバーに接続するためのポート。
cassandra_keyspace
デフォルト:なし。
結果を保存するためのキースペース。 例えば:
cassandra_keyspace = 'tasks_keyspace'
cassandra_table
デフォルト:なし。
結果を格納するテーブル(列ファミリー)。 例えば:
cassandra_table = 'tasks'
cassandra_read_consistency
デフォルト:なし。
使用される読み取り整合性。 値には、ONE
、TWO
、THREE
、QUORUM
、ALL
、LOCAL_QUORUM
、EACH_QUORUM
があります。 ]、LOCAL_ONE
。
cassandra_write_consistency
デフォルト:なし。
使用される書き込みの一貫性。 値には、ONE
、TWO
、THREE
、QUORUM
、ALL
、LOCAL_QUORUM
、EACH_QUORUM
があります。 ]、LOCAL_ONE
。
cassandra_entry_ttl
デフォルト:なし。
ステータスエントリの存続時間。 それらは期限切れになり、追加してから数秒後に削除されます。 None
(デフォルト)の値は、期限切れにならないことを意味します。
cassandra_auth_provider
デフォルト:None
。
使用するcassandra.auth
モジュール内のAuthProviderクラス。 値はPlainTextAuthProvider
またはSaslAuthProvider
です。
cassandra_auth_kwargs
デフォルト:{}
(空のマッピング)。
認証プロバイダーに渡す名前付き引数。 例えば:
cassandra_auth_kwargs = {
username: 'cassandra',
password: 'cassandra'
}
cassandra_options
デフォルト:{}
(空のマッピング)。
cassandra.cluster
クラスに渡す名前付き引数。
cassandra_options = {
'cql_version': '3.2.1'
'protocol_version': 3
}
構成例
cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400
S3バックエンド設定
ノート
このs3バックエンドドライバーには:pypi: `s3` が必要です。
インストールするには、 s3 を使用します。
$ pip install celery[s3]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドでは、次の構成ディレクティブを設定する必要があります。
s3_access_key_id
デフォルト:なし。
s3アクセスキーID。 例えば:
s3_access_key_id = 'acces_key_id'
s3_secret_access_key
デフォルト:なし。
s3シークレットアクセスキー。 例えば:
s3_secret_access_key = 'acces_secret_access_key'
s3_bucket
デフォルト:なし。
s3バケット名。 例えば:
s3_bucket = 'bucket_name'
s3_base_path
デフォルト:なし。
結果キーを保存するために使用するs3バケットのベースパス。 例えば:
s3_base_path = '/prefix'
s3_endpoint_url
デフォルト:なし。
カスタムのs3エンドポイントURL。 これを使用して、カスタムのセルフホストs3互換バックエンド(Ceph、Scality…)に接続します。 例えば:
s3_endpoint_url = 'https://.s3.custom.url'
s3_region
デフォルト:なし。
s3awsリージョン。 例えば:
s3_region = 'us-east-1'
構成例
s3_access_key_id = 's3-access-key-id'
s3_secret_access_key = 's3-secret-access-key'
s3_bucket = 'mybucket'
s3_base_path = '/celery_result_backend'
s3_endpoint_url = 'https://endpoint_url'
Azure BlockBlobバックエンド設定
結果のバックエンドとして AzureBlockBlob を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。
必要なURL形式は、azureblockblob://
の後にストレージ接続文字列が続きます。 ストレージ接続文字列は、AzureポータルのストレージアカウントリソースのAccess Keys
ペインにあります。
構成例
result_backend = 'azureblockblob://DefaultEndpointsProtocol=https;AccountName=somename;AccountKey=Lou...bzg==;EndpointSuffix=core.windows.net'
azureblockblob_container_name
デフォルト:セロリ。
結果を保存するストレージコンテナの名前。
azureblockblob_base_path
バージョン5.1の新機能。
デフォルト:なし。
結果キーを格納するために使用するストレージコンテナのベースパス。 例えば:
azureblockblob_base_path = 'prefix/'
azureblockblob_retry_initial_backoff_sec
デフォルト:2。
最初の再試行の初期バックオフ間隔(秒単位)。 その後の再試行は、指数戦略を使用して試行されます。
azureblockblob_retry_increment_base
デフォルト:2。
azureblockblob_retry_max_attempts
デフォルト:3。
再試行の最大回数。
azureblockblob_connection_timeout
デフォルト:20。
AzureブロックBLOB接続を確立するための秒単位のタイムアウト。
azureblockblob_read_timeout
デフォルト:120。
紺碧のブロックブロブを読み取るための秒単位のタイムアウト。
Elasticsearchバックエンド設定
結果のバックエンドとして Elasticsearch を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。
構成例
result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
elasticsearch_retry_on_timeout
デフォルト:False
タイムアウトは別のノードで再試行をトリガーする必要がありますか?
elasticsearch_max_retries
デフォルト:3。
例外が伝播されるまでの最大再試行回数。
elasticsearch_timeout
デフォルト:10.0秒。
Elasticsearch結果のバックエンドで使用されるグローバルタイムアウト。
elasticsearch_save_meta_as_text
デフォルト:True
メタはテキストまたはネイティブjsonとして保存する必要があります。 結果は常にテキストとしてシリアル化されます。
AWSDynamoDBバックエンド設定
ノート
Dynamodbバックエンドには、:pypi: `boto3` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install celery[dynamodb]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
警告
Dynamodbバックエンドは、ソートキーが定義されているテーブルと互換性がありません。
パーティションキー以外のものに基づいて結果テーブルをクエリする場合は、代わりにグローバルセカンダリインデックス(GSI)を定義してください。
このバックエンドでは、:setting: `result_backend` 設定をDynamoDBURLに設定する必要があります。
result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'
たとえば、AWSリージョンとテーブル名を指定します。
result_backend = 'dynamodb://@us-east-1/celery_results'
または、デフォルトのテーブル名(celery
)を使用し、読み取りと書き込みのプロビジョニングされたスループットを指定して、環境からAWS構成パラメーターを取得します。
result_backend = 'dynamodb://@/?read=5&write=5'
または、ダウンロード可能なバージョンのDynamoDB ローカルを使用します。
result_backend = 'dynamodb://@localhost:8000'
または、ダウンロード可能なバージョンまたは他のサービスを使用して、任意のホストに準拠するAPIをデプロイします。
result_backend = 'dynamodb://@us-east-1'
dynamodb_endpoint_url = 'http://192.168.0.40:8000'
result_backend
のDynamoDBURLのフィールドは次のように定義されています。
aws_access_key_id & aws_secret_access_key
AWSAPIリソースにアクセスするためのクレデンシャル。 これらは、ここで説明されているように、さまざまなソースからの:pypi: `boto3` ライブラリによって解決することもできます。
region
AWSリージョン、例: ダウンロード可能バージョンの場合は
us-east-1
またはlocalhost
。 定義オプションについては、:pypi: `boto3` ライブラリドキュメントを参照してください。port
ダウンロード可能なバージョンを使用している場合は、ローカルDynamoDBインスタンスのリスニングポート。
region
パラメーターをlocalhost
として指定していない場合、このパラメーターを設定しても効果はありません。table
使用するテーブル名。 デフォルトは
celery
です。 許可される文字と長さについては、 DynamoDB命名規則を参照してください。read & write
作成されたDynamoDBテーブルの読み取りおよび書き込み容量単位。 デフォルトは、読み取りと書き込みの両方で
1
です。 詳細については、プロビジョニングされたスループットのドキュメントを参照してください。ttl_seconds
有効期限が切れる前の結果の存続時間(秒単位)。 デフォルトでは、結果を期限切れにせず、DynamoDBテーブルの存続可能時間の設定は変更されません。
ttl_seconds
が正の値に設定されている場合、結果は指定された秒数後に期限切れになります。ttl_seconds
を負の値に設定すると、結果が期限切れにならず、DynamoDBテーブルの存続可能時間設定がアクティブに無効になります。 テーブルのTimeto Live設定をすばやく連続して複数回変更しようとすると、スロットリングエラーが発生することに注意してください。 詳細については、 DynamoDBTTLドキュメントを参照してください。
IronCacheバックエンド設定
ノート
IronCacheバックエンドには、:pypi: `iron_celery` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install iron_celery
IronCacheは、:setting: `result_backend` で提供されるURLを介して構成されます。例:
result_backend = 'ironcache://project_id:token@'
または、キャッシュ名を変更するには:
ironcache:://project_id:token@/awesomecache
詳細については、 https://github.com/iron-io/iron_celeryを参照してください。
Couchbaseバックエンド設定
ノート
Couchbaseバックエンドには、:pypi: `couchbase` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install celery[couchbase]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドは、Couchbase URLに設定された:setting: `result_backend` を介して構成できます。
result_backend = 'couchbase://username:password@host:port/bucket'
couchbase_backend_settings
デフォルト:{}
(空のマッピング)。
これは、次のキーをサポートするdictです。
host
Couchbaseサーバーのホスト名。 デフォルトは
localhost
です。port
Couchbaseサーバーがリッスンしているポート。 デフォルトは
8091
です。bucket
Couchbaseサーバーが書き込んでいるデフォルトのバケット。 デフォルトは
default
です。username
Couchbaseサーバーに対して認証するためのユーザー名(オプション)。
password
Couchbaseサーバーに対して認証するためのパスワード(オプション)。
ArangoDBバックエンド設定
ノート
ArangoDBバックエンドには、:pypi: `pyArango` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install celery[arangodb]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドは、ArangoDB URLに設定された:setting: `result_backend` を介して構成できます。
result_backend = 'arangodb://username:password@host:port/database/collection'
arangodb_backend_settings
デフォルト:{}
(空のマッピング)。
これは、次のキーをサポートするdictです。
host
ArangoDBサーバーのホスト名。 デフォルトは
localhost
です。port
ArangoDBサーバーがリッスンしているポート。 デフォルトは
8529
です。database
ArangoDBサーバーのデフォルトデータベースが書き込みを行っています。 デフォルトは
celery
です。collection
ArangoDBサーバーデータベースのデフォルトのコレクションが書き込みを行っています。 デフォルトは
celery
です。username
ArangoDBサーバーに対して認証するためのユーザー名(オプション)。
password
ArangoDBサーバーに対して認証するためのパスワード(オプション)。
http_protocol
ArangoDBサーバー接続のHTTPプロトコル。 デフォルトは
http
です。verify
ArangoDB接続の作成中のHTTPS検証チェック。 デフォルトは
False
です。
CosmosDBバックエンド設定(実験的)
結果のバックエンドとして CosmosDB を使用するには、:setting: `result_backend` 設定を正しいURLで構成する必要があります。
構成例
result_backend = 'cosmosdbsql://:{InsertAccountPrimaryKeyHere}@{InsertAccountNameHere}.documents.azure.com'
cosmosdbsql_database_name
デフォルト:celerydb。
結果を保存するデータベースの名前。
cosmosdbsql_collection_name
デフォルト:celerycol。
結果を保存するコレクションの名前。
cosmosdbsql_consistency_level
デフォルト:セッション。
Azure CosmosDBクライアント操作でサポートされる整合性レベルを表します。
強度の順序による整合性レベルは、Strong、BoundedStaleness、Session、ConsistentPrefix、およびEventualです。
cosmosdbsql_max_retry_attempts
デフォルト:9。
リクエストに対して実行される再試行の最大数。
cosmosdbsql_max_retry_wait_time
デフォルト:30。
再試行が行われている間、要求を待機するための最大待機時間(秒単位)。
CouchDBバックエンド設定
ノート
CouchDBバックエンドには:pypi: `pycouchdb` ライブラリが必要です。
このCouchbaseパッケージをインストールするには、 pip を使用します。
$ pip install celery[couchdb]
複数の拡張要件を組み合わせる方法については、バンドルを参照してください。
このバックエンドは、CouchDB URLに設定された:setting: `result_backend` を介して構成できます。
result_backend = 'couchdb://username:password@host:port/container'
URLは、次の部分で構成されています。
username
CouchDBサーバーに対して認証するためのユーザー名(オプション)。
password
CouchDBサーバーに対して認証するためのパスワード(オプション)。
host
CouchDBサーバーのホスト名。 デフォルトは
localhost
です。port
CouchDBサーバーがリッスンしているポート。 デフォルトは
8091
です。container
CouchDBサーバーが書き込んでいるデフォルトのコンテナー。 デフォルトは
default
です。
ファイルシステムのバックエンド設定
このバックエンドは、ファイルURLを使用して構成できます。次に例を示します。
CELERY_RESULT_BACKEND = 'file:///var/celery/results'
構成されたディレクトリは、バックエンドを使用するすべてのサーバーで共有および書き込み可能である必要があります。
単一のシステムでCeleryを試している場合は、追加の構成なしでバックエンドを使用できます。 大規模なクラスターの場合は、NFS、 GlusterFS 、CIFS、 HDFS (FUSEを使用)、またはその他のファイルシステムを使用できます。
領事K / Vストアのバックエンド設定
ノート
Consulバックエンドには:pypi: `python-consul2` ライブラリが必要です。
このパッケージをインストールするには、 pip を使用します。
$ pip install python-consul2
Consulバックエンドは、次のようなURLを使用して構成できます。
CELERY_RESULT_BACKEND = 'consul://localhost:8500/'
また:
result_backend = 'consul://localhost:8500/'
バックエンドは、結果をConsulのK / Vストアに個別のキーとして保存します。 バックエンドは、ConsulでTTLを使用した結果の自動期限切れをサポートします。 URLの完全な構文は次のとおりです。
consul://host:port[?one_client=1]
URLは、次の部分で構成されています。
host
領事サーバーのホスト名。
port
領事サーバーがリッスンしているポート。
one_client
デフォルトでは、正確さのために、バックエンドは操作ごとに個別のクライアント接続を使用します。 極端な負荷の場合、新しい接続の作成率により、負荷がかかっているときにConsulサーバーからHTTP429の「接続が多すぎます」というエラー応答が発生する可能性があります。 これを処理するための推奨される方法は、 https://github.com/poppyred/python-consul2/pull/31のパッチを使用して
python-consul2
で再試行を有効にすることです。または、
one_client
が設定されている場合は、代わりに単一のクライアント接続がすべての操作に使用されます。 これにより、HTTP 429エラーは解消されますが、結果のバックエンドへの保存は信頼できなくなる可能性があります。
メッセージルーティング
task_queues
デフォルト:None
(デフォルトのキュー設定から取得されたキュー)。
ほとんどのユーザーはこの設定を指定したくないので、自動ルーティング機能を使用する必要があります。
高度なルーティングを本当に構成する場合、この設定は、ワーカーが使用するkombu.Queue
オブジェクトのリストである必要があります。
-Q
オプションを使用してワーカーをこの設定でオーバーライドしたり、-X
オプションを使用してこのリストから(名前で)個々のキューを除外したりできることに注意してください。
詳細については、基本も参照してください。
デフォルトは、celery
のキュー/交換/バインディングキーで、交換タイプはdirect
です。
:setting: `task_routes` も参照してください。
task_routes
デフォルト:None
。
ルーターのリスト、またはタスクをキューにルーティングするために使用される単一のルーター。 タスクの最終的な宛先を決定するときは、ルーターが順番に参照されます。
ルーターは次のいずれかとして指定できます。
- 署名付きの関数
(name, args, kwargs, options, task=None, **kwargs)
- ルーター機能へのパスを提供する文字列。
- *; ルーターの仕様を含むdict:
celery.routes.MapRoute
インスタンスに変換されます。
- *;
(pattern, route)
タプルのリスト:celery.routes.MapRoute
インスタンスに変換されます。
例:
task_routes = {
'celery.ping': 'default',
'mytasks.add': 'cpu-bound',
'feed.tasks.*': 'feeds', # <-- glob pattern
re.compile(r'(image|video)\.tasks\..*'): 'media', # <-- regex
'video.encode': {
'queue': 'video',
'exchange': 'media',
'routing_key': 'media.video.encode',
},
}
task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
myapp.tasks.route_task
は次のようになります:
def route_task(self, name, args, kwargs, options, task=None, **kw):
if task == 'celery.ping':
return {'queue': 'default'}
route_task
は、文字列またはdictを返す場合があります。 文字列は、:setting: `task_queues` のキュー名であることを意味し、dictはカスタムルートであることを意味します。
タスクを送信するときは、ルーターが順番に参照されます。 None
を返さない最初のルーターが使用するルートです。 次に、メッセージオプションは、タスクの設定が優先される、見つかったルート設定とマージされます。
apply_async()
に次の引数がある場合の例:
Task.apply_async(immediate=False, exchange='video',
routing_key='video.compress')
そしてルーターは以下を返します:
{'immediate': True, 'exchange': 'urgent'}
最終的なメッセージオプションは次のようになります。
immediate=False, exchange='video', routing_key='video.compress'
(およびTask
クラスで定義されているデフォルトのメッセージオプション)
:setting: `task_routes` で定義された値は、2つをマージするときに:setting:` task_queues` で定義された値よりも優先されます。
次の設定で:
task_queues = {
'cpubound': {
'exchange': 'cpubound',
'routing_key': 'cpubound',
},
}
task_routes = {
'tasks.add': {
'queue': 'cpubound',
'routing_key': 'tasks.add',
'serializer': 'json',
},
}
tasks.add
の最終的なルーティングオプションは次のようになります。
{'exchange': 'cpubound',
'routing_key': 'tasks.add',
'serializer': 'json'}
その他の例については、ルーターを参照してください。
task_inherit_parent_priority
- ブローカー
- RabbitMQ
デフォルト:False
。
有効にすると、子タスクは親タスクの優先度を継承します。
# The last task in chain will also have priority set to 5.
chain = celery.chain(add.s(2) | add.s(2).set(priority=5) | add.s(3))
優先度継承は、 delay または apply_async を使用して親タスクから子タスクを呼び出すときにも機能します。
RabbitMQメッセージの優先順位を参照してください。
worker_direct
デフォルト:無効。
このオプションを有効にすると、すべてのワーカーに専用のキューがあり、タスクを特定のワーカーにルーティングできます。
各ワーカーのキュー名は、C.dq
交換を使用して、ワーカーのホスト名と.dq
サフィックスに基づいて自動的に生成されます。
たとえば、ノード名が[email protected]
のワーカーのキュー名は次のようになります。
次に、ルーティングキーとしてホスト名を指定し、C.dq
交換を行うことで、タスクをタスクにルーティングできます。
task_routes = {
'tasks.add': {'exchange': 'C.dq', 'routing_key': '[email protected]'}
}
task_create_missing_queues
デフォルト:有効。
有効(デフォルト)の場合、:setting: `task_queues` で定義されていない指定されたキューは自動的に作成されます。 自動ルーティングを参照してください。
task_default_queue
デフォルト:"celery"
。
メッセージにルートがないか、カスタムキューが指定されていない場合に、 .apply_async によって使用されるデフォルトキューの名前。
このキューは、:setting: `task_queues` にリストされている必要があります。 :setting: `task_queues` が指定されていない場合、1つのキューエントリを含むように自動的に作成され、この名前がそのキューの名前として使用されます。
task_default_exchange
デフォルト::setting: `task_default_queue` に設定された値を使用します。
:setting: `task_queues` 設定でキーにカスタム交換が指定されていない場合に使用するデフォルトの交換の名前。
task_default_exchange_type
デフォルト:"direct"
。
:setting: `task_queues` 設定でキーにカスタム交換タイプが指定されていない場合に使用されるデフォルトの交換タイプ。
task_default_routing_key
デフォルト::setting: `task_default_queue` に設定された値を使用します。
:setting: `task_queues` 設定のキーにカスタムルーティングキーが指定されていない場合に使用されるデフォルトのルーティングキー。
task_default_delivery_mode
デフォルト:"persistent"
。
過渡(メッセージがディスクに書き込まれない)または永続的(ディスクに書き込まれる)にすることができます。
ブローカー設定
broker_url
デフォルト:"amqp://"
デフォルトのブローカーURL。 これは、次の形式のURLである必要があります。
transport://userid:password@hostname:port/virtual_host
スキーム部分(transport://
)のみが必要であり、残りはオプションであり、デフォルトで特定のトランスポートのデフォルト値になります。
トランスポート部分は使用するブローカー実装であり、デフォルトはamqp
です(インストールされている場合はlibrabbitmq
を使用するか、pyamqp
にフォールバックします)。 以下を含む他の選択肢もあります。 redis://
、sqs://
、およびqpid://
。
このスキームは、独自のトランスポート実装への完全に修飾されたパスにすることもできます。
broker_url = 'proj.transports.MyTransport://localhost'
同じトランスポートの複数のブローカーURLを指定することもできます。 ブローカーのURLは、セミコロンで区切られた単一の文字列として渡すことができます。
broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'
またはリストとして:
broker_url = [
'transport://userid:password@localhost:port//',
'transport://userid:password@hostname:port//'
]
その後、ブローカーは:setting: `broker_failover_strategy` で使用されます。
詳細については、昆布のドキュメントの kombu:connection-urls を参照してください。
broker_read_url / broker_write_url
デフォルト::setting: `broker_url` から取得。
:setting: `broker_url` の代わりにこれらの設定を構成して、消費と生成に使用されるブローカー接続にさまざまな接続パラメーターを指定できます。
例:
broker_read_url = 'amqp://user:[email protected]:56721'
broker_write_url = 'amqp://user:[email protected]:56722'
両方のオプションは、フェイルオーバーの代替のリストとして指定することもできます。詳細については、:setting: `broker_url` を参照してください。
broker_failover_strategy
デフォルト:"round-robin"
。
ブローカー接続オブジェクトのデフォルトのフェイルオーバー戦略。 提供されている場合は、「kombu.connection.failover_strategies」のキーにマップするか、提供されたリストから単一のアイテムを生成する任意のメソッドへの参照にすることができます。
例:
# Random failover strategy
def random_failover_strategy(servers):
it = list(servers) # don't modify callers list
shuffle = random.shuffle
for _ in repeat(None):
shuffle(it)
yield it[0]
broker_failover_strategy = random_failover_strategy
broker_heartbeat
- サポートされているトランスポート
pyamqp
デフォルト:120.0
(サーバーによってネゴシエートされます)。
注:この値はワーカーによってのみ使用され、クライアントは現時点ではハートビートを使用しません。
TCP / IPのみを使用して接続損失をタイムリーに検出できるとは限らないため、AMQPは、接続が閉じられたかどうかを検出するためにクライアントとブローカーの両方で使用されるハートビートと呼ばれるものを定義します。
ハートビート値が10秒の場合、ハートビートは:setting: `broker_heartbeat_checkrate` 設定で指定された間隔で監視されます(デフォルトでは、これはハートビート値の2倍のレートに設定されているため、 10秒、ハートビートは5秒ごとにチェックされます)。
broker_heartbeat_checkrate
- サポートされているトランスポート
pyamqp
デフォルト:2.0。
ワーカーは定期的に、ブローカーがあまりにも多くのハートビートを見逃していないことを監視します。 これがチェックされるレートは、:setting: `broker_heartbeat` の値をこの値で割ることによって計算されるため、ハートビートが10.0で、レートがデフォルトの2.0の場合、チェックは5ごとに実行されます。秒(ハートビート送信レートの2倍)。
broker_use_ssl
- サポートされているトランスポート
pyamqp
、redis
デフォルト:無効。
ブローカー接続とSSL設定でSSLの使用を切り替えます。
このオプションの有効な値は、トランスポートによって異なります。
pyamqp
True
の場合、接続はデフォルトのSSL設定でSSLを使用します。 dictに設定されている場合、指定されたポリシーに従ってSSL接続を構成します。 使用される形式は、Pythonのssl.wrap_socket()
オプションです。
SSLソケットは通常、ブローカーによって別のポートで提供されることに注意してください。
クライアント証明書を提供し、カスタム認証局に対してサーバー証明書を検証する例:
import ssl
broker_use_ssl = {
'keyfile': '/var/ssl/private/worker-key.pem',
'certfile': '/var/ssl/amqp-server-cert.pem',
'ca_certs': '/var/ssl/myca.pem',
'cert_reqs': ssl.CERT_REQUIRED
}
警告
broker_use_ssl=True
の使用には注意してください。 デフォルトの構成では、サーバー証明書がまったく検証されない可能性があります。 Python sslモジュールのセキュリティに関する考慮事項をお読みください。
redis
設定は、次のキーを使用したdictである必要があります。
- *;
ssl_cert_reqs
(必須):SSLContext.verify_mode
値の1つ:- *;*
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
- *;*
ssl_ca_certs
(オプション):CA証明書へのパスssl_certfile
(オプション):クライアント証明書へのパスssl_keyfile
(オプション):クライアントキーへのパス
broker_pool_limit
バージョン2.3の新機能。
デフォルト:10。
接続プールで開くことができる接続の最大数。
プールはバージョン2.5以降、デフォルトで有効になっており、デフォルトの制限は10接続です。 この数は、接続を使用するスレッド/グリーンスレッド(イベントレット/イベント)の数に応じて調整できます。 たとえば、ブローカーへの接続を使用する1000個のグリーンレットでイベントレットを実行すると、競合が発生する可能性があるため、制限を増やすことを検討する必要があります。
None
または0に設定すると、接続プールが無効になり、使用するたびに接続が確立されて閉じられます。
broker_connection_timeout
デフォルト:4.0。
AMQPサーバーへの接続の確立を断念するまでのデフォルトのタイムアウト(秒単位)。 geventを使用する場合、この設定は無効になります。
ノート
ブローカー接続タイムアウトは、ブローカーに接続しようとしているワーカーにのみ適用されます。 タスクを送信するプロデューサーには適用されません。その状況にタイムアウトを提供する方法については、:setting: `broker_transport_options` を参照してください。
broker_connection_retry
デフォルト:有効。
失われた場合、AMQPブローカーへの接続を自動的に再確立しようとします。
再試行間の時間は再試行ごとに増加し、:setting: `broker_connection_max_retries` を超える前に使い果たされません。
broker_connection_max_retries
デフォルト:100。
AMQPブローカーへの接続の再確立を断念するまでの最大再試行回数。
これが0
またはNone
に設定されている場合、永久に再試行します。
broker_login_method
デフォルト:"AMQPLAIN"
。
カスタムamqpログイン方法を設定します。
broker_transport_options
バージョン2.2の新機能。
デフォルト:{}
(空のマッピング)。
基礎となるトランスポートに渡される追加オプションの指示。
サポートされているオプション(ある場合)については、トランスポートのユーザーマニュアルを参照してください。
可視性タイムアウトの設定例(RedisおよびSQSトランスポートでサポート):
broker_transport_options = {'visibility_timeout': 18000} # 5 hours
プロデューサー接続の最大再試行回数を設定する例(最初のタスク実行時にブローカーが使用できない場合、プロデューサーが永久に再試行しないようにするため):
broker_transport_options = {'max_retries': 5}
ワーカー
imports
デフォルト:[]
(空のリスト)。
ワーカーの起動時にインポートする一連のモジュール。
これは、インポートするタスクモジュールを指定するためだけでなく、シグナルハンドラーや追加のリモートコントロールコマンドなどをインポートするためにも使用されます。
モジュールは元の順序でインポートされます。
include
デフォルト:[]
(空のリスト)。
:setting: `imports` とまったく同じセマンティクスですが、異なるインポートカテゴリを持つ手段として使用できます。
この設定のモジュールは、:setting: `imports` のモジュールの後にインポートされます。
worker_deduplicate_successful_tasks
バージョン5.1の新機能。
デフォルト:False
各タスクを実行する前に、このタスクが重複メッセージであるかどうかを確認するようにワーカーに指示してください。
重複排除は、同じ識別子を持ち、遅延確認が有効になっていて、メッセージブローカーによって再配信され、結果バックエンドでの状態がSUCCESS
であるタスクでのみ発生します。
結果バックエンドがクエリでオーバーフローするのを防ぐために、タスクを受け取った同じワーカーによってタスクがすでに正常に実行されている場合は、結果バックエンドにクエリを実行する前に、正常に実行されたタスクのローカルキャッシュがチェックされます。
このキャッシュは、:setting: `worker_state_db` 設定を設定することで永続化できます。
結果のバックエンドが永続的でない場合(RPCバックエンドなど)、この設定は無視されます。
worker_concurrency
デフォルト:CPUコアの数。
タスクを実行している同時ワーカープロセス/スレッド/グリーンスレッドの数。
主にI / Oを実行している場合は、より多くのプロセスを使用できますが、主にCPUにバインドされている場合は、マシン上のCPUの数に近づけるようにしてください。 設定されていない場合、ホスト上のCPU /コアの数が使用されます。
worker_prefetch_multiplier
デフォルト:4。
一度にプリフェッチするメッセージの数に並行プロセスの数を掛けたもの。 デフォルトは4です(プロセスごとに4つのメッセージ)。 通常はデフォルト設定が適切です。ただし、キューで待機している非常に長時間実行されるタスクがあり、ワーカーを開始する必要がある場合、最初に開始するワーカーは最初に4倍の数のメッセージを受信することに注意してください。 したがって、タスクがワーカーに公平に分散されない場合があります。
プリフェッチを無効にするには、:setting: `worker_prefetch_multiplier` を1に設定します。 この設定を0に変更すると、ワーカーは必要な数のメッセージを消費し続けることができます。
プリフェッチの詳細については、プリフェッチ制限を参照してください。
ノート
ETA /カウントダウンのあるタスクは、プリフェッチ制限の影響を受けません。
worker_lost_wait
デフォルト:10.0秒。
場合によっては、適切なクリーンアップなしでワーカーが殺され、ワーカーが終了する前に結果を公開している可能性があります。 この値は、@WorkerLostError
例外を発生させる前に欠落した結果を待機する時間を指定します。
worker_max_tasks_per_child
プールワーカープロセスが新しいタスクに置き換えられる前に実行できるタスクの最大数。 デフォルトは制限なしです。
worker_max_memory_per_child
デフォルト:制限なし。 タイプ:int(キロバイト)
新しいワーカーに置き換えられる前にワーカーが消費する可能性のある常駐メモリーの最大量(キロバイト単位)。 単一のタスクによってワーカーがこの制限を超えた場合、タスクは完了し、その後ワーカーが置き換えられます。
例:
worker_max_memory_per_child = 12000 # 12MB
worker_disable_rate_limits
デフォルト:無効(レート制限が有効)。
タスクに明示的なレート制限が設定されている場合でも、すべてのレート制限を無効にします。
worker_state_db
デフォルト:None
。
永続的なワーカー状態(取り消されたタスクなど)を格納するために使用されるファイルの名前。 相対パスまたは絶対パスにすることができますが、ファイル名に接尾辞 .db が追加される場合があることに注意してください(Pythonのバージョンによって異なります)。
celery worker --statedb
引数を使用して設定することもできます。
worker_timer_precision
デフォルト:1.0秒。
ETAスケジューラがスケジュールを再確認する間にスリープできる最大時間を秒単位で設定します。
この値を1秒に設定すると、スケジューラーの精度は1秒になります。 ミリ秒近くの精度が必要な場合は、これを0.1に設定できます。
worker_enable_remote_control
デフォルト:デフォルトで有効になっています。
ワーカーのリモートコントロールを有効にするかどうかを指定します。
worker_proc_alive_timeout
デフォルト:4.0。
新しいワーカープロセスの起動を待機するときの秒単位のタイムアウト(int / float)。
worker_cancel_long_running_tasks_on_connection_loss
バージョン5.1の新機能。
デフォルト:デフォルトでは無効になっています。
接続が失われたときに遅延確認応答を有効にして、実行時間の長いタスクをすべて強制終了します。
接続が失われる前に確認応答されていなかったタスクは、チャネルがなくなり、タスクがキューに再配信されるため、確認応答できなくなります。 これが、遅延確認が有効になっているタスクが複数回実行される可能性があるため、べき等でなければならない理由です。 この場合、タスクは接続損失ごとに2回実行されます(他のワーカーでは並行して実行されることもあります)。
このオプションをオンにすると、完了していないタスクはキャンセルされ、実行が終了します。 :setting: `task_ignore_result` が有効になっていない限り、接続が失われる前に何らかの方法で完了したタスクは、結果バックエンドにそのまま記録されます。
警告
この機能は、将来の重大な変更として導入されました。 オフにすると、Celeryは警告メッセージを発します。
Celery 6.0では、:setting: `worker_cancel_long_running_tasks_on_connection_loss` はデフォルトでTrue
に設定されます。これは、現在の動作が解決するよりも多くの問題を引き起こすためです。
イベント
worker_send_task_events
デフォルト:デフォルトでは無効になっています。
flower などのツールを使用してタスクを監視できるように、タスク関連のイベントを送信します。 ワーカー-E
引数のデフォルト値を設定します。
task_send_sent_event
バージョン2.2の新機能。
デフォルト:デフォルトでは無効になっています。
有効にすると、:event: `task-sent` イベントがすべてのタスクに送信されるため、タスクがワーカーによって消費される前に追跡できます。
event_queue_ttl
- サポートされているトランスポート
amqp
デフォルト:5.0秒。
モニタークライアントのイベントキューに送信されたメッセージが削除されたときのメッセージの有効期限(秒単位)(int / float)(x-message-ttl
)
たとえば、この値が10に設定されている場合、このキューに配信されたメッセージは10秒後に削除されます。
event_queue_expires
- サポートされているトランスポート
amqp
デフォルト:60.0秒。
モニタークライアントのイベントキューが削除された後の有効期限(秒(int / float))(x-expires
)。
event_queue_prefix
デフォルト:"celeryev"
。
イベントレシーバーキュー名に使用するプレフィックス。
event_exchange
デフォルト:"celeryev"
。
イベント交換の名前。
警告
このオプションは実験段階ですので、ご注意ください。
リモートコントロールコマンド
control_queue_ttl
デフォルト:300.0
リモートコントロールコマンドキュー内のメッセージが期限切れになるまでの秒単位の時間。
デフォルトの300秒を使用している場合、これは、リモート制御コマンドが送信され、300秒以内にワーカーがそれを取得しなかった場合、コマンドが破棄されることを意味します。
この設定は、リモートコントロールの応答キューにも適用されます。
control_queue_expires
デフォルト:10.0
未使用のリモート制御コマンドキューがブローカーから削除されるまでの秒単位の時間。
この設定は、リモートコントロールの応答キューにも適用されます。
control_exchange
デフォルト:"celery"
。
制御コマンド交換の名前。
警告
このオプションは実験段階ですので、ご注意ください。
ロギング
worker_hijack_root_logger
バージョン2.2の新機能。
デフォルト:デフォルトで有効になっています(ルートロガーをハイジャックします)。
デフォルトでは、ルートロガーで以前に構成されたハンドラーはすべて削除されます。 独自のロギングハンドラーをカスタマイズする場合は、 worker_hijack_root_logger = False を設定することで、この動作を無効にできます。
worker_log_color
デフォルト:アプリがターミナルにログを記録している場合に有効になります。
Celeryアプリによるログ出力の色を有効/無効にします。
worker_log_format
ディフォルト:
"[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
ログメッセージに使用する形式。
ログ形式の詳細については、Python logging
モジュールを参照してください。
worker_task_log_format
ディフォルト:
"[%(asctime)s: %(levelname)s/%(processName)s]
%(task_name)s[%(task_id)s]: %(message)s"
ログインしたタスクのログメッセージに使用する形式。
ログ形式の詳細については、Python logging
モジュールを参照してください。
worker_redirect_stdouts
デフォルト:デフォルトで有効になっています。
有効にすると、 stdout および stderr が現在のロガーにリダイレクトされます。
セロリワーカーとセロリビートが使用します。
worker_redirect_stdouts_level
デフォルト:WARNING
。
stdout および stderr へのログレベル出力はとしてログに記録されます。 DEBUG
、INFO
、WARNING
、ERROR
、またはCRITICAL
のいずれかになります。
安全
security_certificate
デフォルト:None
。
バージョン2.5の新機能。
メッセージ署名が使用されている場合にメッセージの署名に使用されるX.509証明書ファイルへの相対パスまたは絶対パス。
security_cert_store
デフォルト:None
。
バージョン2.5の新機能。
メッセージ署名に使用されるX.509証明書を含むディレクトリ。 ワイルドカードを使用したグロブにすることができます(たとえば、/etc/certs/*.pem
)。
security_digest
デフォルト:sha256
。
バージョン4.3の新機能。
メッセージ署名が使用されている場合にメッセージに署名するために使用される暗号ダイジェスト。 https://cryptography.io/en/latest/hazmat/primitives/cryptographic-hashes/#module-cryptography.hazmat.primitives.hashes
カスタムコンポーネントクラス(高度)
worker_pool
デフォルト:"prefork"
(celery.concurrency.prefork:TaskPool
)。
ワーカーが使用するプールクラスの名前。
イベントレット/イベント
このオプションを使用して、イベントレットまたはgeventプールを選択しないでください。 代わりに、セロリワーカーに対して-P
オプションを使用して、モンキーパッチの適用が遅すぎて、奇妙な方法で問題が発生しないようにする必要があります。
worker_pool_restarts
デフォルト:デフォルトでは無効になっています。
有効にすると、:control: `pool_restart` リモートコントロールコマンドを使用してワーカープールを再起動できます。
worker_autoscaler
バージョン2.2の新機能。
デフォルト:"celery.worker.autoscale:Autoscaler"
。
使用するオートスケーラークラスの名前。
worker_consumer
デフォルト:"celery.worker.consumer:Consumer"
。
ワーカーが使用するコンシューマークラスの名前。
worker_timer
デフォルト:"kombu.asynchronous.hub.timer:Timer"
。
ワーカーが使用するETAスケジューラークラスの名前。 デフォルトは、プールの実装によって設定されます。
ビート設定(セロリビート)
beat_scheduler
デフォルト:"celery.beat:PersistentScheduler"
。
デフォルトのスケジューラクラス。 たとえば、:pypi: `django-celery-beat` 拡張子と一緒に使用する場合は、"django_celery_beat.schedulers:DatabaseScheduler"
に設定できます。
celery beat -S
引数を使用して設定することもできます。
beat_schedule_filename
デフォルト:"celerybeat-schedule"
。
PersistentScheduler が定期的なタスクの最終実行時間を保存するために使用するファイルの名前。 相対パスまたは絶対パスにすることができますが、ファイル名に接尾辞 .db が追加される場合があることに注意してください(Pythonのバージョンによって異なります)。
celery beat --schedule
引数を使用して設定することもできます。
beat_sync_every
デフォルト:0。
別のデータベース同期が発行される前に呼び出すことができる定期的なタスクの数。 値0(デフォルト)は、タイミングに基づく同期を意味します-scheduler.sync_everyによって決定されるデフォルトの3分。 1に設定すると、beatはすべてのタスクメッセージが送信された後にsyncを呼び出します。
beat_max_loop_interval
デフォルト:0。
スケジュールを確認するまでの最大スリープ秒数beat
。
この値のデフォルトはスケジューラー固有です。 デフォルトのCeleryビートスケジューラの場合、値は300(5分)ですが、:pypi: `django-celery-beat` データベーススケジューラの場合、スケジュールが外部で変更される可能性があるため、5秒です。スケジュールの変更を考慮に入れる必要があります。
また、JythonでCelery beat Embedded(-B
)をスレッドとして実行すると、最大間隔がオーバーライドされて1に設定されるため、タイムリーにシャットダウンできます。