Celery 4.0(latentcall)の新機能—Pythonドキュメント
Celery 4.0の新機能(latentcall)
- 著者
- ソレムに聞く(
ask at celeryproject.org
)
変更履歴
新しいドキュメントではメジャーバージョンの変更について説明しています。バグ修正リリース(0.0.x)の変更を一覧表示する変更履歴もありますが、古いシリーズは履歴セクションにアーカイブされています。 。
Celeryは、膨大な量のメッセージを処理すると同時に、そのようなシステムを維持するために必要なツールを操作に提供する、シンプルで柔軟性があり、信頼性の高い分散システムです。
これは、リアルタイム処理に重点を置いたタスクキューであり、タスクのスケジューリングもサポートしています。
Celeryには、ユーザーと寄稿者の大規模で多様なコミュニティがあります。IRCまたはメーリングリストでに参加する必要があります。
セロリの詳細については、はじめにをお読みください。
このバージョンは以前のバージョンと下位互換性がありますが、次のセクションを読むことが重要です。
このバージョンは、CPython 2.7、3.4、および3.5で正式にサポートされています。 また、PyPyでもサポートされています。
目次
このバージョンにアップグレードする前に、必ず重要な注意事項をお読みください。
- 序文
- Celery3.1からのアップグレード
- 重要な注意事項
- ニュース
- 再編成、非推奨、および削除
- 非推奨のタイムラインの変更
序文
セロリ4へようこそ!
これは、2年以上の変更を伴う大規模なリリースです。 多くの新機能が付属しているだけでなく、バグの膨大なリストも修正されているため、多くの点で「SnowLeopard」リリースと呼ぶことができます。
Celeryの次のメジャーバージョンはPython3.5のみをサポートし、新しいasyncioライブラリを利用することを計画しています。
このリリースは、私の雇用主である Robinhood のサポートなしには実現できなかったでしょう(私たちは採用しています!)。
- ソレムに聞く
セバスチャン「ゼブ」ビョルネルド(RIP)に捧げ、新しいロゴをデザインしてくれた Ty Wilkins 、これを実現するのに貢献したすべての貢献者、そして Robinhood の同僚に感謝します。 。
貢献者の壁
Aaron McMillin、Adam Chainz、Adam Renberg、Adriano Martins de Jesus、Adrien Guinet、Ahmet Demir、AitorGómez-Goiri、Alan Justino、Albert Wang、Alex Koshelev、Alex Rattray、Alex Williams、Alexander Koshelev、Alexander Lebedev、Alexander Oblovatniy、Alexey Kotlyarov、Ali Bozorgkhan、AliceZoëBevan–McGregor、Allard Hoeve、Alman One、Amir Rustamzadeh、Andrea Rabbaglietti、Andrea Rosa、Andrei Fokau、Andrew Rodionoff、Andrew Stewart、Andriy Yurchuk、Aneil Mallavarapu、Areski Belaid、Armenak Baburyan、Art Artyom Koval、Asif Saifuddin Auvi、Ask Solem、Balthazar Rouberol、Batiste Bieler、Berker Peksag、Bert Vanderbauwhede、Brendan Smithyman、Brian Bouterse、Bryce Groff、Cameron Will、ChangBo Guo、Chris Clark、Chris Duryee、Chris Erway、Chris Har Martin、Chillar Anand、Colin McIntosh、Conrad Kramer、Corey Farwell、Craig Jellick、Cullen Rhodes、Dallas Marlow、Daniel Devine、Daniel Wallace、Danilo Bargen、Davanum Srinivas、Dave Smith、David Baumgold、David Harrigan、David P ravec、Dennis Brakhane、Derek Anderson、Dmitry Dygalo、Dmitry Malinovsky、Dongweiming、DudásÁdám、DustinJ。 Mitchell、Ed Morley、Edward Betts、ÉloiRivard、Emmanuel Cazenave、Fahad Siddiqui、Fatih Sucu、Feanil Patel、Federico Ficarelli、Felix Schwarz、Felix Yan、Fernando Rocha、Flavio Grossi、Frantisek Holop、Gao Jiangmiao、George Whewell Gilles Dartiguelongue、Gino Ledesma、Greg Wilbur、Guillaume Seguin、Hank John、Hogni Gylfason、Ilya Georgievsky、Ionel CristianMărieș、Ivan Larin、James Pulec、Jared Lewis、Jason Veatch、Jasper Bryant-Greene、Jeff Widman、Jeremy T 、Jocelyn Delalande、Joe Jevnik、Joe Sanford、John Anderson、John Barham、John Kirkham、John Whitlock、Jonathan Vanasco、Joshua Harlow、JoãoRicardo、Juan Carlos Ferrer、Juan Rossi、Justin Patrin、Kai Groner、Kevin Harvey、Kevin Richardson、 Komu Wairagu、Konstantinos Koukopoulos、Kouhei Maeda、Kracekumar Ramaraju、Krzysztof Bujniewicz、LatitiaM。 Haskins、Len Buckens、Lev Berman、lidongming、Lorenzo Mancini、Lucas Wiman、Luke Pomfrey、Luyun Xie、Maciej Obuchowski、Manuel Kaufmann、Marat Sharafutdinov、Marc Sibson、Marcio Ribeiro、Marin Atanasov Nikolov、Mathieu Fenniak、Mark Parncut Maxime Beauchemin、Maxime Vdb、Mher Movsisyan、Michael Aquilina、Michael Duane Mooring、Michael Permana、MickaëlPenhard、Mike Attwood、Mitchel Humpherys、Mohamed Abouelsaoud、Morris Tweed、Morton Fox、Môshevander Sterre、Nat Williams、Nathan Van Gheem、Nicolas Unravel、Nik Nyby、Omer Katz、Omer Korner、Ori Hoch、Paul Pearce、Paulo Bu、Pavlo Kapyshin、Philip Garnero、Pierre Fersing、Piotr Kilczuk、PiotrMaślanka、Quentin Pradet、Radek Czajka、Raghuram Srinivasan、Randy Barlow RémyLéone、Robert Coup、Robert Kolba、Rockallite Wulf、Rodolfo Carvalho、Roger Hu、Romuald Brunet、Rongze Zhu、Ross Deane、Ryan Luckie、RémyGreinhofer、Samuel Giffard、Samuel Jaillet、Sergey Azovskov、Sergey Tikhonov、Seungha Kim、Simon rs、スペンサーE。 Olson、Srinivas Garlapati、Stephen Milner、Steve Peak、Steven Sklar、Stuart Axon、Sukrit Khera、TadejJanež、Taha Jahangir、Takeshi Kanemoto、Tayfun Sen、Tewfik Sadaoui、Thomas French、Thomas Grainger、Tomas Machalek、Tobias Schottdorf、Tocho Tochev Valentyn Klindukh、Vic Kumar、Vladimir Bolshakov、Vladimir Gorbunov、Wayne Chang、Wieland Hoffmann、Wido den Hollander、Wil Langford、Will Thompson、William King、Yury Selivanov、Vytis Banaitis、Zoran Pavlovic、Xin Li、許邱翔、[X475] :github_user: `allenling` 、:github_user:` alzeih` 、:github_user: `bastb` 、:github_user:` bee-keeper` 、:github_user: `ffeast` 、:github_user:` firefly4268` 、:github_user: `flyingfoxlee` 、:github_user:` gdw2` [ X739X]、:github_user: `gitaarik` 、:github_user:` hankjin` 、:github_user: `lvh` 、:github_user:` m -vdb` 、:github_user: `kindule` 、:github_user:` mdk` [X93 5X]:、:github_user: `michael-k` 、:github_user:` mozillazg` 、:github_user: `nokrik` 、:github_user : `ocean1` 、:github_user:` orlo666` 、:github_user: `raducc` 、:github_user:` wanglei` 、 :github_user: `worldexception` 、:github_user:` xBeAsTx` 。
ノート
この壁はgitの履歴から自動的に生成されたため、残念ながら、メーリングリストの質問への回答など、より重要なことを支援する人は含まれていません。
Celery3.1からのアップグレード
ステップ1:Celery3.1.25にアップグレードする
まだ行っていない場合、最初のステップはCelery3.1.25にアップグレードすることです。
このバージョンでは、新しいメッセージプロトコルに上位互換性が追加されているため、3.1から4.0に段階的にアップグレードできます。
3.1.25にアップグレードして、最初にワーカーをデプロイします。これは、これらのワーカーが3.1と4.0の両方を使用してクライアントから送信されたメッセージを処理できることを意味します。
ワーカーがアップグレードされた後、クライアントをアップグレードできます(例: Webサーバー)。
手順2:新しい設定名で構成を更新します
このバージョンでは、構成設定名が大幅に変更され、一貫性が向上しています。
変更には完全な下位互換性があるため、古い設定名が非推奨になるまで待つオプションがありますが、移行を容易にするために、設定を自動的に書き換えるコマンドラインユーティリティが含まれています。
詳細については、小文字の設定名を参照してください。
ステップ3:このドキュメントの重要な注意事項を読む
次のセクションに記載されている重要なアップグレードに関する注意事項の影響を受けていないことを確認してください。
特に重要な注意点は、Celeryがタスクに送信した引数を署名と照合することでチェックするようになったことです(タスク引数のチェック)。
ステップ4:Celery4.0にアップグレードする
この時点で、ワーカーとクライアントを新しいバージョンにアップグレードできます。
重要な注意事項
Python2.6のサポートを終了しました
CeleryにはPython2.7以降が必要になり、Python 3.3のサポートも終了するため、サポートされるバージョンは次のとおりです。
- CPython 2.7
- CPython 3.4
- CPython 3.5
- PyPy 5.4(
pypy2
) - PyPy 5.5-alpha(
pypy3
)
Python2をサポートする最後のメジャーバージョン
Celery 5.0以降、Python3.5以降のみがサポートされます。
この変更の影響を受けないようにするには、要件ファイルのCeleryバージョンを特定のバージョンcelery==4.0.0
または範囲celery>=4.0,<5.0
に固定する必要があります。
Python 2のサポートを終了すると、大量の互換性コードを削除できるようになります。Python3.5を使用すると、入力、async / await、asyncioなど、古いバージョンに代わるものがない概念を利用できます。
Celery4.xは引き続きPython2.7、3.4、3.5で動作します。 Celery3.xがPython2.6で引き続き機能するように。
Djangoのサポート
Celery4.xにはDjango1.8以降が必要ですが、新しいtransaction.on_commit
機能には少なくともDjango1.9を使用することを強くお勧めします。
Djangoからタスクを呼び出すときの一般的な問題は、タスクがモデルの変更に関連していて、トランザクションがロールバックされた場合にタスクをキャンセルするか、変更がデータベースに書き込まれた後にのみタスクが実行されるようにする場合です。
transaction.atomic
を使用すると、トランザクションがコミットされたときにのみ呼び出されるコールバックとしてタスクを追加することで、この問題を解決できます。
使用例:
from functools import partial
from django.db import transaction
from .models import Article, Log
from .tasks import send_article_created_notification
def create_article(request):
with transaction.atomic():
article = Article.objects.create(**request.POST)
# send this task only if the rest of the transaction succeeds.
transaction.on_commit(partial(
send_article_created_notification.delay, article_id=article.pk))
Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)
削除された機能
MicrosoftWindowsはサポートされなくなりました。
テストスイートは合格であり、CeleryはWindowsで動作しているようですが、このプラットフォームで問題を診断できないため、保証はできません。 このプラットフォームでのサポートが必要な企業の場合は、ご連絡ください。
Jythonはサポートされなくなりました。
簡単にするために機能が削除されました
Webhookタスク機構(
celery.task.http
)は削除されました。最近では、:pypi: `requests` モジュールを使用してWebhookタスクを手動で作成するのは簡単です。 リクエストを使用したいのですが、Pythonコミュニティには非常に声高な「反依存性」の暴徒がいるため、使用できません。
下位互換性が必要な場合は、モジュールの3.1バージョンをコピーして貼り付け、ワーカーによってインポートされていることを確認できます: https://github.com/celery/celery/blob/3.1/celery/task/http .py
タスクがエラーメールを送信しなくなりました。
これにより、
app.mail_admins
のサポート、および電子メールの送信に関連するすべての機能も削除されます。celery.contrib.batches
は削除されました。これは実験的な機能であったため、非推奨のタイムライン保証の対象にはなりませんでした。
プロジェクト内で使用するために、既存のバッチコードをコピーしてペースを調整できます: https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py
資金不足のために機能が削除されました
3.1リリースで、一部のトランスポートが実験的な状態に移行し、トランスポートに対する公式のサポートがないことを発表しました。
資金調達の必要性に関するこの微妙なヒントが失敗したため、それらを完全に削除し、下位互換性を壊しました。
DjangoORMをブローカーとして使用することはサポートされなくなりました。
結果バックエンドとしてDjangoORMを引き続き使用できます。詳細については、 django-celery-results-結果バックエンドとしてのDjangoORM / Cacheの使用セクションを参照してください。
SQLAlchemyをブローカーとして使用することはサポートされなくなりました。
結果のバックエンドとしてSQLAlchemyを引き続き使用できます。
CouchDBをブローカーとして使用することはサポートされなくなりました。
結果のバックエンドとして引き続きCouchDBを使用できます。
IronMQをブローカーとして使用することはサポートされなくなりました。
Beanstalkをブローカーとして使用することはサポートされなくなりました。
さらに、一部の機能は完全に削除されているため、それらを使用しようとすると例外が発生します。
--autoreload
機能は削除されました。これは実験的な機能であり、非推奨のタイムライン保証の対象ではありません。 フラグが完全に削除されるため、ワーカーが存在すると起動時にクラッシュします。 幸い、このフラグは本番システムでは使用されていません。
実験的な
threads
プールはサポートされなくなり、削除されました。force_execv
機能はサポートされなくなりました。celery worker
コマンドは、--no-execv
、--force-execv
、およびCELERYD_FORCE_EXECV
設定を無視するようになりました。このフラグは5.0で完全に削除され、ワーカーはエラーを発生させます。
古いレガシー「amqp」結果バックエンドは非推奨になり、Celery5.0で削除される予定です。
RPCスタイルの呼び出しには
rpc
結果バックエンドを使用し、マルチコンシューマー結果には永続的な結果バックエンドを使用してください。
これらのほとんどは、かなりの労力をかけなくても修正できると考えています。これらの機能のいずれかを元に戻すことに興味がある場合は、ご連絡ください。
さて、朗報です…
新しいタスクメッセージプロトコル
このバージョンでは、プロジェクトの開始以来、プロトコルに最初の大きな変更が加えられた、まったく新しいタスクメッセージプロトコルが導入されています。
このバージョンでは、新しいプロトコルがデフォルトで有効になっています。新しいバージョンには下位互換性がないため、アップグレードする際には注意が必要です。
3.1.25バージョンは、新しいプロトコルとの互換性を追加するためにリリースされたため、アップグレードする最も簡単な方法は、最初にそのバージョンにアップグレードしてから、2番目の展開で4.0にアップグレードすることです。
古いプロトコルを引き続き使用する場合は、使用するプロトコルのバージョン番号を構成することもできます。
app = Celery()
app.conf.task_protocol = 1
新しいプロトコルで利用可能な機能の詳細については、このドキュメントの後半にあるニュースセクションを参照してください。
小文字の設定名
美しさを追求するために、すべての設定の名前がすべて小文字に変更され、一貫性を保つために一部の設定名の名前が変更されました。
この変更には完全な下位互換性があるため、大文字の設定名を引き続き使用できますが、できるだけ早くアップグレードしてください。セロリアップグレード設定コマンドを使用して自動的にアップグレードできます。
$ celery upgrade settings proj/settings.py
このコマンドは、新しい小文字の名前を使用するようにモジュールをインプレースで変更し(「CELERY
」プレフィックスが付いた大文字が必要な場合は、以下のブロックを参照)、バックアップをproj/settings.py.orig
に保存します。
大文字の名前を保持したいDjangoユーザーやその他のユーザー向け
Django設定モジュールからCelery構成をロードする場合は、大文字の名前を引き続き使用する必要があります。
また、CELERY_
プレフィックスを使用して、Celery設定が他のアプリで使用されるDjango設定と衝突しないようにする必要があります。
これを行うには、最初に設定ファイルを変換して新しい一貫した命名スキームを使用し、Celery関連のすべての設定にプレフィックスを追加する必要があります。
$ celery upgrade settings proj/settings.py --django
設定ファイルをアップグレードした後、proj/celery.py
モジュールでプレフィックスを明示的に設定する必要があります。
app.config_from_object('django.conf:settings', namespace='CELERY')
最新のDjangoCelery統合の例は、 Django の最初のステップにあります。
ノート
これにより、以前はプレフィックスがなかった設定にもプレフィックスが追加されます。たとえば、BROKER_URL
はCELERY
CELERY_BROKER_URL
の名前空間でCELERY_BROKER_URL
と記述する必要があります。
幸い、セロリのアップグレード設定--django プログラムが正しく機能するため、ファイルを手動で変更する必要はありません。
ローダーは、構成が新しい形式を使用しているかどうかを検出し、それに応じて動作しますが、これは、両方の選択肢に値を指定しない限り、新しい設定名と古い設定名を混在させて一致させることはできないことも意味します。
以前のバージョンとの主な違いは、小文字の名前を除いて、celerybeat_
からbeat_
、celeryd_
からworker_
などの一部のプレフィックスの名前が変更されていることです。
celery_
プレフィックスも削除され、この名前空間のタスク関連の設定のプレフィックスはtask_
になり、ワーカー関連の設定のプレフィックスはworker_
になります。
これを除けば、いくつかの特別な設定を除いて、ほとんどの設定は小文字で同じになります。
新しい小文字の設定で変更の完全な表を見ることができます。
Jsonがデフォルトのシリアライザーになりました
ついにデフォルトのシリアル化メカニズムとしてのpickle
の統治を終わらせる時が来ました、そしてjsonはこのバージョンから始まるデフォルトのシリアライザーです。
この変更は、Celery 3.1 のリリースで発表されました。
pickle
がデフォルトのシリアライザーであることに依然依存している場合は、4.0にアップグレードする前にアプリを構成する必要があります。
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = {'pickle'}
Jsonシリアライザーは、いくつかの追加タイプもサポートするようになりました。
datetime
、time
、date
ISO-8601形式のjsonテキストに変換されます。
Decimal
jsonテキストに変換されます。
django.utils.functional.Promise
Djangoのみ:翻訳などに使用される遅延文字列が評価され、json型への変換が試行されます。
uuid.UUID
jsonテキストに変換されます。
カスタムクラスで__json__
メソッドを定義して、JSONシリアル化をサポートすることもできます(json互換タイプを返す必要があります)。
class Person:
first_name = None
last_name = None
address = None
def __json__(self):
return {
'first_name': self.first_name,
'last_name': self.last_name,
'address': self.address,
}
Task基本クラスはタスクを自動的に登録しなくなりました
@Task
クラスは、タスクをタスクレジストリに自動的に登録する特別なメタクラスを使用しなくなりました。
代わりに、これは@task
デコレータによって処理されるようになりました。
クラスベースのタスクをまだ使用している場合は、これらを手動で登録する必要があります。
class CustomTask(Task):
def run(self):
print('running')
CustomTask = app.register_task(CustomTask())
ベストプラクティスは、一般的な動作をオーバーライドするためにのみカスタムタスククラスを使用し、次にタスクデコレータを使用してタスクを実現することです。
@app.task(bind=True, base=CustomTask)
def custom(self):
print('running')
この変更は、タスクのabstract
属性が無効になったことも意味します。
タスク引数のチェック
タスクを呼び出すときに、非同期でもタスクの引数が検証されるようになりました。
>>> @app.task
... def add(x, y):
... return x + y
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
typing
属性をFalse
に設定することにより、任意のタスクの引数チェックを無効にできます。
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
または、すべてのタスクでこれを完全に無効にしたい場合は、アプリの作成時にstrict_typing=False
を渡すことができます。
app = Celery(..., strict_typing=False)
Redisイベントには下位互換性がありません
Redis fanout_patterns
およびfanout_prefix
トランスポートオプションがデフォルトで有効になりました。
これらのフラグが有効になっていないワーカー/モニターは、このフラグが無効になっているワーカーを表示できません。 彼らはまだタスクを実行できますが、お互いに監視メッセージを受信することはできません。
4.0への最終アップグレードの前に、最初に3.1ワーカーとモニターを構成して設定を有効にすることにより、下位互換性のある方法でアップグレードできます。
BROKER_TRANSPORT_OPTIONS = {
'fanout_patterns': True,
'fanout_prefix': True,
}
Redisの優先順位が逆になりました
優先度0が最低になり、9が最高になります。
この変更は、優先サポートをAMQPでの動作と一致させるために行われました。
Alex Koshelev による寄稿。
Django:自動検出でDjangoアプリの構成がサポートされるようになりました
autodiscover_tasks()
関数を引数なしで呼び出すことができるようになり、Djangoハンドラーがインストール済みのアプリを自動的に検出します。
app.autodiscover_tasks()
ドキュメントのDjango統合の例は、引数なしの呼び出しを使用するように更新されました。
これにより、最近のDjangoバージョンで導入された新しいAppConfig
のものとの互換性も保証されます。
ワーカーの直接キューは自動削除を使用しなくなりました
4.0を実行しているワーカー/クライアントは、古いバージョンを実行しているワーカーにワーカーダイレクトメッセージを送信できなくなります。その逆も同様です。
ワーカーダイレクトメッセージに依存している場合は、celery.utils.worker_direct()
を次の実装に置き換えて、最初に新しいルーティング設定を使用するように3.xワーカーとクライアントをアップグレードする必要があります。
from kombu import Exchange, Queue
worker_direct_exchange = Exchange('C.dq2')
def worker_direct(hostname):
return Queue(
'{hostname}.dq2'.format(hostname),
exchange=worker_direct_exchange,
routing_key=hostname,
)
この機能により、問題#2492がクローズされました。
古いコマンドラインプログラムが削除されました
Celeryをインストールしても、celeryd
、celerybeat
、およびceleryd-multi
プログラムはインストールされなくなります。
これはCelery3.1のリリースで発表されましたが、古い名前を指すスクリプトがまだ残っている可能性があるため、新しいアンブレラコマンドを使用するようにこれらを更新してください。
プログラム | 新しいステータス | 置換 |
---|---|---|
celeryd
|
NS | セロリ労働者 |
celerybeat
|
NS | セロリビート |
celeryd-multi
|
NS | セロリマルチ |
ニュース
新しいプロトコルのハイライト
新しいプロトコルは、古いプロトコルの多くの問題を修正し、長い間要求されていた機能を有効にします。
ほとんどのデータは、メッセージ本文でシリアル化されるのではなく、メッセージヘッダーとして送信されるようになりました。
プロトコルのバージョン1では、タスクID、名前などのタスクメタデータを読み取れるように、ワーカーは常にメッセージを逆シリアル化する必要がありました。 これは、ワーカーがデータを二重にデコードすることを余儀なくされたことも意味します。最初に受信時にメッセージを逆シリアル化し、メッセージを再度シリアル化して子プロセスに送信し、最後に子プロセスがメッセージを再度逆シリアル化します。
メッセージヘッダーにメタデータフィールドを保持することは、ワーカーがタスクを子プロセスに配信する前にペイロードを実際にデコードする必要がないことを意味します。また、ワーカーがとは異なる言語で記述されたタスクを再ルーティングできるようになりました。別のワーカーへのPython。
新しい
lang
メッセージヘッダーを使用して、タスクが記述されているプログラミング言語を指定できます。ワーカーは、
ContentDisallowed
などの内部エラーやその他の逆シリアル化エラーの結果を保存します。ワーカーは結果を保存し、未登録のタスクエラーの監視イベントを送信します。
結果が親プロセスによって送信された場合でも、ワーカーはコールバック/エラーバックを呼び出します(たとえば、子プロセスが終了したときの
WorkerLostError
、逆シリアル化エラー、未登録タスク)。新しい
origin
ヘッダーには、タスクを送信するプロセスに関する情報(ワーカーノード名、またはPIDとホスト名の情報)が含まれます。新しい
shadow
ヘッダーを使用すると、ログで使用されるタスク名を変更できます。これは、pickleを使用して関数を呼び出すタスクのようなパターンのようなディスパッチに役立ちます(自宅ではこれを行わないでください)。
from celery import Task from celery.utils.imports import qualname class call_as_task(Task): def shadow_name(self, args, kwargs, options): return 'call_as_task:{0}'.format(qualname(args[0])) def run(self, fun, *args, **kwargs): return fun(*args, **kwargs) call_as_task = app.register_task(call_as_task())
新しい
argsrepr
およびkwargsrepr
フィールドには、ログやモニターなどで使用するためのタスク引数(切り捨てられる可能性があります)のテキスト表現が含まれています。これは、情報提供の目的でタスク引数を表示するために、ワーカーがメッセージペイロードを逆シリアル化する必要がないことを意味します。
チェーンは専用の
chain
フィールドを使用するようになり、数千以上のタスクのチェーンをサポートできるようになりました。新しい
parent_id
およびroot_id
ヘッダーは、他のタスクとのタスクの関係に関する情報を追加します。parent_id
は、このタスクを呼び出したタスクのタスクIDです。root_id
は、ワークフローの最初のタスクです。
これらのフィールドを使用して、花などのモニターを改善し、関連するメッセージ(チェーン、グループ、コード、完全なワークフローなど)をグループ化できます。
app.TaskProducer
は@amqp.create_task_message()
および@amqp.send_task_message()
に置き換えられました。責任を作成と送信に分割するということは、Python AMQPクライアントを使用してメッセージを直接送信したい人は、プロトコルを実装する必要がないことを意味します。
@amqp.create_task_message()
メソッドは、構成されたタスクプロトコルに応じて、@amqp.as_task_v2()
または@amqp.as_task_v1()
のいずれかを呼び出し、ヘッダー、プロパティ、および本体を含む特別なtask_message
タプルを返します。タスクメッセージ。
プレフォークプールの改善
タスクが子プロセスからログに記録されるようになりました
タスクの成功/失敗のログは、タスクを実行している子プロセスから発生するようになりました。 その結果、Sentryなどのロギングユーティリティは、トレースバックスタック内の変数を含むタスクに関する完全な情報を取得できます。
-Ofairがデフォルトのスケジューリング戦略になりました
3.1でデフォルトの動作を再度有効にするには、-Ofast
コマンドラインオプションを使用します。
-Ofair
コマンドラインオプションの機能については多くの混乱があり、説明で「プリフェッチ」という用語を使用しても、この用語がAMQPでどれほど混乱しているのかを考えると、おそらく役に立たなかったでしょう。
preforkプールを使用しているCeleryワーカーがタスクを受信すると、そのタスクを子プロセスに委任して実行する必要があります。
プリフォークプールには、タスクの実行に使用できる構成可能な数の子プロセス(--concurrency
)があり、各子プロセスはパイプ/ソケットを使用して親プロセスと通信します。
- インキュー(パイプ/ソケット):親が子プロセスにタスクを送信します
- アウトキュー(パイプ/ソケット):子は結果/戻り値を親に送信します。
Celery 3.1では、デフォルトのスケジューリングメカニズムは、書き込み可能な最初のinqueue
にタスクを送信するだけでした。いくつかのヒューリスティックを使用して、各子プロセスが同じ量のタスクを受け取るように、それらの間でラウンドロビンを実行します。 。
これは、デフォルトのスケジューリング戦略では、ワーカーがすでにタスクを実行しているのと同じ子プロセスにタスクを送信できることを意味します。 そのタスクが長時間実行されている場合、待機中のタスクを長時間ブロックする可能性があります。 さらに悪いことに、自由に作業できる子プロセスがある場合でも、何百もの短期間のタスクが長期のタスクの背後に立ち往生する可能性があります。
この状況を回避するために-Ofair
スケジューリング戦略が追加され、有効にすると、すでにタスクを実行している子プロセスにタスクを送信しないというルールが追加されます。
実行時間の短いタスクしかない場合、公平なスケジューリング戦略のパフォーマンスがわずかに低下する可能性があります。
子プロセスの常駐メモリサイズを制限する
ワーカー--max-memory-per-child
オプション、または:setting: `worker_max_memory_per_child` 設定を設定することにより、プリフォークプールの子プロセスごとに割り当てられるメモリの最大量を制限できるようになりました。
制限はRSS /常駐メモリサイズ用であり、キロバイト単位で指定されます。
制限を超えた子プロセスは、現在実行中のタスクが戻った後、終了して新しいプロセスに置き換えられます。
詳細については、子あたりの最大メモリ設定を参照してください。
Dave Smith による寄稿。
子プロセスごとに1つのログファイル
Init-scripsおよび celery multi は、%I ログファイル形式オプション(/var/log/celery/%n%I.log
など)を使用するようになりました。
この変更は、タスクログを子プロセスに移動した後に各子プロセスが個別のログファイルを持つようにするために必要でした。同じログファイルに書き込む複数のプロセスが破損を引き起こす可能性があるためです。
この新しいオプションを使用するには、init-scriptsと celery multi 引数をアップグレードすることをお勧めします。
トランスポート
読み取り/書き込み用にブローカーURLを個別に構成する
新しい:setting: `broker_read_url` および:setting:` broker_write_url` 設定が追加され、消費/公開に使用される接続に個別のブローカーURLを提供できるようになりました。
構成オプションに加えて、2つの新しいメソッドがアプリAPIに追加されました。
app.connection_for_read()
app.connection_for_write()
app.connection()
の代わりにこれらを使用して、必要な接続の目的を指定する必要があります。
ノート
app.pool
(読み取り)とapp.producer_pool
(書き込み)の2つの接続プールを使用できます。 後者は実際には接続を提供しませんが、完全なkombu.Producer
インスタンスを提供します。
def publish_some_message(app, producer=None):
with app.producer_or_acquire(producer) as producer:
...
def consume_messages(app, connection=None):
with app.connection_or_acquire(connection) as connection:
...
RabbitMQキュー拡張機能のサポート
message_ttl
およびexpires
引数を使用して、キュー宣言でメッセージTTLとキューの有効期限を直接設定できるようになりました。
Queue
に新しい引数が追加され、キュー宣言でRabbitMQキュー拡張を直接かつ便利に構成できるようになりました。
Queue(expires=20.0)
キューの有効期限をfloat秒で設定します。
kombu.Queue.expires
を参照してください。Queue(message_ttl=30.0)
キューメッセージの存続可能時間float秒を設定します。
kombu.Queue.message_ttl
を参照してください。Queue(max_length=1000)
キューの最大長(メッセージ数)をintとして設定します。
kombu.Queue.max_length
を参照してください。Queue(max_length_bytes=1000)
キューの最大長(メッセージサイズの合計(バイト単位))をintとして設定します。
kombu.Queue.max_length_bytes
を参照してください。Queue(max_priority=10)
メッセージの
priority
フィールドに基づいてメッセージをルーティングする優先キューとしてキューを宣言します。kombu.Queue.max_priority
を参照してください。
AmazonSQSトランスポートが正式にサポートされるようになりました
SQSブローカートランスポートは非同期I / Oを使用するように書き直されており、公式にサポートされているトランスポートとしてRabbitMQ、Redis、およびQPidに参加しています。
新しい実装はまた、長いポーリングを利用し、ブローカーとしてのSQSの使用に関連するいくつかの問題を解決します。
この作品はNextdoorによって後援されました。
ApacheQPidトランスポートが正式にサポートされるようになりました
Brian Bouterse による寄稿。
Redis:Sentinelのサポート
次のようなセンチネルURLのリストへの接続を指定できます。
sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...
ここで、各歩哨は; で区切られています。 複数の番兵はkombu.Connection
コンストラクターによって処理され、接続に失敗した場合に接続するサーバーの代替リストに配置されます。
Sergey Azovskov 、および Lorenzo Mancini による寄稿。
タスク
タスク自動再試行デコレータ
例外イベントのカスタム再試行処理を作成することは非常に一般的であるため、サポートが組み込まれています。
このため、新しいautoretry_for
引数がタスクデコレータでサポートされるようになりました。ここで、例外のタプルを指定して、次のことを自動的に再試行できます。
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
詳細については、既知の例外の自動再試行を参照してください。
Dmitry Malinovsky による寄稿。
Task.replaceの改善
self.replace(signature)
は、任意のタスク、コード、またはグループを置き換えることができるようになりました。置き換える署名は、コード、グループ、またはその他のタイプの署名にすることができます。既存のタスクのコールバックとエラーバックを継承しなくなりました。
ツリー内のノードを置き換える場合、新しいノードが古いノードの子を継承することは期待できません。
Task.replace_in_chord
は削除されました。代わりに.replace
を使用してください。置換がグループの場合、そのグループは自動的にコードに変換され、コールバックはグループタスクの結果を「蓄積」します。
新しい組み込みタスク( celery.accumulate がこの目的のために追加されました)
Steeve Morin 、および Ask Solem による寄稿。
リモートタスクのトレースバック
新しい:setting: `task_remote_tracebacks` は、リモートワーカーのスタックを挿入することにより、タスクのトレースバックをより便利にします。
この機能には、追加の:pypi: `tblib` ライブラリが必要です。
IonelCristianMărieș による寄稿。
タスク接続エラーの処理
タスクの送信中に発生した接続関連のエラーは、kombu.exceptions.OperationalError
エラーとして再発生するようになりました。
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... print('Could not send task %r: %r' % (add, exc))
詳細については、接続エラー処理を参照してください。
Gevent / Eventlet:結果を消費するための専用スレッド
:pypi: `gevent` 、または:pypi:` eventlet` を使用する場合、イベントの消費を担当する単一のスレッドが存在するようになりました。
これは、結果を取得する呼び出しが多数ある場合、それらを消費するための専用スレッドがあることを意味します。
result = add.delay(2, 2)
# this call will delegate to the result consumer thread:
# once the consumer thread has received the result this greenlet can
# continue.
value = result.get(timeout=3)
これにより、gevent / eventletを使用する場合のRPC呼び出しの実行が大幅に向上します。
AsyncResult.then(on_success, on_error)
AsyncResult APIは、promise
プロトコルをサポートするように拡張されました。
これは現在、RPC(amqp)とRedisの結果バックエンドでのみ機能しますが、タスクが終了したときにコールバックをアタッチできます。
import gevent.monkey
monkey.patch_all()
import time
from celery import Celery
app = Celery(broker='amqp://', backend='rpc')
@app.task
def add(x, y):
return x + y
def on_result_ready(result):
print('Received result for id %r: %r' % (result.id, result.result,))
add.delay(2, 2).then(on_result_ready)
time.sleep(3) # run gevent event loop for a while.
ここでは:pypi: `gevent` を使用して示されていますが、実際には、これは:pypi:` twisted` やなどのコールバックベースのイベントループでより役立つAPIです。 pypi: `tornado` 。
新しいタスクルーターAPI
:setting: `task_routes` 設定で関数を保持できるようになり、マップルートでグロブパターンと正規表現がサポートされるようになりました。
ルータークラスを使用する代わりに、関数を簡単に定義できるようになりました。
def route_for_task(name, args, kwargs, options, task=None, **kwargs):
from proj import tasks
if name == tasks.add.name:
return {'queue': 'hipri'}
開始引数を使用できる引数が必要ない場合は、将来的に機能を追加できるように、常にスター引数も受け入れるようにしてください。
def route_for_task(name, *args, **kwargs):
from proj import tasks
if name == tasks.add.name:
return {'queue': 'hipri', 'priority': 9}
options
引数と新しいtask
キーワード引数はどちらも関数型ルーターの新機能であり、実行オプションまたはタスクのプロパティに基づいてルーターを簡単に作成できるようになります。
@send_task()
を使用してタスクが名前で呼び出された場合、オプションのtask
キーワード引数は設定されません。
ルーターでのglob / regexesの使用など、その他の例については、:setting: `task_routes` および自動ルーティングを参照してください。
キャンバスリファクタリング
キャンバス/ワークフローの実装は、いくつかの長い未解決の問題を修正するために大幅にリファクタリングされました。
エラーコールバックは、実際の例外インスタンスとトレースバックインスタンスを受け取ることができるようになりました(問題#2538)。
>>> add.s(2, 2).on_error(log_error.s()).delay()
ここで、
log_error
は次のように定義できます。@app.task def log_error(request, exc, traceback): with open(os.path.join('/var/errors', request.id), 'a') as fh: print('--\n\n{0} {1} {2}'.format( task_id, exc, traceback), file=fh)
その他の例については、 Canvas:ワークフローの設計を参照してください。
chain(a, b, c)
はa | b | c
と同じように機能するようになりました。これは、チェーンが
chain
のインスタンスを返さなくなる可能性があることを意味します。代わりに、ワークフローを最適化して、たとえば 連鎖した2つのグループが1つのグループになります。グループ内のグループを1つのグループに展開するようになりました(問題#1509)。
チャンク/マップ/スターマップタスクは、ターゲットタスクに基づいてルーティングされるようになりました
コードとチェーンは不変になりました。
シリアル化された署名が署名に変換されないバグを修正しました(問題#2078)
Ross Deane によって提供された修正。
JSONシリアル化を使用するとチェーンとグループが機能しなかった問題を修正しました(問題#2076)。
Ross Deane によって提供された修正。
コードを作成しても、キーワード引数 'task_id'に複数の値が表示されなくなりました(問題#2225)。
Aneil Mallavarapu によって提供された修正。
チェーンに最後から2番目のタスクとしてコードが含まれている場合に間違った結果が返される問題を修正しました。
Aneil Mallavarapu によって提供された修正。
group(A.s() | group(B.s() | C.s()))
の特殊なケースが機能するようになりました。チェーン:サブタスクもチェーンである場合に誤ったIDが設定されるバグを修正しました。
group | group
が1つのグループにフラット化されるようになりました(問題#2573)。group | task
が正しくコードにアップグレードされなかった問題を修正しました(問題#2922)。Chordsは
result.parent
リンクを正しく設定するようになりました。chunks
/map
/starmap
は、ターゲットタスクに基づいてルーティングされるようになりました。Signature.link
は、引数がスカラー(リストではない)の場合に機能するようになりました(問題#2019)。
group()
は、キーワード引数を適切に転送するようになりました(問題#3426)。Samuel Giffard によって提供された修正。
ヘッダーグループが単一のタスクのみで構成される
chord
は、単純なチェーンになりました。link
引数をgroup.apply_async()
に渡すと、エラーが発生するようになりました(問題#3508)。chord | sig
がコードコールバックにアタッチされるようになりました(問題#3356)。
定期的なタスク
定期的なタスクを構成するための新しいAPI
この新しいAPIを使用すると、定期的なタスクを定義するときに署名を使用できるため、タスク名の入力ミスの可能性がなくなります。
新しいAPIの例は、こちらです。
最適化されたBeatの実装
セロリビートの実装は、ヒープを使用してエントリをスケジュールすることにより、何百万もの定期的なタスク用に最適化されています。
Ask Solem および Alexander Koshelev による寄稿。
結果のバックエンド
RPC結果バックエンドが成熟しました
以前に実験的なRPC結果バックエンドの多くのバグが修正され、本番環境での使用を検討できるようになりました。
Ask Solem 、 Morris Tweed による寄稿。
Redis:結果のバックエンドの最適化
result.get()はストリーミングタスクの結果にpub / subを使用するようになりました
Redis結果バックエンドを使用するときにresult.get()
を呼び出すと、結果が利用可能になるのを待つためにポーリングを使用していたため、非常にコストがかかりました。 デフォルトのポーリング間隔0.5秒はパフォーマンスに役立ちませんでしたが、スピンループを回避するために必要でした。
新しい実装では、Redis Pub / Subメカニズムを使用して結果をすぐにパブリッシュおよび取得し、タスクのラウンドトリップ時間を大幅に改善しています。
Yaroslav Zhavoronkov および Ask Solem による寄稿。
新しい最適化されたコード結合の実装
これはCelery3.1で導入された実験的な機能であり、結果のバックエンドURL構成に?new_join=1
を追加することによってのみ有効にできました。
実装は十分にテストされており、安定していると見なされ、デフォルトで有効になっていると思われます。
新しい実装により、コードのオーバーヘッドが大幅に削減されます。特に、コードが大きい場合、パフォーマンス上のメリットは非常に大きくなります。
新しいRiak結果バックエンドが導入されました
詳細については、 conf-riak-result-backend を参照してください。
Gilles Dartiguelongue 、 Alman One 、 NoKriK による寄稿。
新しい領事結果バックエンドが導入されました
ConsulのKey / Valueストアを使用して、バックエンドとしてConsulのサポートを追加します。
ConsulにはHTTPAPIがあり、キーをその値とともに保存できます。
バックエンドはKeyValueStoreBackendを拡張し、ほとんどのメソッドを実装します。
主にオブジェクトを設定、取得、削除します。
これにより、Celeryはタスクの結果を領事のK / Vストアに保存できます。
領事は、領事からのセッションを使用してキーにTTLを設定することもできます。 このようにして、バックエンドはタスク結果の自動有効期限をサポートします。
領事の詳細については、 https://consul.io/をご覧ください。
バックエンドは、HTTP APIとの通信に:pypi: `python-consul` を使用します。 このバックエンドが次のように、このパッケージはPython3に完全に準拠しています。
$ pip install python-consul
これにより、PythonからConsulのHTTPAPIと通信するために必要なパッケージがインストールされます。
Celeryへの依存関係の拡張機能としてconsulを指定することもできます。
$ pip install celery[consul]
詳細については、バンドルを参照してください。
Wido den Hollander による寄稿。
真新しいカサンドラ結果バックエンド
新しい:pypi: `cassandra-driver` ライブラリを利用する新しいCassandraバックエンドは、古い:pypi:` pycassa` ライブラリを使用する古い結果バックエンドを置き換えています。
詳細については、 Cassandraバックエンド設定を参照してください。
結果としてバックエンドの使用としてCassandraでCeleryに依存するには:
$ pip install celery[cassandra]
複数の拡張要件を組み合わせることもできます。詳細については、バンドルを参照してください。
新しいElasticsearch結果バックエンドが導入されました
詳細については、 Elasticsearchバックエンド設定を参照してください。
結果としてElasticsearchでCeleryに依存するには、次のように使用します。
$ pip install celery[elasticsearch]
複数の拡張要件を組み合わせることもできます。詳細については、バンドルを参照してください。
Ahmet Demir による寄稿。
イベントのバッチ処理
イベントはワーカーにバッファリングされ、リストとして送信されるようになり、監視イベントの送信に必要なオーバーヘッドが削減されます。
カスタムイベントモニターの作成者の場合、Python Celeryヘルパー(Receiver
)を使用してモニターを実装している限り、アクションは必要ありません。
ただし、生のイベントメッセージを解析する場合は、次の点で通常のイベントメッセージとは異なるため、バッチイベントメッセージを考慮する必要があります。
- イベントメッセージのバッチのルーティングキーは
<event-group>.multi
に設定され、バッチ処理されたイベントグループは現在task
のみです(task.multi
のルーティングキーを指定)。 - メッセージ本文は、辞書ではなく、シリアル化された辞書のリストになります。 リスト内の各項目は、通常のイベントメッセージ本文と見なすことができます。
ほかのニュースでは…
要件
- 昆布4.0 に依存するようになりました。
- 現在、:pypi: `billiard` バージョン3.5に依存しています。
- :pypi: `anyjson` に依存しなくなりました。 さようなら古くからの友人:(
タスク
「anon-exchange」は、単純な名前と名前の直接ルーティングに使用されるようになりました。
これにより、ルーティングテーブルが完全にバイパスされるため、パフォーマンスが向上します。さらに、Redisブローカートランスポートの信頼性も向上します。
空のResultSetはTrueと評価されるようになりました。
Colin McIntosh によって提供された修正。
デフォルトのルーティングキー(:setting: `task_default_routing_key` )と交換名(:setting:` task_default_exchange` )は、:setting: `task_default_queue` [から取得されるようになりました。 X188X]設定。
つまり、デフォルトキューの名前を変更するには、1つの設定を設定するだけで済みます。
新しい:setting: `task_reject_on_worker_lost` 設定、および
reject_on_worker_lost
タスク属性は、遅延ackタスクを実行している子ワーカープロセスが終了したときに何が起こるかを決定します。Michael Permana による寄稿。
Task.subtask
はエイリアス付きのTask.signature
に名前が変更されました。Task.subtask_from_request
はエイリアス付きのTask.signature_from_request
に名前が変更されました。kombu.Queue
のdelivery_mode
属性が尊重されるようになりました(問題#1953)。:setting: `task-routes` のルートで、
Queue
インスタンスを直接指定できるようになりました。例:
task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
task_idがNoneの場合、
AsyncResult
はValueError
を発生させるようになりました。 (問題#1996)。再試行されたタスクは、有効期限の設定を転送しませんでした(問題#3297)。
result.get()
は、on_message
引数をサポートして、受信したすべてのメッセージに対して呼び出されるコールバックを設定するようになりました。追加された新しい抽象クラス:
CallableTask
タスクのように見えます。
CallableSignature
タスクの署名のように見えます。
Task.replace
は、コールバックを適切に転送するようになりました(問題#2722)。Nicolas Unravel によって提供された修正。
Task.replace
:チェーン/コードに追加(#3232を閉じる)チェーンに署名を追加する問題#3232を修正しました(存在する場合)。 指定された署名にコードが含まれている場合のコード抑制を修正しました。
:github_user: `honux` によって提供された修正。
タスクの再試行もイーガーモードでスローされるようになりました。
Feanil Patel によって提供された修正。
ビート
日付が無効なcrontab無限ループを修正しました。
オカレンスに到達できない場合(たとえば、4月31日)、次のオカレンスに到達しようとすると、無限ループがトリガーされます。
2,000回の反復後に
RuntimeError
を上げて、それを修正してみてください(また、プロセスでcrontabうるう年のテストを追加しました)
Romuald Brunet によって提供された修正。
例外がサービスを終了したときに、プログラムがゼロ以外の終了コードで終了することを保証するようになりました。
Simon Peeters によって提供された修正。
アプリ
:setting: `enable_utc` が無効になっている場合でも、日付は常にタイムゾーンに対応するようになりました(問題#943)。
Omer Katz によって提供された修正。
Config :アプリの事前構成も構成に組み込まれるようになりました。
Jeremy Zafran によって提供された修正。
- アプリケーションは、を使用してタスク名の生成方法を変更できるようになりました
@gen_task_name()
メソッド。Dmitry Malinovsky による寄稿。
アプリには、現在作業中のタスク(または
None
)を返す新しいapp.current_worker_task
プロパティがあります。 (問題#2100)。
ロギング
get_task_logger()
は、「celery」または「celery.task」という名前を使用しようとすると例外を発生させるようになりました(問題#3475)。
実行プール
Eventlet / Gevent :AMQPハートビートを有効にするようになりました(問題#3338)。
Eventlet / Gevent :「同時読み取り」エラーにつながる競合状態を修正しました(問題#2755)。
Prefork :Preforkプールは、使用可能な場合、
select
の代わりにpoll
を使用するようになりました(問題#2373)。Prefork :プールがワーカーのシャットダウンを拒否するバグを修正しました(問題#2606)。
Eventlet : celery inspect stats コマンドでプールサイズを返すようになりました。
Alexander Oblovatniy による寄稿。
テスト
Celeryは:pypi: `pytest` プラグインになり、ユニットテストと統合テストに役立つフィクスチャが含まれています。
詳細については、テストユーザーガイドを参照してください。
トランスポート
amqps://
を指定してSSLを要求できるようになりました。Redisトランスポート:Redisトランスポートが:setting: `broker_use_ssl` オプションをサポートするようになりました。
Robert Kolba による寄稿。
JSONシリアライザーは、サポートされていない型に対して
obj.__json__
を呼び出すようになりました。これは、組み込みのjson型に縮小できるカスタム型の
__json__
メソッドを定義できることを意味します。例:
class Person: first_name = None last_name = None address = None def __json__(self): return { 'first_name': self.first_name, 'last_name': self.last_name, 'address': self.address, }
JSONシリアライザーは、日時、Django promise、UUID、およびDecimalを処理するようになりました。
新しい
Queue.consumer_arguments
は、x-priority
を介して消費者の優先順位を設定する機能に使用できます。https://www.rabbitmq.com/consumer-priority.htmlを参照してください
例:
consumer = Consumer(channel, consumer_arguments={'x-priority': 3})
キュー/交換:
no_declare
オプションが追加されました(内部amqでも有効になっています)。 交換)。
プログラム
Celeryは、
optparse
の代わりにargparse
を使用するようになりました。制御端末がTTYでない場合、すべてのプログラムが色を無効にするようになりました。
セロリワーカー:
-q
引数は、スタートアップバナーを無効にするようになりました。セロリワーカー:「ワーカー準備完了」メッセージは、警告ではなく重大度情報を使用してログに記録されるようになりました。
セロリマルチ:
%n
の形式は、セロリワーカーと一致するように%N
と同義になりました。celery inspect / celery control :新しい
--json
オプションをサポートして、json形式で出力を提供するようになりました。登録済みのセロリ検査:組み込みタスクを無視するようになりました。
セロリパージは、パージに含めるキューとパージから除外するキューを指定するために使用される
-Q
および-X
オプションを使用するようになりました。新しい celery logtool :celeryworkerログファイルをフィルタリングおよび解析するためのユーティリティ
celery multi :%i および%I ログファイル形式を通過するようになりました。
一般:
%p
を使用して、log-file / pid-file引数で完全なワーカーノード名に展開できるようになりました。- 新しいコマンドラインオプション
--executable
は、プログラムのデーモン化に使用できるようになりました(セロリワーカーおよびセロリビート)。Bert Vanderbauwhede による寄稿。
セロリワーカー:新しい
--prefetch-multiplier
オプションをサポートします。MickaëlPenhardによる寄稿。
--loader
引数は、アプリの引数が設定されている場合でも常に有効になりました(問題#3405)。inspect / controlがレジストリからコマンドを取得するようになりました
これは、ユーザーのリモートコントロールコマンドもコマンドラインから使用できることを意味します。
コマンドラインで引数を正しく渡すには、引数/および引数のタイプを指定する必要があることに注意してください。
コマンドのタイプに応じて使用する2つのデコレータがあります: @inspect_command + @control_command :
from celery.worker.control import control_command @control_command( args=[('n', int)] signature='[N=1]', ) def something(state, n=1, **kwargs): ...
ここで、
args
は、コマンドでサポートされている引数のリストです。 リストには、(argument_name, type)
のタプルが含まれている必要があります。signature
は、たとえばで使用されるコマンドラインヘルプです。celery -A proj control --help
。コマンドは variadic 引数もサポートします。これは、残った引数が1つの変数に追加されることを意味します。 ここでは、シグナル引数と可変数のtask_idsを受け取る
terminate
コマンドによって示されています。from celery.worker.control import control_command @control_command( args=[('signal', str)], signature='<signal> [id1, [id2, [..., [idN]]]]', variadic='ids', ) def terminate(state, signal, ids, **kwargs): ...
このコマンドは、次を使用して呼び出すことができます。
$ celery -A proj control terminate SIGKILL id1 id2 id3`
詳細については、独自のリモコンコマンドの作成を参照してください。
ワーカー
LimitedSet
の改善と修正。メモリリークを取り除く+セットの
minlen
サイズを追加する:しばらく操作した後のセットの最小残存サイズ。minlen
アイテムは、有効期限が切れているはずのアイテムでも保持されます。古いコードとさらに古いコードの問題:
一部のシナリオ(アイテムを複数回追加するなど)では、ヒープが大きくなる傾向があります。
多くのアイテムをすばやく追加しても、(あったとしても)すぐにそれらをクリーンアップすることはできません。
他のワーカーと話しているときに、revoked._dataが送信されましたが、反対側で反復可能として処理されました。 つまり、これらのキーに新しい(現在の)タイムスタンプを付けることを意味します。 これを行うことにより、労働者はアイテムを永久にリサイクルすることができます。 1)と2)を組み合わせると、これは、大規模なワーカーセットでは、すぐにメモリが不足することを意味します。
これらの問題はすべて今すぐ修正する必要があります。
これにより、問題#3095、#3086が修正されます。
David Pravec による寄稿。
リモートコントロールコマンドキューを制御するための新しい設定。
:setting: `control_queue_expires`
リモートコントロールコマンドキューとリモートコントロール応答キューの両方のキュー有効期限を設定します。
-
リモートコントロールコマンドキューとリモートコントロール応答キューの両方のメッセージの存続時間を設定します。
Alan Justino による寄稿。
:signal: `worker_shutdown` シグナルは、シャットダウン中に常に呼び出されるようになりました。
以前は、ワーカーインスタンスが最初にgcによって収集された場合は呼び出されませんでした。
ワーカーは、使用されるブローカートランスポートが実際にそれらをサポートしている場合にのみ、リモートコントロールコマンドコンシューマーを起動するようになりました。
Gossipは、イベントキューの
x-message-ttl
をheartbeat_intervalに設定するようになりました。 (問題#2005)。終了コードを保持するようになりました(問題#2024)。
無効なETA値を持つメッセージを拒否するようになりました(ackの代わりに、メッセージが構成されている場合、配信不能交換に送信されます)。
-purge
引数が使用されたときのクラッシュを修正しました。回復不能なエラーのログレベルが
error
からcritical
に変更されました。レート制限の精度が向上しました。
タスクの有効期限フィールドで欠落しているタイムゾーン情報を考慮してください。
Albert Wang によって提供された修正。
- ワーカーには、
Queues
ブートステップがなくなりました。 余計な。
- ワーカーには、
取り消されたタスクに対しても「Receivedtask」行を出力するようになりました。 (問題#3155)。
:setting: `broker_connection_retry` 設定を尊重するようになりました。
Nat Williams によって提供された修正。
新しい:setting: `control_queue_ttl` および:setting:` control_queue_expires` 設定により、リモートコントロールコマンドメッセージのTTLとキューの有効期限を構成できるようになりました。
Alan Justino による寄稿。
新しい
celery.worker.state.requests
は、IDによるアクティブ/予約済みタスクのO(1)ルックアップを有効にします。自動スケールは、スケールダウン時にキープアライブを常に更新するとは限りませんでした。
Philip Garnero によって提供された修正。
タイプミス
options_list
->option_list
を修正しました。Greg Wilbur によって提供された修正。
一部のワーカーコマンドライン引数と
Worker()
クラス引数は、一貫性を保つために名前が変更されました。これらはすべて、下位互換性のためのエイリアスを持っています。
--send-events
->--task-events
--schedule
->--schedule-filename
--maxtasksperchild
->--max-tasks-per-child
Beat(scheduler_cls=)
->Beat(scheduler=)
Worker(send_events=True)
->Worker(task_events=True)
Worker(task_time_limit=)
->Worker(time_limit=
)Worker(task_soft_time_limit=)
->Worker(soft_time_limit=)
Worker(state_db=)
->Worker(statedb=)
Worker(working_directory=)
->Worker(workdir=)
デバッグユーティリティ
celery.contrib.rdb
:アドレスを簡単にコピーして貼り付けることができるようにリモートデバッガバナーを変更しました(アドレスにピリオドが含まれなくなりました)。Jonathan Vanasco による寄稿。
最近の:pypi: `psutil` バージョンとの互換性を修正しました(問題#3262)。
信号
アプリ:アプリの構成/ファイナライズの新しいシグナル:
app.on_configure
app.on_after_configure
app.on_after_finalize
タスク:拒否されたタスクメッセージの新しいタスクシグナル:
celery.signals.task_rejected
。celery.signals.task_unknown
。
Worker :ハートビートイベントが送信されたときの新しい信号。
celery.signals.heartbeat_sent
Kevin Richardson による寄稿。
イベント
イベントメッセージは、RabbitMQ
x-message-ttl
オプションを使用して、古いイベントメッセージが確実に破棄されるようになりました。デフォルトは5秒ですが、:setting: `event_queue_ttl` 設定を使用して変更できます。
Task.send_event
は、タスクの公開再試行設定に従って、接続障害時にイベントの送信を自動的に再試行するようになりました。イベントモニターは、デフォルトで:setting: `event_queue_expires` 設定を設定するようになりました。
キューは、モニターがキューからの消費を停止してから60秒後に期限切れになります。
None値が適切に処理されなかったバグを修正しました。
Dongweiming によって提供された修正。
新しい:setting: `event_queue_prefix` 設定を使用して、イベントレシーバーキューのデフォルトの
celeryev
キュープレフィックスを変更できるようになりました。金本武による寄稿。
State.tasks_by_type
およびState.tasks_by_worker
を、この情報への高速アクセスのマッピングとして使用できるようになりました。
展開
汎用init-scriptsは、
CELERY_SU
およびCELERYD_SU_ARGS
環境変数をサポートして、 su (のパスと引数を設定するようになりました。 ] su(1))。一般的なinit-scriptsは、
/usr/local/etc/
で構成ファイルを検索することにより、FreeBSDおよびその他のBSDシステムをより適切にサポートするようになりました。Taha Jahangir による寄稿。
一般的なinit-script:再起動が常に機能しない
celerybeat
の奇妙なバグを修正しました(問題#3018)。systemd initスクリプトは、サービスの実行時にシェルを使用するようになりました。
Tomas Machalek による寄稿。
結果のバックエンド
Redis:デフォルトのソケットタイムアウトが120秒になりました。
デフォルトは、新しい:setting: `redis_socket_timeout` 設定を使用して変更できます。
Raghuram Srinivasan による寄稿。
RPCバックエンドの結果キューがデフォルトで自動削除されるようになりました(問題#2001)。
RPCバックエンド:例外がjsonシリアライザーで適切に逆シリアル化されなかった問題を修正しました(問題#2518)。
Allard Hoeve によって提供された修正。
CouchDB:結果をdouble-jsonエンコードするために使用されるバックエンド。
Andrew Stewart によって提供された修正。
CouchDB:バックエンドが見つからない原因となるタイプミスを修正しました(問題#3287)。
Andrew Stewart によって提供された修正。
MongoDB::setting: `result_serialzier` 設定を
bson
に設定して、MongoDBライブラリ独自のシリアライザーを使用できるようになりました。Davide Quarta による寄稿。
- MongoDB:URI処理が改善されて使用できるようになりました
提供されている場合は、URIからのデータベース名、ユーザー、およびパスワード。
Samuel Jaillet による寄稿。
SQLAlchemy結果バックエンド:NullPoolを使用するときにすべての結果エンジンオプションを無視するようになりました(問題#1930)。
SQLAlchemyの結果バックエンド:脳に損傷を受けたMySQL Unicode実装を処理するために、最大文字サイズを155に設定するようになりました(問題#1748)。
General :すべてのCelery例外/警告は、共通の
CeleryError
/CeleryWarning
から継承するようになりました。 (問題#2643)。
ドキュメントの改善
寄稿者:
- アダム・チェインズ
- アミール・ルスタムザデ
- アーサー・ヴュイヤール
- Batiste Bieler
- バーカーペクサグ
- ブライス・グロフ
- ダニエル・ディヴァイン
- エドワード・ベッツ
- ジェイソンビーチ
- ジェフ・ウィドマン
- Maciej Obuchowski
- マヌエルカウフマン
- マキシムボーケミン
- ミッチェルハンファリー
- パブロ・カピシン
- ピエール・ファーシング
- リック
- スティーブン・スクラール
- Tayfun Sen
- ウィーランドホフマン
再編成、非推奨、および削除
互換性のない変更
プリフォーク:
result.get()
を呼び出すか、タスク内から結果を結合すると、RuntimeError
が発生するようになりました。以前のバージョンでは、これは警告を発していました。
celery.worker.consumer
は、モジュールではなくパッケージになりました。モジュール
celery.worker.job
の名前がcelery.worker.request
に変更されました。ビート:
Scheduler.Publisher
/.publisher
は.Producer
/.producer
に名前が変更されました。結果:
@AsyncResult
のtask_name引数/属性が削除されました。これは歴史的に
pickle
互換性のために使用されていたフィールドでしたが、現在は必要ありません。バックエンド:
status
という名前の引数の名前がstate
に変更されました。バックエンド:
backend.get_status()
の名前がbackend.get_state()
に変更されました。バックエンド:
backend.maybe_reraise()
の名前が.maybe_throw()
に変更されましたpromise APIは.throw()を使用するため、この変更は一貫性を高めるために行われました。
使用可能なエイリアスがあるため、Celery5.0までmaybe_reraiseを引き続き使用できます。
予定外の削除
実験的な
celery.contrib.methods
機能は削除されました。これは、実装に役立つバグが非常に多かったためです。CentOSのinitスクリプトは削除されました。
これらは実際には一般的なinit-scriptsに機能を追加しなかったので、代わりにそれらを使用するか、:pypi: `supervisor` のようなものを使用することをお勧めします。
再編成の非推奨
これらのシンボルの名前は変更されており、下位互換性のためにこのバージョンで使用できるエイリアスがありますが、Celery 5.0で削除されるため、これらのシンボルの名前をできるだけ早く変更して、そのリリースで壊れないようにしてください。
このリストの最初のリストのみを使用する可能性がありますが、次のことはわかりません。
celery.utils.worker_direct
->celery.utils.nodenames.worker_direct()
。celery.utils.nodename
->celery.utils.nodenames.nodename()
。celery.utils.anon_nodename
->celery.utils.nodenames.anon_nodename()
。celery.utils.nodesplit
->celery.utils.nodenames.nodesplit()
。celery.utils.default_nodename
->celery.utils.nodenames.default_nodename()
。celery.utils.node_format
->celery.utils.nodenames.node_format()
。celery.utils.host_format
->celery.utils.nodenames.host_format()
。
予定された削除
モジュール
モジュール
celery.worker.job
はcelery.worker.request
に名前が変更されました。これは内部モジュールであったため、効果はありません。 現在はパブリックAPIの一部であるため、再度変更しないでください。
モジュール
celery.task.trace
は、celery.task
パッケージが段階的に廃止されるため、celery.app.trace
に名前が変更されました。 モジュールはバージョン5.0で削除されるため、インポートを次の場所から変更してください。from celery.task.trace import X
に:
from celery.app.trace import X
celery.loaders
モジュールの古い互換性エイリアスは削除されました。celery.loaders.current_loader()
を削除し、次を使用します:current_app.loader
celery.loaders.load_settings()
を削除し、次を使用します:current_app.conf
結果
AsyncResult.serializable()
およびcelery.result.from_serializable
削除されました:
代わりに使用してください:
>>> tup = result.as_tuple() >>> from celery.result import result_from_tuple >>> result = result_from_tuple(tup)
BaseAsyncResult
を削除し、代わりにAsyncResult
をインスタンスチェックに使用します。TaskSetResult
を削除し、代わりにGroupResult
を使用してください。TaskSetResult.total
->len(GroupResult)
TaskSetResult.taskset_id
->GroupResult.id
ResultSet.subtasks
を削除し、代わりにResultSet.results
を使用してください。
TaskSet
TaskSetは、Celery3.0でgroup
構造に置き換えられたため、削除されました。
このようなコードがある場合:
>>> from celery.task import TaskSet
>>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()
これを次のように置き換える必要があります。
>>> from celery import group
>>> group(add.s(i, i) for i in xrange(10))()
イベント
クラス
celery.events.state.Worker
の削除:Worker._defaults
属性。{k: getattr(worker, k) for k in worker._fields}
を使用してください。Worker.update_heartbeat
Worker.event(None, timestamp, received)
を使用するWorker.on_online
Worker.event('online', timestamp, received, fields)
を使用するWorker.on_offline
Worker.event('offline', timestamp, received, fields)
を使用するWorker.on_heartbeat
Worker.event('heartbeat', timestamp, received, fields)
を使用する
クラス
celery.events.state.Task
の削除:Task._defaults
属性。{k: getattr(task, k) for k in task._fields}
を使用してください。Task.on_sent
Worker.event('sent', timestamp, received, fields)
を使用するTask.on_received
Task.event('received', timestamp, received, fields)
を使用するTask.on_started
Task.event('started', timestamp, received, fields)
を使用するTask.on_failed
Task.event('failed', timestamp, received, fields)
を使用するTask.on_retried
Task.event('retried', timestamp, received, fields)
を使用するTask.on_succeeded
Task.event('succeeded', timestamp, received, fields)
を使用するTask.on_revoked
Task.event('revoked', timestamp, received, fields)
を使用するTask.on_unknown_event
Task.event(short_type, timestamp, received, fields)
を使用するTask.update
Task.event(short_type, timestamp, received, fields)
を使用するTask.merge
これが必要な場合はお問い合わせください。
魔法のキーワード引数
タスクによって受け入れられる非常に古い魔法のキーワード引数のサポートは、このバージョンで最終的に削除されました。
これらをまだ使用している場合は、古いcelery.decorators
モジュールを使用し、タスクに渡されるキーワード引数に応じて、タスクを書き直す必要があります。次に例を示します。
from celery.decorators import task
@task()
def add(x, y, task_id=None):
print('My task id is %r' % (task_id,))
次のように書き直す必要があります。
from celery import task
@task(bind=True)
def add(self, x, y):
print('My task id is {0.request.id}'.format(self))
削除された設定
次の設定は削除され、サポートされなくなりました。
ロギング設定
設定名 | と置換する |
---|---|
CELERYD_LOG_LEVEL
|
celery worker --loglevel
|
CELERYD_LOG_FILE
|
celery worker --logfile
|
CELERYBEAT_LOG_LEVEL
|
celery beat --loglevel
|
CELERYBEAT_LOG_FILE
|
celery beat --logfile
|
CELERYMON_LOG_LEVEL
|
セレリモンは非推奨です、花を使用してください |
CELERYMON_LOG_FILE
|
セレリモンは非推奨です、花を使用してください |
CELERYMON_LOG_FORMAT
|
セレリモンは非推奨です、花を使用してください |
タスク設定
設定名 | と置換する |
---|---|
CELERY_CHORD_PROPAGATES
|
該当なし |
内部APIの変更
モジュール
celery.datastructures
の名前がcelery.utils.collections
に変更されました。モジュール
celery.utils.timeutils
の名前がcelery.utils.time
に変更されました。celery.utils.datastructures.DependencyGraph
はcelery.utils.graph
に移動しました。celery.utils.jsonify
はcelery.utils.serialization.jsonify()
になりました。celery.utils.strtobool
はcelery.utils.serialization.strtobool()
になりました。celery.utils.is_iterable
は削除されました。代わりに以下を使用してください:
isinstance(x, collections.Iterable)
celery.utils.lpmerge
はcelery.utils.collections.lpmerge()
になりました。celery.utils.cry
はcelery.utils.debug.cry()
になりました。celery.utils.isatty
はcelery.platforms.isatty()
になりました。celery.utils.gen_task_name
はcelery.utils.imports.gen_task_name()
になりました。celery.utils.deprecated
はcelery.utils.deprecated.Callable()
になりましたcelery.utils.deprecated_property
はcelery.utils.deprecated.Property()
になりました。celery.utils.warn_deprecated
はcelery.utils.deprecated.warn()
になりました