よくある質問—Pythonドキュメント
よくある質問
- 全般的
- 誤解
- トラブルシューティング
- MySQLがデッドロックエラーをスローしています。どうすればよいですか?
- 労働者は何もしていません、ただぶら下がっています
- タスクの結果が確実に返されません
- Task.delay / apply * /ワーカーがぶら下がっているのはなぜですか?
- FreeBSDで動作しますか?
- IntegrityError:Duplicate Key エラーが発生しています。 どうして?
- タスクが処理されないのはなぜですか?
- タスクが実行されないのはなぜですか?
- 定期的なタスクが実行されないのはなぜですか?
- 待機中のすべてのタスクを削除するにはどうすればよいですか?
- メッセージを削除しましたが、まだメッセージがキューに残っていますか?
- 結果
- 安全
- ブローカー
- タスク
- タスクを呼び出すときに同じ接続を再利用するにはどうすればよいですか?
subprocess
の sudo はNone
を返します- ワーカーがタスクを処理できない場合、なぜキューからタスクを削除するのですか?
- タスクを名前で呼び出すことはできますか?
- 現在のタスクのタスクIDを取得できますか?
- カスタムtask_idを指定できますか?
- タスクでデコレータを使用できますか?
- 自然なタスクIDを使用できますか?
- 別のタスクが終了したら、タスクを実行できますか?
- タスクの実行をキャンセルできますか?
- リモートコントロールコマンドがすべてのワーカーに受信されないのはなぜですか?
- 一部のサーバーにのみ一部のタスクを送信できますか?
- タスクのプリフェッチを無効にできますか?
- 実行時に定期的なタスクの間隔を変更できますか?
- Celeryはタスクの優先順位をサポートしていますか?
- 再試行またはacks_lateを使用する必要がありますか?
- 特定の時間に実行するようにタスクをスケジュールできますか?
- ワーカーを安全にシャットダウンできますか?
- [プラットフォーム]のバックグラウンドでワーカーを実行できますか?
- Django
- ウィンドウズ
全般的
セロリはどのような用途に使えますか?
答え: すべてをキューに入れて、みんなを喜ばせましょう is a good article describing why you’d use a queue in a web context.
これらはいくつかの一般的な使用例です:
- バックグラウンドで何かを実行します。 たとえば、Webリクエストをできるだけ早く終了するには、ユーザーページを段階的に更新します。 これにより、実際の作業には時間がかかる場合でも、ユーザーは優れたパフォーマンスと「スナップ感」の印象を得ることができます。
- Webリクエストが終了した後に何かを実行します。
- 非同期で実行し、再試行を使用して、何かが行われたことを確認します。
- 定期的な作業のスケジュール。
そしてある程度:
- 分散コンピューティング。
- 並列実行。
誤解
Celeryは本当に50.000行のコードで構成されていますか?
回答:いいえ、これと同様に多数がさまざまな場所で報告されています。
この記事の執筆時点での数字は次のとおりです。
- コア:7,141行のコード。
- テスト:14,209行。
- バックエンド、contrib、compatユーティリティ:9,032行。
コード行は有用なメトリックではないため、Celeryが5万行のコードで構成されていたとしても、そのような数から結論を引き出すことはできません。
セロリには多くの依存関係がありますか?
一般的な批判は、Celeryがあまりにも多くの依存関係を使用しているということです。 このような恐れの背後にある理論的根拠は想像しがたいものです。特に、コードの再利用を最新のソフトウェア開発の複雑さに対抗するための確立された方法と考えると、pipやPyPIなどのパッケージマネージャーがインストールの手間を省くようになったため、依存関係を追加するコストは非常に低くなっています。依存関係を維持することは過去のものです。
Celeryは途中でいくつかの依存関係を置き換えました。現在の依存関係のリストは、次のとおりです。
セロリ
昆布はセロリエコシステムの一部であり、メッセージの送受信に使用されるライブラリです。 また、さまざまなメッセージブローカーをサポートできるようにするライブラリでもあります。 また、OpenStackプロジェクトや他の多くのプロジェクトでも使用されており、Celeryコードベースから分離するという選択を検証しています。
ビリヤードは、多くのパフォーマンスと安定性の改善を含むPythonマルチプロセッシングモジュールのフォークです。 これらの改善がいつかPythonに統合されることが最終的な目標です。
また、マルチプロセッシングモジュールに付属していない古いバージョンのPythonとの互換性のためにも使用されます。
pytzモジュールは、タイムゾーン定義と関連ツールを提供します。
昆布
昆布は以下のパッケージに依存しています。
基盤となる純粋なPythonamqpクライアントの実装。 AMQPはデフォルトのブローカーであるため、これは自然な依存関係です。
セロリは重いですか?
セロリは、メモリフットプリントとパフォーマンスの両方でオーバーヘッドをほとんど引き起こしません。
ただし、デフォルトの構成は時間やスペースに対して最適化されていないことに注意してください。詳細については、最適化ガイドを参照してください。
セロリはピクルスに依存していますか?
回答:いいえ、Celeryは任意のシリアル化スキームをサポートできます。
JSON、YAML、Pickle、およびmsgpackのサポートが組み込まれています。 すべてのタスクはコンテンツタイプに関連付けられているため、pickleを使用して1つのタスクを送信し、JSONを使用して別のタスクを送信することもできます。
デフォルトのシリアル化サポートは以前はpickleでしたが、4.0以降、デフォルトはJSONになりました。 複雑なPythonオブジェクトをタスク引数として送信する必要がある場合は、シリアル化形式としてpickleを使用できますが、 Serializers の注を参照してください。
他の言語と通信する必要がある場合は、そのタスクに適したシリアル化形式を使用する必要があります。これは、ほとんどの場合、pickleではないシリアライザーを意味します。
グローバルなデフォルトのシリアライザー、特定のタスクのデフォルトのシリアライザー、または単一のタスクインスタンスを送信するときに使用するシリアライザーを設定できます。
セロリはDjango専用ですか?
回答:いいえ、Celeryは任意のフレームワーク、Web、またはその他の方法で使用できます。
AMQP / RabbitMQを使用する必要がありますか?
Answer :いいえ。ただし、RabbitMQを使用することをお勧めしますが、Redis、SQS、またはQpidを使用することもできます。
詳細については、バックエンドとブローカーを参照してください。
ブローカーとしてのRedisはAMQPブローカーほどのパフォーマンスはありませんが、ブローカーとしてのRabbitMQと結果ストアとしてのRedisの組み合わせが一般的に使用されます。 厳密な信頼性要件がある場合は、RabbitMQまたは別のAMQPブローカーを使用することをお勧めします。 一部のトランスポートもポーリングを使用するため、より多くのリソースを消費する可能性があります。 ただし、何らかの理由でAMQPを使用できない場合は、これらの代替手段を自由に使用してください。 これらはおそらくほとんどのユースケースで正常に機能します。上記の点はCeleryに固有のものではないことに注意してください。 以前はRedis /データベースをキューとして使用しても問題なく機能していた場合は、おそらく今は問題ありません。 必要に応じて、後でいつでもアップグレードできます。
セロリは多言語ですか?
回答:はい。
worker
は、PythonでのCeleryの実装です。 言語にAMQPクライアントがある場合、その言語でワーカーを作成するための作業はそれほど多くないはずです。 セロリワーカーは、メッセージを処理するためにブローカーに接続する単なるプログラムです。
また、言語に依存しない別の方法があります。それは、タスクが関数ではなく、URLであるRESTタスクを使用することです。 この情報を使用して、コードのプリロードを可能にする単純なWebサーバーを作成することもできます。 操作を実行するエンドポイントを公開し、そのエンドポイントに対してHTTPリクエストを実行するだけのタスクを作成するだけです。
Flowerの REST API を使用してタスクを呼び出すこともできます。
トラブルシューティング
MySQLがデッドロックエラーをスローしています。どうすればよいですか?
回答: MySQLのデフォルトの分離レベルは REPEATABLE-READ に設定されています。本当に必要ない場合は、 READ-COMMITTED に設定してください。 my.cnf
に以下を追加することで、これを行うことができます。
[mysqld]
transaction-isolation = READ-COMMITTED
InnoDBのトランザクションモデルの詳細については、MySQLユーザーマニュアルの MySQL-InnoDBトランザクションモデルとロックを参照してください。
(このソリューションを提供してくれたHonzaKralとAntonTsigularovに感謝します)
労働者は何もしていません、ただぶら下がっています
回答: MySQLがデッドロックエラーをスローしています。どうすればよいですか?または Task.delay / apply * /ワーカーがハングしているのはなぜですか?を参照してください。
タスクの結果が確実に返されません
回答:結果にデータベースバックエンドを使用している場合、特にMySQLを使用している場合は、 MySQLがデッドロックエラーをスローしています。どうすればよいですか?を参照してください。
Task.delay / apply * /ワーカーがぶら下がっているのはなぜですか?
回答:一部のAMQPクライアントには、現在のユーザーを認証できない場合、パスワードが一致しない場合、またはユーザーが指定された仮想ホストにアクセスできない場合にハングするバグがあります。 。 ブローカーログ(ほとんどのシステムで/var/log/rabbitmq/rabbit.log
であるRabbitMQの場合)を必ず確認してください。通常、理由を説明するメッセージが含まれています。
FreeBSDで動作しますか?
回答:依存します。
RabbitMQ(AMQP)とRedisトランスポートを使用する場合、箱から出してすぐに機能するはずです。
他のトランスポートでは、互換性プリフォークプールが使用され、POSIXセマフォの実装が機能している必要があります。これは、FreeBSD 8.x以降、デフォルトでFreeBSDで有効になっています。 古いバージョンのFreeBSDの場合、カーネルでPOSIXセマフォを有効にし、ビリヤードを手動で再コンパイルする必要があります。
幸いなことに、Viktor Peterssonは、FreeBSDでCeleryを使い始めるためのチュートリアルをここに書いています: http://www.playingwithwire.com/2009/10/how-to-get-celeryd-to-work-on-freebsd /
IntegrityError:Duplicate Key エラーが発生しています。 どうして?
回答: MySQLがデッドロックエラーをスローしています。どうすればよいですか?を参照してください。 :github_user: `@ howsthedotcom` に感謝します。
タスクが処理されないのはなぜですか?
回答: RabbitMQを使用すると、次のコマンドを実行して、現在タスクを受信しているコンシューマーの数を確認できます。
$ rabbitmqctl list_queues -p <myvhost> name messages consumers
Listing queues ...
celery 2891 2
これは、タスクキューで処理されるのを待っている2891のメッセージがあり、それらを処理している2つのコンシューマーがあることを示しています。
キューが空にならない理由の1つは、メッセージを人質に取っている古いワーカープロセスがあることである可能性があります。 これは、ワーカーが適切にシャットダウンされていない場合に発生する可能性があります。
ワーカーがメッセージを受信すると、ブローカーはメッセージが確認応答されるのを待ってから、メッセージを処理済みとしてマークします。 ブローカーは、コンシューマーが適切にシャットダウンされるまで、そのメッセージを別のコンシューマーに再送信しません。
この問題が発生した場合は、すべてのワーカーを手動で強制終了して再起動する必要があります。
$ pkill 'celery worker'
$ # - If you don't have pkill use:
$ # ps auxww | awk '/celery worker/ {print $2}' | xargs kill
すべてのワーカーがタスクの実行を終了するまで、しばらく待たなければならない場合があります。 久しぶりにぶら下がっている場合は、次の方法で強制的に殺すことができます。
$ pkill -9 'celery worker'
$ # - If you don't have pkill use:
$ # ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
タスクが実行されないのはなぜですか?
回答:タスクモジュールのインポートを妨げる構文エラーがある可能性があります。
タスクを手動で実行することで、Celeryがタスクを実行できるかどうかを確認できます。
>>> from myapp.tasks import MyPeriodicTask
>>> MyPeriodicTask.delay()
ワーカーのログファイルを監視して、タスクを見つけることができるかどうか、またはその他のエラーが発生していないかどうかを確認します。
待機中のすべてのタスクを削除するにはどうすればよいですか?
回答: celery purge
コマンドを使用して、構成されているすべてのタスクキューを削除できます。
$ celery -A proj purge
またはプログラムで:
>>> from proj.celery import app
>>> app.control.purge()
1753
特定のキューからメッセージをパージするだけの場合は、AMQPAPIまたは celery amqp ユーティリティを使用する必要があります。
$ celery -A proj amqp queue.purge <queue name>
番号1753は、削除されたメッセージの数です。
--purge
オプションを有効にしてワーカーを起動し、ワーカーの起動時にメッセージを削除することもできます。
メッセージを削除しましたが、まだメッセージがキューに残っていますか?
回答:タスクは、実際に実行されるとすぐに確認応答(キューから削除)されます。 ワーカーがタスクを受け取った後、実際に実行されるまで、特に実行を待機しているタスクが多数ある場合は、しばらく時間がかかります。 確認応答されないメッセージは、ブローカー(AMQPサーバー)への接続を閉じるまでワーカーによって保持されます。 その接続が閉じられると(たとえば、ワーカーが停止されたため)、タスクはブローカーによって次に使用可能なワーカー(または再起動されたときは同じワーカー)に再送信されるため、待機中のタスクのキューを適切にパージします。すべてのワーカーを停止してから、celery.control.purge()
を使用してタスクをパージする必要があります。
結果
そこを指すIDを持っている場合、タスクの結果を取得するにはどうすればよいですか?
Answer : task.AsyncResult を使用します:
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
これにより、タスクの現在の結果バックエンドを使用してAsyncResult
インスタンスが提供されます。
カスタム結果バックエンドを指定する必要がある場合、または現在のアプリケーションのデフォルトのバックエンドを使用する場合は、@AsyncResult
を使用できます。
>>> result = app.AsyncResult(task_id)
>>> result.get()
安全
pickle を使用することはセキュリティ上の懸念事項ではありませんか?
Answer :確かに、Celery 4.0以降、デフォルトのシリアライザーはJSONになり、人々がこの懸念を意識して認識していることを確認しています。
ピクルス化されたデータを送信するブローカー、データベース、およびその他のサービスへの不正アクセスから保護することが不可欠です。
これはCeleryで知っておくべきことだけではないことに注意してください。たとえば、Djangoはキャッシュクライアントにpickleを使用しています。
タスクメッセージの場合、:setting: `task_serializer` 設定をpickleではなく「json」または「yaml」に設定できます。
同様に、タスクの結果については、:setting: `result_serializer` を設定できます。
使用されるフォーマットとタスクに使用するフォーマットをチェックする際のルックアップ順序の詳細については、 Serializers を参照してください。
メッセージを暗号化できますか?
Answer :一部のAMQPブローカーはSSLの使用をサポートしています(RabbitMQを含む)。 これは、:setting: `broker_use_ssl` 設定を使用して有効にできます。
メッセージに暗号化とセキュリティを追加することも可能です。これが必要な場合は、メーリングリストに連絡する必要があります。
セロリワーカーをrootとして実行しても安全ですか?
回答:いいえ!
現在、セキュリティの問題は認識していませんが、セキュリティの問題が存在しないと想定するのは非常に単純なので、Celeryサービス(セロリワーカー、セロリビート、非特権ユーザーとして celeryev など)をお勧めします。
ブローカー
RabbitMQがクラッシュするのはなぜですか?
回答:メモリが不足するとRabbitMQがクラッシュします。 これは、RabbitMQの将来のリリースで修正される予定です。 RabbitMQ FAQを参照してください: https://www.rabbitmq.com/faq.html#node-runs-out-of-memory
ノート
これはもはや当てはまりません。RabbitMQバージョン2.0以降には、メモリ不足エラーに耐性のある新しい永続機能が含まれています。 CeleryにはRabbitMQ2.1以降をお勧めします。
古いバージョンのRabbitMQをまだ実行していてクラッシュが発生する場合は、アップグレードしてください。
Celeryの設定を誤ると、最終的に古いバージョンのRabbitMQでクラッシュが発生する可能性があります。 クラッシュしなくても、多くのリソースを消費する可能性があるため、よくある落とし穴に注意することが重要です。
- イベント。
worker
を-E
オプションとともに実行すると、ワーカー内で発生したイベントのメッセージが送信されます。
イベントを有効にする必要があるのは、アクティブモニターがイベントを消費している場合、またはイベントキューを定期的にパージする場合のみです。
- AMQPバックエンドの結果。
AMQP結果バックエンドで実行すると、すべてのタスク結果がメッセージとして送信されます。 これらの結果を収集しないと、結果が蓄積され、RabbitMQは最終的にメモリを使い果たします。
この結果バックエンドは非推奨になりましたので、使用しないでください。 rpcスタイルの呼び出しにはRPCバックエンドを使用するか、結果への複数のコンシューマーアクセスが必要な場合は永続的なバックエンドを使用します。
デフォルトでは、結果は1日後に期限切れになります。 :setting: `result_expires` 設定を構成して、この値を下げることをお勧めします。
タスクの結果を使用しない場合は、必ず ignore_result オプションを設定してください。
@app.task(ignore_result=True)
def mytask():
pass
class MyTask(Task):
ignore_result = True
ActiveMQ / STOMPでCeleryを使用できますか?
回答:いいえ。 以前は:pypi: `Carrot` (古いメッセージングライブラリ)でサポートされていましたが、現在:pypi:`昆布 `(新しいメッセージングライブラリ)ではサポートされていません。
AMQPブローカーを使用しない場合にサポートされない機能は何ですか?
これは、仮想トランスポートを使用するときに使用できない機能の不完全なリストです。
- リモートコントロールコマンド(Redisでのみサポート)。
- イベントによる監視は、すべての仮想トランスポートで機能するとは限りません。
- *; ヘッダーとファンアウトの交換タイプ
- (ファンアウトはRedisでサポートされています)。
タスク
タスクを呼び出すときに同じ接続を再利用するにはどうすればよいですか?
Answer ::setting: `broker_pool_limit` 設定を参照してください。 バージョン2.5以降、接続プールはデフォルトで有効になっています。
subprocessの sudo はNoneを返します
sudo 構成オプションがあり、ttyのないプロセスが sudo を実行することを違法にします。
Defaults requiretty
/etc/sudoers
ファイルにこの構成がある場合、ワーカーがデーモンとして実行されているときに、タスクは sudo を呼び出すことができません。 これを有効にする場合は、/etc/sudoers
から行を削除する必要があります。
参照: http://timelordz.com/wiki/Apache_Sudo_Commands
ワーカーがタスクを処理できない場合、なぜキューからタスクを削除するのですか?
回答:
ワーカーは、不明なタスク、エンコードエラーのあるメッセージ、および適切なフィールドを含まないメッセージを拒否します(タスクメッセージプロトコルに従って)。
それらを拒否しなかった場合、それらは何度も再配信され、ループが発生する可能性があります。
RabbitMQの最近のバージョンには、交換用の配信不能キューを構成する機能があるため、拒否されたメッセージはそこに移動されます。
タスクを名前で呼び出すことはできますか?
回答:はい、@send_task()
を使用してください。
AMQPクライアントを使用して、任意の言語から名前でタスクを呼び出すこともできます。
>>> app.send_task('tasks.add', args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
名前で呼び出されるタスクでchain
、chord
、またはgroup
を使用するには、@Celery.signature()
メソッドを使用します。
>>> chain(
... app.signature('tasks.add', args=[2, 2], kwargs={}),
... app.signature('tasks.add', args=[1, 1], kwargs={})
... ).apply_async()
<AsyncResult: e9d52312-c161-46f0-9013-2713e6df812d>
現在のタスクのタスクIDを取得できますか?
Answer :はい、現在のIDなどがタスクリクエストで利用できます。
@app.task(bind=True)
def mytask(self):
cache.set(self.request.id, "Running")
詳細については、タスクリクエストを参照してください。
タスクインスタンスへの参照がない場合は、app.current_task
を使用できます。
>>> app.current_task.request.id
ただし、これは、ワーカーによって実行されるタスク、そのタスクによって直接呼び出されるタスク、または熱心に呼び出されるタスクなど、任意のタスクになることに注意してください。
現在取り組んでいるタスクを具体的に取得するには、app.current_worker_task
を使用します。
>>> app.current_worker_task.request.id
ノート
@current_task
と@current_worker_task
はどちらもNone
にすることができます。
カスタムtask_idを指定できますか?
Answer :はい、Task.apply_async()
に task_id 引数を使用してください。
>>> task.apply_async(args, kwargs, task_id='…')
自然なタスクIDを使用できますか?
Answer :はい。ただし、同じIDを持つ2つのタスクの動作が定義されていないため、一意であることを確認してください。
世界はおそらく爆発しないでしょうが、彼らは間違いなくお互いの結果を上書きすることができます。
別のタスクが終了したら、タスクを実行できますか?
回答:はい、タスク内でタスクを安全に起動できます。
一般的なパターンは、タスクにコールバックを追加することです。
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
return x + y
@app.task(ignore_result=True)
def log_result(result):
logger.info("log_result got: %r", result)
呼び出し:
>>> (add.s(2, 2) | log_result.s()).delay()
詳細については、 Canvas:ワークフローの設計を参照してください。
タスクの実行をキャンセルできますか?
回答:はい、result.revoke()
を使用してください:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
または、タスクIDしかない場合:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
後者は、引数としてタスクIDのリストを渡すこともサポートします。
リモートコントロールコマンドがすべてのワーカーに受信されないのはなぜですか?
Answer :ブロードキャストリモートコントロールコマンドを受信するために、すべてのワーカーノードはワーカーのノード名に基づいて一意のキュー名を作成します。
同じホスト名のワーカーが複数ある場合、制御コマンドはそれらの間でラウンドロビンで受信されます。
これを回避するには、-n
引数をworker
に使用して、すべてのワーカーのノード名を明示的に設定できます。
$ celery -A proj worker -n worker1@%h
$ celery -A proj worker -n worker2@%h
ここで、%h
は現在のホスト名に展開されます。
一部のサーバーにのみ一部のタスクを送信できますか?
回答:はい、異なるメッセージルーティングトポロジを使用して、タスクを1つ以上のワーカーにルーティングでき、ワーカーインスタンスは複数のキューにバインドできます。
詳細については、ルーティングタスクを参照してください。
タスクのプリフェッチを無効にできますか?
回答:たぶん! AMQPの用語「プリフェッチ」は、タスクのプリフェッチ limit を説明するためにのみ使用されるため、混乱を招きます。 実際のプリフェッチは含まれていません。
プリフェッチ制限を無効にすることは可能ですが、それはワーカーができるだけ多くのタスクをできるだけ早く消費することを意味します。
プリフェッチ制限、および一度に1つのタスクのみを予約するワーカーの構成設定に関する説明は、プリフェッチ制限にあります。
実行時に定期的なタスクの間隔を変更できますか?
Answer :はい、Djangoデータベーススケジューラを使用するか、新しいスケジュールサブクラスを作成してis_due()
をオーバーライドできます。
from celery.schedules import schedule
class my_schedule(schedule):
def is_due(self, last_run_at):
return run_now, next_time_to_check
Celeryはタスクの優先順位をサポートしていますか?
Answer :はい、RabbitMQはバージョン3.5.0以降の優先度をサポートし、Redisトランスポートは優先度のサポートをエミュレートします。
優先度の高いタスクを別のワーカーにルーティングすることで、作業に優先順位を付けることもできます。 現実の世界では、これは通常、メッセージの優先順位よりもうまく機能します。 これをレート制限と組み合わせて、メッセージごとの優先順位と組み合わせて使用すると、応答性の高いシステムを実現できます。
再試行またはacks_lateを使用する必要がありますか?
回答:状況によって異なります。 必ずしもどちらか一方である必要はありません。両方を使用することをお勧めします。
Task.retry は、特にtry
ブロックでキャッチ可能な予想されるエラーに対して、タスクを再試行するために使用されます。 AMQPトランザクションはこれらのエラーには使用されません:タスクが例外を発生させた場合でも、それは確認されます!
acks_late 設定は、ワーカーが(何らかの理由で)実行中にクラッシュした場合にタスクを再度実行する必要がある場合に使用されます。 ワーカーがクラッシュすることは知られていないことに注意することが重要です。クラッシュした場合、通常は回復不能なエラーであり、人間の介入が必要です(ワーカーのバグ、またはタスクコード)。
理想的な世界では、失敗したタスクを安全に再試行できますが、これはめったにありません。 次のタスクを想像してみてください。
@app.task
def process_upload(filename, tmpfile):
# Increment a file count stored in a database
increment_file_counter()
add_file_metadata_to_db(filename, tmpfile)
copy_file_to_destination(filename, tmpfile)
ファイルをコピー先にコピーしている最中にこれがクラッシュした場合、ワールドには不完全な状態が含まれます。 もちろん、これは重要なシナリオではありませんが、おそらくはるかに不吉なものを想像することができます。 したがって、プログラミングを容易にするために、信頼性は低くなります。 これは適切なデフォルトです。これを必要とし、何をしているのかを知っているユーザーは、acks_lateを有効にすることができます(将来的には手動確認を使用することをお勧めします)。
さらに、 Task.retry には、AMQPトランザクションでは使用できない機能があります。再試行間の遅延、最大再試行などです。
したがって、Pythonエラーに対して再試行を使用し、タスクがべき等である場合、そのレベルの信頼性が必要な場合は、それを acks_late と組み合わせます。
ワーカーを安全にシャットダウンできますか?
Answer :はい、:sig: `TERM` 信号を使用してください。
これにより、ワーカーは現在実行中のすべてのジョブを終了し、できるだけ早くシャットダウンするように指示されます。 シャットダウンが完了する限り、実験的なトランスポートを使用してもタスクが失われることはありません。
:sig: `TERM` を試したことがない限り、:sig:` KILL` 信号(kill -9
)でworker
を停止しないでください。 ]数回、シャットダウンする機会が得られるまで数分待ちました。
また、子プロセスではなく、メインワーカープロセスのみを強制終了するようにしてください。 プロセスが現在ワーカーのシャットダウンが依存しているタスクを実行していることがわかっている場合は、killシグナルを特定の子プロセスに送信できますが、これは、タスクにWorkerLostError
状態が設定されることも意味します。二度と実行されません。
:pypi: `setproctitle` モジュールをインストールすると、プロセスのタイプを簡単に識別できます。
$ pip install setproctitle
このライブラリをインストールすると、 ps リストでプロセスのタイプを確認できますが、これを有効にするには、ワーカーを再起動する必要があります。
Django
django-celery-beatによって作成されたデータベーステーブルにはどのような目的がありますか?
データベースに基づくスケジュールを使用する場合、定期的なタスクスケジュールはPeriodicTask
モデルから取得され、他にもいくつかのヘルパーテーブル(IntervalSchedule
、CrontabSchedule
、 [ X174X])。
django-celery-resultsによって作成されたデータベーステーブルにはどのような目的がありますか?
Djangoデータベースの結果バックエンド拡張には、TaskResult
とGroupResult
の2つの追加モデルが必要です。
ウィンドウズ
CeleryはWindowsをサポートしていますか?
回答:いいえ。
Celery 4.x以降、リソースが不足しているため、Windowsはサポートされなくなりました。
ただし、それでも機能する可能性があり、パッチを喜んで受け入れます。