Celery 4.0(latentcall)の新機能—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/history/whatsnew-4.0
移動先:案内検索

Celery 4.0の新機能(latentcall)

著者
ソレムに聞く(ask at celeryproject.org

変更履歴

新しいドキュメントではメジャーバージョンの変更について説明しています。バグ修正リリース(0.0.x)の変更を一覧表示する変更履歴もありますが、古いシリーズは履歴セクションにアーカイブされています。 。

Celeryは、膨大な量のメッセージを処理すると同時に、そのようなシステムを維持するために必要なツールを操作に提供する、シンプルで柔軟性があり、信頼性の高い分散システムです。

これは、リアルタイム処理に重点を置いたタスクキューであり、タスクのスケジューリングもサポートしています。

Celeryには、ユーザーと寄稿者の大規模で多様なコミュニティがあります。IRCまたはメーリングリストでに参加する必要があります。

セロリの詳細については、はじめにをお読みください。

このバージョンは以前のバージョンと下位互換性がありますが、次のセクションを読むことが重要です。

このバージョンは、CPython 2.7、3.4、および3.5で正式にサポートされています。 また、PyPyでもサポートされています。

目次

このバージョンにアップグレードする前に、必ず重要な注意事項をお読みください。



序文

セロリ4へようこそ!

これは、2年以上の変更を伴う大規模なリリースです。 多くの新機能が付属しているだけでなく、バグの膨大なリストも修正されているため、多くの点で「SnowLeopard」リリースと呼ぶことができます。

Celeryの次のメジャーバージョンはPython3.5のみをサポートし、新しいasyncioライブラリを利用することを計画しています。

このリリースは、私の雇用主である Robinhood のサポートなしには実現できなかったでしょう(私たちは採用しています!)。

  • ソレムに聞く

セバスチャン「ゼブ」ビョルネルド(RIP)に捧げ、新しいロゴをデザインしてくれた Ty Wilkins 、これを実現するのに貢献したすべての貢献者、そして Robinhood の同僚に感謝します。 。

貢献者の壁

Aaron McMillin、Adam Chainz、Adam Renberg、Adriano Martins de Jesus、Adrien Guinet、Ahmet Demir、AitorGómez-Goiri、Alan Justino、Albert Wang、Alex Koshelev、Alex Rattray、Alex Williams、Alexander Koshelev、Alexander Lebedev、Alexander Oblovatniy、Alexey Kotlyarov、Ali Bozorgkhan、AliceZoëBevan–McGregor、Allard Hoeve、Alman One、Amir Rustamzadeh、Andrea Rabbaglietti、Andrea Rosa、Andrei Fokau、Andrew Rodionoff、Andrew Stewart、Andriy Yurchuk、Aneil Mallavarapu、Areski Belaid、Armenak Baburyan、Art Artyom Koval、Asif Saifuddin Auvi、Ask Solem、Balthazar Rouberol、Batiste Bieler、Berker Peksag、Bert Vanderbauwhede、Brendan Smithyman、Brian Bouterse、Bryce Groff、Cameron Will、ChangBo Guo、Chris Clark、Chris Duryee、Chris Erway、Chris Har Martin、Chillar Anand、Colin McIntosh、Conrad Kramer、Corey Farwell、Craig Jellick、Cullen Rhodes、Dallas Marlow、Daniel Devine、Daniel Wallace、Danilo Bargen、Davanum Srinivas、Dave Smith、David Baumgold、David Harrigan、David P ravec、Dennis Brakhane、Derek Anderson、Dmitry Dygalo、Dmitry Malinovsky、Dongweiming、DudásÁdám、DustinJ。 Mitchell、Ed Morley、Edward Betts、ÉloiRivard、Emmanuel Cazenave、Fahad Siddiqui、Fatih Sucu、Feanil Patel、Federico Ficarelli、Felix Schwarz、Felix Yan、Fernando Rocha、Flavio Grossi、Frantisek Holop、Gao Jiangmiao、George Whewell Gilles Dartiguelongue、Gino Ledesma、Greg Wilbur、Guillaume Seguin、Hank John、Hogni Gylfason、Ilya Georgievsky、Ionel CristianMărieș、Ivan Larin、James Pulec、Jared Lewis、Jason Veatch、Jasper Bryant-Greene、Jeff Widman、Jeremy T 、Jocelyn Delalande、Joe Jevnik、Joe Sanford、John Anderson、John Barham、John Kirkham、John Whitlock、Jonathan Vanasco、Joshua Harlow、JoãoRicardo、Juan Carlos Ferrer、Juan Rossi、Justin Patrin、Kai Groner、Kevin Harvey、Kevin Richardson、 Komu Wairagu、Konstantinos Koukopoulos、Kouhei Maeda、Kracekumar Ramaraju、Krzysztof Bujniewicz、LatitiaM。 Haskins、Len Buckens、Lev Berman、lidongming、Lorenzo Mancini、Lucas Wiman、Luke Pomfrey、Luyun Xie、Maciej Obuchowski、Manuel Kaufmann、Marat Sharafutdinov、Marc Sibson、Marcio Ribeiro、Marin Atanasov Nikolov、Mathieu Fenniak、Mark Parncut Maxime Beauchemin、Maxime Vdb、Mher Movsisyan、Michael Aquilina、Michael Duane Mooring、Michael Permana、MickaëlPenhard、Mike Attwood、Mitchel Humpherys、Mohamed Abouelsaoud、Morris Tweed、Morton Fox、Môshevander Sterre、Nat Williams、Nathan Van Gheem、Nicolas Unravel、Nik Nyby、Omer Katz、Omer Korner、Ori Hoch、Paul Pearce、Paulo Bu、Pavlo Kapyshin、Philip Garnero、Pierre Fersing、Piotr Kilczuk、PiotrMaślanka、Quentin Pradet、Radek Czajka、Raghuram Srinivasan、Randy Barlow RémyLéone、Robert Coup、Robert Kolba、Rockallite Wulf、Rodolfo Carvalho、Roger Hu、Romuald Brunet、Rongze Zhu、Ross Deane、Ryan Luckie、RémyGreinhofer、Samuel Giffard、Samuel Jaillet、Sergey Azovskov、Sergey Tikhonov、Seungha Kim、Simon rs、スペンサーE。 Olson、Srinivas Garlapati、Stephen Milner、Steve Peak、Steven Sklar、Stuart Axon、Sukrit Khera、TadejJanež、Taha Jahangir、Takeshi Kanemoto、Tayfun Sen、Tewfik Sadaoui、Thomas French、Thomas Grainger、Tomas Machalek、Tobias Schottdorf、Tocho Tochev Valentyn Klindukh、Vic Kumar、Vladimir Bolshakov、Vladimir Gorbunov、Wayne Chang、Wieland Hoffmann、Wido den Hollander、Wil Langford、Will Thompson、William King、Yury Selivanov、Vytis Banaitis、Zoran Pavlovic、Xin Li、許邱翔、[X475] :github_user: `allenling` 、:github_user:` alzeih`:github_user: `bastb`:github_user:` bee-keeper`:github_user: `ffeast`:github_user:` firefly4268`:github_user: `flyingfoxlee` 、:github_user:` gdw2` [ X739X]、:github_user: `gitaarik`:github_user:` hankjin`:github_user: `lvh`:github_user:` m -vdb`:github_user: `kindule` 、:github_user:` mdk` [X93 5X]:、:github_user: `michael-k`:github_user:` mozillazg`:github_user: `nokrik`:github_user : `ocean1`:github_user:` orlo666`:github_user: `raducc`:github_user:` wanglei`:github_user: `worldexception`:github_user:` xBeAsTx`

ノート

この壁はgitの履歴から自動的に生成されたため、残念ながら、メーリングリストの質問への回答など、より重要なことを支援する人は含まれていません。


Celery3.1からのアップグレード

ステップ1:Celery3.1.25にアップグレードする

まだ行っていない場合、最初のステップはCelery3.1.25にアップグレードすることです。

このバージョンでは、新しいメッセージプロトコルに上位互換性が追加されているため、3.1から4.0に段階的にアップグレードできます。

3.1.25にアップグレードして、最初にワーカーをデプロイします。これは、これらのワーカーが3.1と4.0の両方を使用してクライアントから送信されたメッセージを処理できることを意味します。

ワーカーがアップグレードされた後、クライアントをアップグレードできます(例: Webサーバー)。


手順2:新しい設定名で構成を更新します

このバージョンでは、構成設定名が大幅に変更され、一貫性が向上しています。

変更には完全な下位互換性があるため、古い設定名が非推奨になるまで待つオプションがありますが、移行を容易にするために、設定を自動的に書き換えるコマンドラインユーティリティが含まれています。

詳細については、小文字の設定名を参照してください。


ステップ3:このドキュメントの重要な注意事項を読む

次のセクションに記載されている重要なアップグレードに関する注意事項の影響を受けていないことを確認してください。

特に重要な注意点は、Celeryがタスクに送信した引数を署名と照合することでチェックするようになったことです(タスク引数のチェック)。


ステップ4:Celery4.0にアップグレードする

この時点で、ワーカーとクライアントを新しいバージョンにアップグレードできます。


重要な注意事項

Python2.6のサポートを終了しました

CeleryにはPython2.7以降が必要になり、Python 3.3のサポートも終了するため、サポートされるバージョンは次のとおりです。

  • CPython 2.7
  • CPython 3.4
  • CPython 3.5
  • PyPy 5.4(pypy2
  • PyPy 5.5-alpha(pypy3


Python2をサポートする最後のメジャーバージョン

Celery 5.0以降、Python3.5以降のみがサポートされます。

この変更の影響を受けないようにするには、要件ファイルのCeleryバージョンを特定のバージョンcelery==4.0.0または範囲celery>=4.0,<5.0に固定する必要があります。

Python 2のサポートを終了すると、大量の互換性コードを削除できるようになります。Python3.5を使用すると、入力、async / await、asyncioなど、古いバージョンに代わるものがない概念を利用できます。

Celery4.xは引き続きPython2.7、3.4、3.5で動作します。 Celery3.xがPython2.6で引き続き機能するように。


Djangoのサポート

Celery4.xにはDjango1.8以降が必要ですが、新しいtransaction.on_commit機能には少なくともDjango1.9を使用することを強くお勧めします。

Djangoからタスクを呼び出すときの一般的な問題は、タスクがモデルの変更に関連していて、トランザクションがロールバックされた場合にタスクをキャンセルするか、変更がデータベースに書き込まれた後にのみタスクが実行されるようにする場合です。

transaction.atomicを使用すると、トランザクションがコミットされたときにのみ呼び出されるコールバックとしてタスクを追加することで、この問題を解決できます。

使用例:

from functools import partial
from django.db import transaction

from .models import Article, Log
from .tasks import send_article_created_notification

def create_article(request):
    with transaction.atomic():
        article = Article.objects.create(**request.POST)
        # send this task only if the rest of the transaction succeeds.
        transaction.on_commit(partial(
            send_article_created_notification.delay, article_id=article.pk))
        Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)

削除された機能

  • MicrosoftWindowsはサポートされなくなりました。

    テストスイートは合格であり、CeleryはWindowsで動作しているようですが、このプラットフォームで問題を診断できないため、保証はできません。 このプラットフォームでのサポートが必要な企業の場合は、ご連絡ください。

  • Jythonはサポートされなくなりました。

簡単にするために機能が削除されました

  • Webhookタスク機構(celery.task.http)は削除されました。

    最近では、:pypi: `requests` モジュールを使用してWebhookタスクを手動で作成するのは簡単です。 リクエストを使用したいのですが、Pythonコミュニティには非常に声高な「反依存性」の暴徒がいるため、使用できません。

    下位互換性が必要な場合は、モジュールの3.1バージョンをコピーして貼り付け、ワーカーによってインポートされていることを確認できます: https://github.com/celery/celery/blob/3.1/celery/task/http .py

  • タスクがエラーメールを送信しなくなりました。

    これにより、app.mail_adminsのサポート、および電子メールの送信に関連するすべての機能も削除されます。

  • celery.contrib.batchesは削除されました。

    これは実験的な機能であったため、非推奨のタイムライン保証の対象にはなりませんでした。

    プロジェクト内で使用するために、既存のバッチコードをコピーしてペースを調整できます: https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py


資金不足のために機能が削除されました

3.1リリースで、一部のトランスポートが実験的な状態に移行し、トランスポートに対する公式のサポートがないことを発表しました。

資金調達の必要性に関するこの微妙なヒントが失敗したため、それらを完全に削除し、下位互換性を壊しました。

  • DjangoORMをブローカーとして使用することはサポートされなくなりました。

    結果バックエンドとしてDjangoORMを引き続き使用できます。詳細については、 django-celery-results-結果バックエンドとしてのDjangoORM / Cacheの使用セクションを参照してください。

  • SQLAlchemyをブローカーとして使用することはサポートされなくなりました。

    結果のバックエンドとしてSQLAlchemyを引き続き使用できます。

  • CouchDBをブローカーとして使用することはサポートされなくなりました。

    結果のバックエンドとして引き続きCouchDBを使用できます。

  • IronMQをブローカーとして使用することはサポートされなくなりました。

  • Beanstalkをブローカーとして使用することはサポートされなくなりました。

さらに、一部の機能は完全に削除されているため、それらを使用しようとすると例外が発生します。

  • --autoreload機能は削除されました。

    これは実験的な機能であり、非推奨のタイムライン保証の対象ではありません。 フラグが完全に削除されるため、ワーカーが存在すると起動時にクラッシュします。 幸い、このフラグは本番システムでは使用されていません。

  • 実験的なthreadsプールはサポートされなくなり、削除されました。

  • force_execv機能はサポートされなくなりました。

    celery workerコマンドは、--no-execv--force-execv、およびCELERYD_FORCE_EXECV設定を無視するようになりました。

    このフラグは5.0で完全に削除され、ワーカーはエラーを発生させます。

  • 古いレガシー「amqp」結果バックエンドは非推奨になり、Celery5.0で削除される予定です。

    RPCスタイルの呼び出しにはrpc結果バックエンドを使用し、マルチコンシューマー結果には永続的な結果バックエンドを使用してください。

これらのほとんどは、かなりの労力をかけなくても修正できると考えています。これらの機能のいずれかを元に戻すことに興味がある場合は、ご連絡ください。

さて、朗報です


新しいタスクメッセージプロトコル

このバージョンでは、プロジェクトの開始以来、プロトコルに最初の大きな変更が加えられた、まったく新しいタスクメッセージプロトコルが導入されています。

このバージョンでは、新しいプロトコルがデフォルトで有効になっています。新しいバージョンには下位互換性がないため、アップグレードする際には注意が必要です。

3.1.25バージョンは、新しいプロトコルとの互換性を追加するためにリリースされたため、アップグレードする最も簡単な方法は、最初にそのバージョンにアップグレードしてから、2番目の展開で4.0にアップグレードすることです。

古いプロトコルを引き続き使用する場合は、使用するプロトコルのバージョン番号を構成することもできます。

app = Celery()
app.conf.task_protocol = 1

新しいプロトコルで利用可能な機能の詳細については、このドキュメントの後半にあるニュースセクションを参照してください。


小文字の設定名

美しさを追求するために、すべての設定の名前がすべて小文字に変更され、一貫性を保つために一部の設定名の名前が変更されました。

この変更には完全な下位互換性があるため、大文字の設定名を引き続き使用できますが、できるだけ早くアップグレードしてください。セロリアップグレード設定コマンドを使用して自動的にアップグレードできます。

$ celery upgrade settings proj/settings.py

このコマンドは、新しい小文字の名前を使用するようにモジュールをインプレースで変更し(「CELERY」プレフィックスが付いた大文字が必要な場合は、以下のブロックを参照)、バックアップをproj/settings.py.origに保存します。

大文字の名前を保持したいDjangoユーザーやその他のユーザー向け

Django設定モジュールからCelery構成をロードする場合は、大文字の名前を引き続き使用する必要があります。

また、CELERY_プレフィックスを使用して、Celery設定が他のアプリで使用されるDjango設定と衝突しないようにする必要があります。

これを行うには、最初に設定ファイルを変換して新しい一貫した命名スキームを使用し、Celery関連のすべての設定にプレフィックスを追加する必要があります。

$ celery upgrade settings proj/settings.py --django

設定ファイルをアップグレードした後、proj/celery.pyモジュールでプレフィックスを明示的に設定する必要があります。

app.config_from_object('django.conf:settings', namespace='CELERY')

最新のDjangoCelery統合の例は、 Django の最初のステップにあります。

ノート

これにより、以前はプレフィックスがなかった設定にもプレフィックスが追加されます。たとえば、BROKER_URLCELERY CELERY_BROKER_URLの名前空間でCELERY_BROKER_URLと記述する必要があります。


幸い、セロリのアップグレード設定--django プログラムが正しく機能するため、ファイルを手動で変更する必要はありません。


ローダーは、構成が新しい形式を使用しているかどうかを検出し、それに応じて動作しますが、これは、両方の選択肢に値を指定しない限り、新しい設定名と古い設定名を混在させて一致させることはできないことも意味します。

以前のバージョンとの主な違いは、小文字の名前を除いて、celerybeat_からbeat_celeryd_からworker_などの一部のプレフィックスの名前が変更されていることです。

celery_プレフィックスも削除され、この名前空間のタスク関連の設定のプレフィックスはtask_になり、ワーカー関連の設定のプレフィックスはworker_になります。

これを除けば、いくつかの特別な設定を除いて、ほとんどの設定は小文字で同じになります。

設定名 と置換する
CELERY_MAX_CACHED_RESULTS :setting: `result_cache_max`
CELERY_MESSAGE_COMPRESSION :setting: `result_compression` / :setting:` task_compression`
CELERY_TASK_RESULT_EXPIRES :setting: `result_expires`
CELERY_RESULT_DBURI :setting: `result_backend`
CELERY_RESULT_ENGINE_OPTIONS :setting: `database_engine_options`
-*-_DB_SHORT_LIVED_SESSIONS :setting: `database_short_lived_sessions`
CELERY_RESULT_DB_TABLE_NAMES :setting: `database_db_names`
CELERY_ACKS_LATE :setting: `task_acks_late`
CELERY_ALWAYS_EAGER :setting: `task_always_eager`
CELERY_ANNOTATIONS :setting: `task_annotations`
CELERY_MESSAGE_COMPRESSION :setting: `task_compression`
CELERY_CREATE_MISSING_QUEUES :setting: `task_create_missing_queues`
CELERY_DEFAULT_DELIVERY_MODE :setting: `task_default_delivery_mode`
CELERY_DEFAULT_EXCHANGE :setting: `task_default_exchange`
CELERY_DEFAULT_EXCHANGE_TYPE :setting: `task_default_exchange_type`
CELERY_DEFAULT_QUEUE :setting: `task_default_queue`
CELERY_DEFAULT_RATE_LIMIT :setting: `task_default_rate_limit`
CELERY_DEFAULT_ROUTING_KEY :setting: `task_default_routing_key`
-"-_EAGER_PROPAGATES_EXCEPTIONS :setting: `task_eager_propagates`
CELERY_IGNORE_RESULT :setting: `task_ignore_result`
CELERY_TASK_PUBLISH_RETRY :setting: `task_publish_retry`
CELERY_TASK_PUBLISH_RETRY_POLICY :setting: `task_publish_retry_policy`
CELERY_QUEUES :setting: `task_queues`
CELERY_ROUTES :setting: `task_routes`
CELERY_SEND_TASK_SENT_EVENT :setting: `task_send_sent_event`
CELERY_TASK_SERIALIZER :setting: `task_serializer`
CELERYD_TASK_SOFT_TIME_LIMIT :setting: `task_soft_time_limit`
CELERYD_TASK_TIME_LIMIT :setting: `task_time_limit`
CELERY_TRACK_STARTED :setting: `task_track_started`
CELERY_DISABLE_RATE_LIMITS :setting: `worker_disable_rate_limits`
CELERY_ENABLE_REMOTE_CONTROL :setting: `worker_enable_remote_control`
CELERYD_SEND_EVENTS :setting: `worker_send_task_events`

新しい小文字の設定で変更の完全な表を見ることができます。


Jsonがデフォルトのシリアライザーになりました

ついにデフォルトのシリアル化メカニズムとしてのpickleの統治を終わらせる時が来ました、そしてjsonはこのバージョンから始まるデフォルトのシリアライザーです。

この変更は、Celery 3.1 のリリースで発表されました。

pickleがデフォルトのシリアライザーであることに依然依存している場合は、4.0にアップグレードする前にアプリを構成する必要があります。

task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = {'pickle'}

Jsonシリアライザーは、いくつかの追加タイプもサポートするようになりました。

  • datetimetimedate

    ISO-8601形式のjsonテキストに変換されます。

  • Decimal

    jsonテキストに変換されます。

  • django.utils.functional.Promise

    Djangoのみ:翻訳などに使用される遅延文字列が評価され、json型への変換が試行されます。

  • uuid.UUID

    jsonテキストに変換されます。

カスタムクラスで__json__メソッドを定義して、JSONシリアル化をサポートすることもできます(json互換タイプを返す必要があります)。

class Person:
    first_name = None
    last_name = None
    address = None

    def __json__(self):
        return {
            'first_name': self.first_name,
            'last_name': self.last_name,
            'address': self.address,
        }

Task基本クラスはタスクを自動的に登録しなくなりました

@Taskクラスは、タスクをタスクレジストリに自動的に登録する特別なメタクラスを使用しなくなりました。

代わりに、これは@taskデコレータによって処理されるようになりました。

クラスベースのタスクをまだ使用している場合は、これらを手動で登録する必要があります。

class CustomTask(Task):
    def run(self):
        print('running')
CustomTask = app.register_task(CustomTask())

ベストプラクティスは、一般的な動作をオーバーライドするためにのみカスタムタスククラスを使用し、次にタスクデコレータを使用してタスクを実現することです。

@app.task(bind=True, base=CustomTask)
def custom(self):
    print('running')

この変更は、タスクのabstract属性が無効になったことも意味します。


タスク引数のチェック

タスクを呼び出すときに、非同期でもタスクの引数が検証されるようになりました。

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

typing属性をFalseに設定することにより、任意のタスクの引数チェックを無効にできます。

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

または、すべてのタスクでこれを完全に無効にしたい場合は、アプリの作成時にstrict_typing=Falseを渡すことができます。

app = Celery(..., strict_typing=False)

Redisイベントには下位互換性がありません

Redis fanout_patternsおよびfanout_prefixトランスポートオプションがデフォルトで有効になりました。

これらのフラグが有効になっていないワーカー/モニターは、このフラグが無効になっているワーカーを表示できません。 彼らはまだタスクを実行できますが、お互いに監視メッセージを受信することはできません。

4.0への最終アップグレードの前に、最初に3.1ワーカーとモニターを構成して設定を有効にすることにより、下位互換性のある方法でアップグレードできます。

BROKER_TRANSPORT_OPTIONS = {
    'fanout_patterns': True,
    'fanout_prefix': True,
}

Redisの優先順位が逆になりました

優先度0が最低になり、9が最高になります。

この変更は、優先サポートをAMQPでの動作と一致させるために行われました。

Alex Koshelev による寄稿。


Django:自動検出でDjangoアプリの構成がサポートされるようになりました

autodiscover_tasks()関数を引数なしで呼び出すことができるようになり、Djangoハンドラーがインストール済みのアプリを自動的に検出します。

app.autodiscover_tasks()

ドキュメントのDjango統合の例は、引数なしの呼び出しを使用するように更新されました。

これにより、最近のDjangoバージョンで導入された新しいAppConfigのものとの互換性も保証されます。


ワーカーの直接キューは自動削除を使用しなくなりました

4.0を実行しているワーカー/クライアントは、古いバージョンを実行しているワーカーにワーカーダイレクトメッセージを送信できなくなります。その逆も同様です。

ワーカーダイレクトメッセージに依存している場合は、celery.utils.worker_direct()を次の実装に置き換えて、最初に新しいルーティング設定を使用するように3.xワーカーとクライアントをアップグレードする必要があります。

from kombu import Exchange, Queue

worker_direct_exchange = Exchange('C.dq2')

def worker_direct(hostname):
    return Queue(
        '{hostname}.dq2'.format(hostname),
        exchange=worker_direct_exchange,
        routing_key=hostname,
    )

この機能により、問題#2492がクローズされました。


古いコマンドラインプログラムが削除されました

Celeryをインストールしても、celerydcelerybeat、およびceleryd-multiプログラムはインストールされなくなります。

これはCelery3.1のリリースで発表されましたが、古い名前を指すスクリプトがまだ残っている可能性があるため、新しいアンブレラコマンドを使用するようにこれらを更新してください。

プログラム 新しいステータス 置換
celeryd NS セロリ労働者
celerybeat NS セロリビート
celeryd-multi NS セロリマルチ


ニュース

新しいプロトコルのハイライト

新しいプロトコルは、古いプロトコルの多くの問題を修正し、長い間要求されていた機能を有効にします。

  • ほとんどのデータは、メッセージ本文でシリアル化されるのではなく、メッセージヘッダーとして送信されるようになりました。

    プロトコルのバージョン1では、タスクID、名前などのタスクメタデータを読み取れるように、ワーカーは常にメッセージを逆シリアル化する必要がありました。 これは、ワーカーがデータを二重にデコードすることを余儀なくされたことも意味します。最初に受信時にメッセージを逆シリアル化し、メッセージを再度シリアル化して子プロセスに送信し、最後に子プロセスがメッセージを再度逆シリアル化します。

    メッセージヘッダーにメタデータフィールドを保持することは、ワーカーがタスクを子プロセスに配信する前にペイロードを実際にデコードする必要がないことを意味します。また、ワーカーがとは異なる言語で記述されたタスクを再ルーティングできるようになりました。別のワーカーへのPython。

  • 新しいlangメッセージヘッダーを使用して、タスクが記述されているプログラミング言語を指定できます。

  • ワーカーは、ContentDisallowedなどの内部エラーやその他の逆シリアル化エラーの結果を保存します。

  • ワーカーは結果を保存し、未登録のタスクエラーの監視イベントを送信します。

  • 結果が親プロセスによって送信された場合でも、ワーカーはコールバック/エラーバックを呼び出します(たとえば、子プロセスが終了したときのWorkerLostError、逆シリアル化エラー、未登録タスク)。

  • 新しいoriginヘッダーには、タスクを送信するプロセスに関する情報(ワーカーノード名、またはPIDとホスト名の情報)が含まれます。

  • 新しいshadowヘッダーを使用すると、ログで使用されるタスク名を変更できます。

    これは、pickleを使用して関数を呼び出すタスクのようなパターンのようなディスパッチに役立ちます(自宅ではこれを行わないでください)。

    from celery import Task
    from celery.utils.imports import qualname
    
    class call_as_task(Task):
    
        def shadow_name(self, args, kwargs, options):
            return 'call_as_task:{0}'.format(qualname(args[0]))
    
        def run(self, fun, *args, **kwargs):
            return fun(*args, **kwargs)
    call_as_task = app.register_task(call_as_task())
  • 新しいargsreprおよびkwargsreprフィールドには、ログやモニターなどで使用するためのタスク引数(切り捨てられる可能性があります)のテキスト表現が含まれています。

    これは、情報提供の目的でタスク引数を表示するために、ワーカーがメッセージペイロードを逆シリアル化する必要がないことを意味します。

  • チェーンは専用のchainフィールドを使用するようになり、数千以上のタスクのチェーンをサポートできるようになりました。

  • 新しいparent_idおよびroot_idヘッダーは、他のタスクとのタスクの関係に関する情報を追加します。

    • parent_idは、このタスクを呼び出したタスクのタスクIDです。

    • root_idは、ワークフローの最初のタスクです。

    これらのフィールドを使用して、花などのモニターを改善し、関連するメッセージ(チェーン、グループ、コード、完全なワークフローなど)をグループ化できます。

  • app.TaskProducer@amqp.create_task_message()および@amqp.send_task_message()に置き換えられました。

    責任を作成と送信に分割するということは、Python AMQPクライアントを使用してメッセージを直接送信したい人は、プロトコルを実装する必要がないことを意味します。

    @amqp.create_task_message()メソッドは、構成されたタスクプロトコルに応じて、@amqp.as_task_v2()または@amqp.as_task_v1()のいずれかを呼び出し、ヘッダー、プロパティ、および本体を含む特別なtask_messageタプルを返します。タスクメッセージ。

も参照してください

新しいタスクプロトコルは、バージョン2 で完全に文書化されています。


プレフォークプールの改善

タスクが子プロセスからログに記録されるようになりました

タスクの成功/失敗のログは、タスクを実行している子プロセスから発生するようになりました。 その結果、Sentryなどのロギングユーティリティは、トレースバックスタック内の変数を含むタスクに関する完全な情報を取得できます。


-Ofairがデフォルトのスケジューリング戦略になりました

3.1でデフォルトの動作を再度有効にするには、-Ofastコマンドラインオプションを使用します。

-Ofairコマンドラインオプションの機能については多くの混乱があり、説明で「プリフェッチ」という用語を使用しても、この用語がAMQPでどれほど混乱しているのかを考えると、おそらく役に立たなかったでしょう。

preforkプールを使用しているCeleryワーカーがタスクを受信すると、そのタスクを子プロセスに委任して実行する必要があります。

プリフォークプールには、タスクの実行に使用できる構成可能な数の子プロセス(--concurrency)があり、各子プロセスはパイプ/ソケットを使用して親プロセスと通信します。

  • インキュー(パイプ/ソケット):親が子プロセスにタスクを送信します
  • アウトキュー(パイプ/ソケット):子は結果/戻り値を親に送信します。

Celery 3.1では、デフォルトのスケジューリングメカニズムは、書き込み可能な最初のinqueueにタスクを送信するだけでした。いくつかのヒューリスティックを使用して、各子プロセスが同じ量のタスクを受け取るように、それらの間でラウンドロビンを実行します。 。

これは、デフォルトのスケジューリング戦略では、ワーカーがすでにタスクを実行しているのと同じ子プロセスにタスクを送信できることを意味します。 そのタスクが長時間実行されている場合、待機中のタスクを長時間ブロックする可能性があります。 さらに悪いことに、自由に作業できる子プロセスがある場合でも、何百もの短期間のタスクが長期のタスクの背後に立ち往生する可能性があります。

この状況を回避するために-Ofairスケジューリング戦略が追加され、有効にすると、すでにタスクを実行している子プロセスにタスクを送信しないというルールが追加されます。

実行時間の短いタスクしかない場合、公平なスケジューリング戦略のパフォーマンスがわずかに低下する可能性があります。


子プロセスの常駐メモリサイズを制限する

ワーカー--max-memory-per-childオプション、または:setting: `worker_max_memory_per_child` 設定を設定することにより、プリフォークプールの子プロセスごとに割り当てられるメモリの最大量を制限できるようになりました。

制限はRSS /常駐メモリサイズ用であり、キロバイト単位で指定されます。

制限を超えた子プロセスは、現在実行中のタスクが戻った後、終了して新しいプロセスに置き換えられます。

詳細については、子あたりの最大メモリ設定を参照してください。

Dave Smith による寄稿。


子プロセスごとに1つのログファイル

Init-scripsおよび celery multi は、%I ログファイル形式オプション(/var/log/celery/%n%I.logなど)を使用するようになりました。

この変更は、タスクログを子プロセスに移動した後に各子プロセスが個別のログファイルを持つようにするために必要でした。同じログファイルに書き込む複数のプロセスが破損を引き起こす可能性があるためです。

この新しいオプションを使用するには、init-scriptsと celery multi 引数をアップグレードすることをお勧めします。


トランスポート

RabbitMQ優先キューのサポート

詳細については、 RabbitMQメッセージの優先度を参照してください。

Gerald Manipon による寄稿。


読み取り/書き込み用にブローカーURLを個別に構成する

新しい:setting: `broker_read_url` および:setting:` broker_write_url` 設定が追加され、消費/公開に使用される接続に個別のブローカーURLを提供できるようになりました。

構成オプションに加えて、2つの新しいメソッドがアプリAPIに追加されました。

  • app.connection_for_read()
  • app.connection_for_write()


app.connection()の代わりにこれらを使用して、必要な接続の目的を指定する必要があります。

ノート

app.pool(読み取り)とapp.producer_pool(書き込み)の2つの接続プールを使用できます。 後者は実際には接続を提供しませんが、完全なkombu.Producerインスタンスを提供します。

def publish_some_message(app, producer=None):
    with app.producer_or_acquire(producer) as producer:
        ...

def consume_messages(app, connection=None):
    with app.connection_or_acquire(connection) as connection:
        ...

RabbitMQキュー拡張機能のサポート

message_ttlおよびexpires引数を使用して、キュー宣言でメッセージTTLとキューの有効期限を直接設定できるようになりました。

Queueに新しい引数が追加され、キュー宣言でRabbitMQキュー拡張を直接かつ便利に構成できるようになりました。

  • Queue(expires=20.0)

    キューの有効期限をfloat秒で設定します。

    kombu.Queue.expiresを参照してください。

  • Queue(message_ttl=30.0)

    キューメッセージの存続可能時間float秒を設定します。

    kombu.Queue.message_ttlを参照してください。

  • Queue(max_length=1000)

    キューの最大長(メッセージ数)をintとして設定します。

    kombu.Queue.max_lengthを参照してください。

  • Queue(max_length_bytes=1000)

    キューの最大長(メッセージサイズの合計(バイト単位))をintとして設定します。

    kombu.Queue.max_length_bytesを参照してください。

  • Queue(max_priority=10)

    メッセージのpriorityフィールドに基づいてメッセージをルーティングする優先キューとしてキューを宣言します。

    kombu.Queue.max_priorityを参照してください。


AmazonSQSトランスポートが正式にサポートされるようになりました

SQSブローカートランスポートは非同期I / Oを使用するように書き直されており、公式にサポートされているトランスポートとしてRabbitMQ、Redis、およびQPidに参加しています。

新しい実装はまた、長いポーリングを利用し、ブローカーとしてのSQSの使用に関連するいくつかの問題を解決します。

この作品はNextdoorによって後援されました。


ApacheQPidトランスポートが正式にサポートされるようになりました

Brian Bouterse による寄稿。


Redis:Sentinelのサポート

次のようなセンチネルURLのリストへの接続を指定できます。

sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

ここで、各歩哨は; で区切られています。 複数の番兵はkombu.Connectionコンストラクターによって処理され、接続に失敗した場合に接続するサーバーの代替リストに配置されます。

Sergey Azovskov 、および Lorenzo Mancini による寄稿。


タスク

タスク自動再試行デコレータ

例外イベントのカスタム再試行処理を作成することは非常に一般的であるため、サポートが組み込まれています。

このため、新しいautoretry_for引数がタスクデコレータでサポートされるようになりました。ここで、例外のタプルを指定して、次のことを自動的に再試行できます。

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

詳細については、既知の例外の自動再試行を参照してください。

Dmitry Malinovsky による寄稿。


Task.replaceの改善

  • self.replace(signature)は、任意のタスク、コード、またはグループを置き換えることができるようになりました。置き換える署名は、コード、グループ、またはその他のタイプの署名にすることができます。

  • 既存のタスクのコールバックとエラーバックを継承しなくなりました。

    ツリー内のノードを置き換える場合、新しいノードが古いノードの子を継承することは期待できません。

  • Task.replace_in_chordは削除されました。代わりに.replaceを使用してください。

  • 置換がグループの場合、そのグループは自動的にコードに変換され、コールバックはグループタスクの結果を「蓄積」します。

    新しい組み込みタスク( celery.accumulate がこの目的のために追加されました)

Steeve Morin 、および Ask Solem による寄稿。


リモートタスクのトレースバック

新しい:setting: `task_remote_tracebacks` は、リモートワーカーのスタックを挿入することにより、タスクのトレースバックをより便利にします。

この機能には、追加の:pypi: `tblib` ライブラリが必要です。

IonelCristianMărieș による寄稿。


タスク接続エラーの処理

タスクの送信中に発生した接続関連のエラーは、kombu.exceptions.OperationalErrorエラーとして再発生するようになりました。

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     print('Could not send task %r: %r' % (add, exc))

詳細については、接続エラー処理を参照してください。


Gevent / Eventlet:結果を消費するための専用スレッド

:pypi: `gevent` 、または:pypi:` eventlet` を使用する場合、イベントの消費を担当する単一のスレッドが存在するようになりました。

これは、結果を取得する呼び出しが多数ある場合、それらを消費するための専用スレッドがあることを意味します。

result = add.delay(2, 2)

# this call will delegate to the result consumer thread:
#   once the consumer thread has received the result this greenlet can
# continue.
value = result.get(timeout=3)

これにより、gevent / eventletを使用する場合のRPC呼び出しの実行が大幅に向上します。


AsyncResult.then(on_success, on_error)

AsyncResult APIは、promiseプロトコルをサポートするように拡張されました。

これは現在、RPC(amqp)とRedisの結果バックエンドでのみ機能しますが、タスクが終了したときにコールバックをアタッチできます。

import gevent.monkey
monkey.patch_all()

import time
from celery import Celery

app = Celery(broker='amqp://', backend='rpc')

@app.task
def add(x, y):
    return x + y

def on_result_ready(result):
    print('Received result for id %r: %r' % (result.id, result.result,))

add.delay(2, 2).then(on_result_ready)

time.sleep(3)  # run gevent event loop for a while.

ここでは:pypi: `gevent` を使用して示されていますが、実際には、これは:pypi:` twisted`などのコールバックベースのイベントループでより役立つAPIです。 pypi: `tornado`


新しいタスクルーターAPI

:setting: `task_routes` 設定で関数を保持できるようになり、マップルートでグロブパターンと正規表現がサポートされるようになりました。

ルータークラスを使用する代わりに、関数を簡単に定義できるようになりました。

def route_for_task(name, args, kwargs, options, task=None, **kwargs):
    from proj import tasks

    if name == tasks.add.name:
        return {'queue': 'hipri'}

開始引数を使用できる引数が必要ない場合は、将来的に機能を追加できるように、常にスター引数も受け入れるようにしてください。

def route_for_task(name, *args, **kwargs):
    from proj import tasks
    if name == tasks.add.name:
        return {'queue': 'hipri', 'priority': 9}

options引数と新しいtaskキーワード引数はどちらも関数型ルーターの新機能であり、実行オプションまたはタスクのプロパティに基づいてルーターを簡単に作成できるようになります。

@send_task()を使用してタスクが名前で呼び出された場合、オプションのtaskキーワード引数は設定されません。

ルーターでのglob / regexesの使用など、その他の例については、:setting: `task_routes` および自動ルーティングを参照してください。


キャンバスリファクタリング

キャンバス/ワークフローの実装は、いくつかの長い未解決の問題を修正するために大幅にリファクタリングされました。

  • エラーコールバックは、実際の例外インスタンスとトレースバックインスタンスを受け取ることができるようになりました(問題#2538)。

    >>> add.s(2, 2).on_error(log_error.s()).delay()

    ここで、log_errorは次のように定義できます。

    @app.task
    def log_error(request, exc, traceback):
        with open(os.path.join('/var/errors', request.id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                task_id, exc, traceback), file=fh)

    その他の例については、 Canvas:ワークフローの設計を参照してください。

  • chain(a, b, c)a | b | cと同じように機能するようになりました。

    これは、チェーンがchainのインスタンスを返さなくなる可能性があることを意味します。代わりに、ワークフローを最適化して、たとえば 連鎖した2つのグループが1つのグループになります。

  • グループ内のグループを1つのグループに展開するようになりました(問題#1509)。

  • チャンク/マップ/スターマップタスクは、ターゲットタスクに基づいてルーティングされるようになりました

  • コードとチェーンは不変になりました。

  • シリアル化された署名が署名に変換されないバグを修正しました(問題#2078)

    Ross Deane によって提供された修正。

  • JSONシリアル化を使用するとチェーンとグループが機能しなかった問題を修正しました(問題#2076)。

    Ross Deane によって提供された修正。

  • コードを作成しても、キーワード引数 'task_id'に複数の値が表示されなくなりました(問題#2225)。

    Aneil Mallavarapu によって提供された修正。

  • チェーンに最後から2番目のタスクとしてコードが含まれている場合に間違った結果が返される問題を修正しました。

    Aneil Mallavarapu によって提供された修正。

  • group(A.s() | group(B.s() | C.s()))の特殊なケースが機能するようになりました。

  • チェーン:サブタスクもチェーンである場合に誤ったIDが設定されるバグを修正しました。

  • group | groupが1つのグループにフラット化されるようになりました(問題#2573)。

  • group | taskが正しくコードにアップグレードされなかった問題を修正しました(問題#2922)。

  • Chordsはresult.parentリンクを正しく設定するようになりました。

  • chunks / map / starmapは、ターゲットタスクに基づいてルーティングされるようになりました。

  • Signature.linkは、引数がスカラー(リストではない)の場合に機能するようになりました

    (問題#2019)。

  • group()は、キーワード引数を適切に転送するようになりました(問題#3426)。

    Samuel Giffard によって提供された修正。

  • ヘッダーグループが単一のタスクのみで構成されるchordは、単純なチェーンになりました。

  • link引数をgroup.apply_async()に渡すと、エラーが発生するようになりました(問題#3508)。

  • chord | sigがコードコールバックにアタッチされるようになりました(問題#3356)。


定期的なタスク

定期的なタスクを構成するための新しいAPI

この新しいAPIを使用すると、定期的なタスクを定義するときに署名を使用できるため、タスク名の入力ミスの可能性がなくなります。

新しいAPIの例は、こちらです。


最適化されたBeatの実装

セロリビートの実装は、ヒープを使用してエントリをスケジュールすることにより、何百万もの定期的なタスク用に最適化されています。

Ask Solem および Alexander Koshelev による寄稿。


日の出、日の入り、夜明け、夕暮れに基づいてタスクをスケジュールします

詳細については、ソーラースケジュールを参照してください。

Mark Parncutt による寄稿。


結果のバックエンド

RPC結果バックエンドが成熟しました

以前に実験的なRPC結果バックエンドの多くのバグが修正され、本番環境での使用を検討できるようになりました。

Ask SolemMorris Tweed による寄稿。


Redis:結果のバックエンドの最適化

result.get()はストリーミングタスクの結果にpub / subを使用するようになりました

Redis結果バックエンドを使用するときにresult.get()を呼び出すと、結果が利用可能になるのを待つためにポーリングを使用していたため、非常にコストがかかりました。 デフォルトのポーリング間隔0.5秒はパフォーマンスに役立ちませんでしたが、スピンループを回避するために必要でした。

新しい実装では、Redis Pub / Subメカニズムを使用して結果をすぐにパブリッシュおよび取得し、タスクのラウンドトリップ時間を大幅に改善しています。

Yaroslav Zhavoronkov および Ask Solem による寄稿。


新しい最適化されたコード結合の実装

これはCelery3.1で導入された実験的な機能であり、結果のバックエンドURL構成に?new_join=1を追加することによってのみ有効にできました。

実装は十分にテストされており、安定していると見なされ、デフォルトで有効になっていると思われます。

新しい実装により、コードのオーバーヘッドが大幅に削減されます。特に、コードが大きい場合、パフォーマンス上のメリットは非常に大きくなります。


新しいRiak結果バックエンドが導入されました

詳細については、 conf-riak-result-backend を参照してください。

Gilles DartiguelongueAlman OneNoKriK による寄稿。


新しいCouchDB結果バックエンドが導入されました

詳細については、 CouchDBバックエンド設定を参照してください。

Nathan Van Gheem による寄稿。


新しい領事結果バックエンドが導入されました

ConsulのKey / Valueストアを使用して、バックエンドとしてConsulのサポートを追加します。

ConsulにはHTTPAPIがあり、キーをその値とともに保存できます。

バックエンドはKeyValueStoreBackendを拡張し、ほとんどのメソッドを実装します。

主にオブジェクトを設定、取得、削除します。

これにより、Celeryはタスクの結果を領事のK / Vストアに保存できます。

領事は、領事からのセッションを使用してキーにTTLを設定することもできます。 このようにして、バックエンドはタスク結果の自動有効期限をサポートします。

領事の詳細については、 https://consul.io/をご覧ください。

バックエンドは、HTTP APIとの通信に:pypi: `python-consul` を使用します。 このバックエンドが次のように、このパッケージはPython3に完全に準拠しています。

$ pip install python-consul

これにより、PythonからConsulのHTTPAPIと通信するために必要なパッケージがインストールされます。

Celeryへの依存関係の拡張機能としてconsulを指定することもできます。

$ pip install celery[consul]

詳細については、バンドルを参照してください。

Wido den Hollander による寄稿。


真新しいカサンドラ結果バックエンド

新しい:pypi: `cassandra-driver` ライブラリを利用する新しいCassandraバックエンドは、古い:pypi:` pycassa` ライブラリを使用する古い結果バックエンドを置き換えています。

詳細については、 Cassandraバックエンド設定を参照してください。

結果としてバックエンドの使用としてCassandraでCeleryに依存するには:

$ pip install celery[cassandra]

複数の拡張要件を組み合わせることもできます。詳細については、バンドルを参照してください。


新しいElasticsearch結果バックエンドが導入されました

詳細については、 Elasticsearchバックエンド設定を参照してください。

結果としてElasticsearchでCeleryに依存するには、次のように使用します。

$ pip install celery[elasticsearch]

複数の拡張要件を組み合わせることもできます。詳細については、バンドルを参照してください。

Ahmet Demir による寄稿。


新しいファイルシステム結果バックエンドが導入されました

詳細については、ファイルシステムバックエンド設定を参照してください。

MôshevanderSterreによる寄稿。


イベントのバッチ処理

イベントはワーカーにバッファリングされ、リストとして送信されるようになり、監視イベントの送信に必要なオーバーヘッドが削減されます。

カスタムイベントモニターの作成者の場合、Python Celeryヘルパー(Receiver)を使用してモニターを実装している限り、アクションは必要ありません。

ただし、生のイベントメッセージを解析する場合は、次の点で通常のイベントメッセージとは異なるため、バッチイベントメッセージを考慮する必要があります。

  • イベントメッセージのバッチのルーティングキーは<event-group>.multiに設定され、バッチ処理されたイベントグループは現在taskのみです(task.multiのルーティングキーを指定)。
  • メッセージ本文は、辞書ではなく、シリアル化された辞書のリストになります。 リスト内の各項目は、通常のイベントメッセージ本文と見なすことができます。


ほかのニュースでは…

要件

  • 昆布4.0 に依存するようになりました。
  • 現在、:pypi: `billiard` バージョン3.5に依存しています。
  • :pypi: `anyjson` に依存しなくなりました。 さようなら古くからの友人:(


タスク

  • 「anon-exchange」は、単純な名前と名前の直接ルーティングに使用されるようになりました。

    これにより、ルーティングテーブルが完全にバイパスされるため、パフォーマンスが向上します。さらに、Redisブローカートランスポートの信頼性も向上します。

  • 空のResultSetはTrueと評価されるようになりました。

    Colin McIntosh によって提供された修正。

  • デフォルトのルーティングキー(:setting: `task_default_routing_key` )と交換名(:setting:` task_default_exchange` )は、:setting: `task_default_queue` [から取得されるようになりました。 X188X]設定。

    つまり、デフォルトキューの名前を変更するには、1つの設定を設定するだけで済みます。

  • 新しい:setting: `task_reject_on_worker_lost` 設定、およびreject_on_worker_lostタスク属性は、遅延ackタスクを実行している子ワーカープロセスが終了したときに何が起こるかを決定します。

    Michael Permana による寄稿。

  • Task.subtaskはエイリアス付きのTask.signatureに名前が変更されました。

  • Task.subtask_from_requestはエイリアス付きのTask.signature_from_requestに名前が変更されました。

  • kombu.Queuedelivery_mode属性が尊重されるようになりました(問題#1953)。

  • :setting: `task-routes` のルートで、Queueインスタンスを直接指定できるようになりました。

    例:

    task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
  • task_idがNoneの場合、AsyncResultValueErrorを発生させるようになりました。 (問題#1996)。

  • 再試行されたタスクは、有効期限の設定を転送しませんでした(問題#3297)。

  • result.get()は、on_message引数をサポートして、受信したすべてのメッセージに対して呼び出されるコールバックを設定するようになりました。

  • 追加された新しい抽象クラス:

    • CallableTask

      タスクのように見えます。

    • CallableSignature

      タスクの署名のように見えます。


  • Task.replaceは、コールバックを適切に転送するようになりました(問題#2722)。

    Nicolas Unravel によって提供された修正。

  • Task.replace:チェーン/コードに追加(#3232を閉じる)

    チェーンに署名を追加する問題#3232を修正しました(存在する場合)。 指定された署名にコードが含まれている場合のコード抑制を修正しました。

    :github_user: `honux` によって提供された修正。

  • タスクの再試行もイーガーモードでスローされるようになりました。

    Feanil Patel によって提供された修正。


ビート

  • 日付が無効なcrontab無限ループを修正しました。

    オカレンスに到達できない場合(たとえば、4月31日)、次のオカレンスに到達しようとすると、無限ループがトリガーされます。

    2,000回の反復後にRuntimeErrorを上げて、それを修正してみてください

    (また、プロセスでcrontabうるう年のテストを追加しました)

    Romuald Brunet によって提供された修正。

  • 例外がサービスを終了したときに、プログラムがゼロ以外の終了コードで終了することを保証するようになりました。

    Simon Peeters によって提供された修正。


アプリ

  • :setting: `enable_utc` が無効になっている場合でも、日付は常にタイムゾーンに対応するようになりました(問題#943)。

    Omer Katz によって提供された修正。

  • Config :アプリの事前構成も構成に組み込まれるようになりました。

    Jeremy Zafran によって提供された修正。

  • アプリケーションは、を使用してタスク名の生成方法を変更できるようになりました

    @gen_task_name()メソッド。

    Dmitry Malinovsky による寄稿。

  • アプリには、現在作業中のタスク(またはNone)を返す新しいapp.current_worker_taskプロパティがあります。 (問題#2100)。


ロギング

  • get_task_logger()は、「celery」または「celery.task」という名前を使用しようとすると例外を発生させるようになりました(問題#3475)。


実行プール

  • Eventlet / Gevent :AMQPハートビートを有効にするようになりました(問題#3338)。

  • Eventlet / Gevent :「同時読み取り」エラーにつながる競合状態を修正しました(問題#2755)。

  • Prefork :Preforkプールは、使用可能な場合、selectの代わりにpollを使用するようになりました(問題#2373)。

  • Prefork :プールがワーカーのシャットダウンを拒否するバグを修正しました(問題#2606)。

  • Eventletcelery inspect stats コマンドでプールサイズを返すようになりました。

    Alexander Oblovatniy による寄稿。


テスト

トランスポート

  • amqps://を指定してSSLを要求できるようになりました。

  • Redisトランスポート:Redisトランスポートが:setting: `broker_use_ssl` オプションをサポートするようになりました。

    Robert Kolba による寄稿。

  • JSONシリアライザーは、サポートされていない型に対してobj.__json__を呼び出すようになりました。

    これは、組み込みのjson型に縮小できるカスタム型の__json__メソッドを定義できることを意味します。

    例:

    class Person:
        first_name = None
        last_name = None
        address = None
    
        def __json__(self):
            return {
                'first_name': self.first_name,
                'last_name': self.last_name,
                'address': self.address,
            }
  • JSONシリアライザーは、日時、Django promise、UUID、およびDecimalを処理するようになりました。

  • 新しいQueue.consumer_argumentsは、x-priorityを介して消費者の優先順位を設定する機能に使用できます。

    https://www.rabbitmq.com/consumer-priority.htmlを参照してください

    例:

    consumer = Consumer(channel, consumer_arguments={'x-priority': 3})
  • キュー/交換:no_declareオプションが追加されました(内部amqでも有効になっています)。 交換)。


プログラム

  • Celeryは、optparseの代わりにargparseを使用するようになりました。

  • 制御端末がTTYでない場合、すべてのプログラムが色を無効にするようになりました。

  • セロリワーカー-q引数は、スタートアップバナーを無効にするようになりました。

  • セロリワーカー:「ワーカー準備完了」メッセージは、警告ではなく重大度情報を使用してログに記録されるようになりました。

  • セロリマルチ%nの形式は、セロリワーカーと一致するように%Nと同義になりました。

  • celery inspect / celery control :新しい--jsonオプションをサポートして、json形式で出力を提供するようになりました。

  • 登録済みのセロリ検査:組み込みタスクを無視するようになりました。

  • セロリパージは、パージに含めるキューとパージから除外するキューを指定するために使用される-Qおよび-Xオプションを使用するようになりました。

  • 新しい celery logtool :celeryworkerログファイルをフィルタリングおよび解析するためのユーティリティ

  • celery multi :%i および%I ログファイル形式を通過するようになりました。

  • 一般:%pを使用して、log-file / pid-file引数で完全なワーカーノード名に展開できるようになりました。

  • 新しいコマンドラインオプション

    --executableは、プログラムのデーモン化に使用できるようになりました(セロリワーカーおよびセロリビート)。

    Bert Vanderbauwhede による寄稿。

  • セロリワーカー:新しい--prefetch-multiplierオプションをサポートします。

    MickaëlPenhardによる寄稿。

  • --loader引数は、アプリの引数が設定されている場合でも常に有効になりました(問題#3405)。

  • inspect / controlがレジストリからコマンドを取得するようになりました

    これは、ユーザーのリモートコントロールコマンドもコマンドラインから使用できることを意味します。

    コマンドラインで引数を正しく渡すには、引数/および引数のタイプを指定する必要があることに注意してください。

    コマンドのタイプに応じて使用する2つのデコレータがあります: @inspect_command + @control_command :

    from celery.worker.control import control_command
    
    @control_command(
        args=[('n', int)]
        signature='[N=1]',
    )
    def something(state, n=1, **kwargs):
        ...

    ここで、argsは、コマンドでサポートされている引数のリストです。 リストには、(argument_name, type)のタプルが含まれている必要があります。

    signatureは、たとえばで使用されるコマンドラインヘルプです。 celery -A proj control --help

    コマンドは variadic 引数もサポートします。これは、残った引数が1つの変数に追加されることを意味します。 ここでは、シグナル引数と可変数のtask_idsを受け取るterminateコマンドによって示されています。

    from celery.worker.control import control_command
    
    @control_command(
        args=[('signal', str)],
        signature='<signal> [id1, [id2, [..., [idN]]]]',
        variadic='ids',
    )
    def terminate(state, signal, ids, **kwargs):
        ...

    このコマンドは、次を使用して呼び出すことができます。

    $ celery -A proj control terminate SIGKILL id1 id2 id3`

    詳細については、独自のリモコンコマンドの作成を参照してください。


ワーカー

  • LimitedSetの改善と修正。

    メモリリークを取り除く+セットのminlenサイズを追加する:しばらく操作した後のセットの最小残存サイズ。 minlenアイテムは、有効期限が切れているはずのアイテムでも保持されます。

    古いコードとさらに古いコードの問題:

    1. 一部のシナリオ(アイテムを複数回追加するなど)では、ヒープが大きくなる傾向があります。

    2. 多くのアイテムをすばやく追加しても、(あったとしても)すぐにそれらをクリーンアップすることはできません。

    3. 他のワーカーと話しているときに、revoked._dataが送信されましたが、反対側で反復可能として処理されました。 つまり、これらのキーに新しい(現在の)タイムスタンプを付けることを意味します。 これを行うことにより、労働者はアイテムを永久にリサイクルすることができます。 1)と2)を組み合わせると、これは、大規模なワーカーセットでは、すぐにメモリが不足することを意味します。

    これらの問題はすべて今すぐ修正する必要があります。

    これにより、問題#3095、#3086が修正されます。

    David Pravec による寄稿。

  • リモートコントロールコマンドキューを制御するための新しい設定。

    • :setting: `control_queue_expires`

      リモートコントロールコマンドキューとリモートコントロール応答キューの両方のキュー有効期限を設定します。

    • :setting: `control_queue_ttl`

      リモートコントロールコマンドキューとリモートコントロール応答キューの両方のメッセージの存続時間を設定します。

    Alan Justino による寄稿。

  • :signal: `worker_shutdown` シグナルは、シャットダウン中に常に呼び出されるようになりました。

    以前は、ワーカーインスタンスが最初にgcによって収集された場合は呼び出されませんでした。

  • ワーカーは、使用されるブローカートランスポートが実際にそれらをサポートしている場合にのみ、リモートコントロールコマンドコンシューマーを起動するようになりました。

  • Gossipは、イベントキューのx-message-ttlをheartbeat_intervalに設定するようになりました。 (問題#2005)。

  • 終了コードを保持するようになりました(問題#2024)。

  • 無効なETA値を持つメッセージを拒否するようになりました(ackの代わりに、メッセージが構成されている場合、配信不能交換に送信されます)。

  • -purge引数が使用されたときのクラッシュを修正しました。

  • 回復不能なエラーのログレベルがerrorからcriticalに変更されました。

  • レート制限の精度が向上しました。

  • タスクの有効期限フィールドで欠落しているタイムゾーン情報を考慮してください。

    Albert Wang によって提供された修正。

  • ワーカーには、Queuesブートステップがなくなりました。

    余計な。

  • 取り消されたタスクに対しても「Receivedtask」行を出力するようになりました。 (問題#3155)。

  • :setting: `broker_connection_retry` 設定を尊重するようになりました。

    Nat Williams によって提供された修正。

  • 新しい:setting: `control_queue_ttl` および:setting:` control_queue_expires` 設定により、リモートコントロールコマンドメッセージのTTLとキューの有効期限を構成できるようになりました。

    Alan Justino による寄稿。

  • 新しいcelery.worker.state.requestsは、IDによるアクティブ/予約済みタスクのO(1)ルックアップを有効にします。

  • 自動スケールは、スケールダウン時にキープアライブを常に更新するとは限りませんでした。

    Philip Garnero によって提供された修正。

  • タイプミスoptions_list-> option_listを修正しました。

    Greg Wilbur によって提供された修正。

  • 一部のワーカーコマンドライン引数とWorker()クラス引数は、一貫性を保つために名前が変更されました。

    これらはすべて、下位互換性のためのエイリアスを持っています。

    • --send-events-> --task-events

    • --schedule-> --schedule-filename

    • --maxtasksperchild-> --max-tasks-per-child

    • Beat(scheduler_cls=)-> Beat(scheduler=)

    • Worker(send_events=True)-> Worker(task_events=True)

    • Worker(task_time_limit=)-> Worker(time_limit=

    • Worker(task_soft_time_limit=)-> Worker(soft_time_limit=)

    • Worker(state_db=)-> Worker(statedb=)

    • Worker(working_directory=)-> Worker(workdir=)



デバッグユーティリティ

  • celery.contrib.rdb:アドレスを簡単にコピーして貼り付けることができるようにリモートデバッガバナーを変更しました(アドレスにピリオドが含まれなくなりました)。

    Jonathan Vanasco による寄稿。

  • 最近の:pypi: `psutil` バージョンとの互換性を修正しました(問題#3262)。


信号

  • アプリ:アプリの構成/ファイナライズの新しいシグナル:

    • app.on_configure

    • app.on_after_configure

    • app.on_after_finalize


  • タスク:拒否されたタスクメッセージの新しいタスクシグナル:

    • celery.signals.task_rejected

    • celery.signals.task_unknown


  • Worker :ハートビートイベントが送信されたときの新しい信号。

    • celery.signals.heartbeat_sent

      Kevin Richardson による寄稿。



イベント

  • イベントメッセージは、RabbitMQ x-message-ttlオプションを使用して、古いイベントメッセージが確実に破棄されるようになりました。

    デフォルトは5秒ですが、:setting: `event_queue_ttl` 設定を使用して変更できます。

  • Task.send_eventは、タスクの公開再試行設定に従って、接続障害時にイベントの送信を自動的に再試行するようになりました。

  • イベントモニターは、デフォルトで:setting: `event_queue_expires` 設定を設定するようになりました。

    キューは、モニターがキューからの消費を停止してから60秒後に期限切れになります。

  • None値が適切に処理されなかったバグを修正しました。

    Dongweiming によって提供された修正。

  • 新しい:setting: `event_queue_prefix` 設定を使用して、イベントレシーバーキューのデフォルトのceleryevキュープレフィックスを変更できるようになりました。

    金本武による寄稿。

  • State.tasks_by_typeおよびState.tasks_by_workerを、この情報への高速アクセスのマッピングとして使用できるようになりました。


展開

  • 汎用init-scriptsは、 CELERY_SUおよび CELERYD_SU_ARGS環境変数をサポートして、 suのパスと引数を設定するようになりました。 ] su(1))。

  • 一般的なinit-scriptsは、/usr/local/etc/で構成ファイルを検索することにより、FreeBSDおよびその他のBSDシステムをより適切にサポートするようになりました。

    Taha Jahangir による寄稿。

  • 一般的なinit-script:再起動が常に機能しないcelerybeatの奇妙なバグを修正しました(問題#3018)。

  • systemd initスクリプトは、サービスの実行時にシェルを使用するようになりました。

    Tomas Machalek による寄稿。


結果のバックエンド

  • Redis:デフォルトのソケットタイムアウトが120秒になりました。

    デフォルトは、新しい:setting: `redis_socket_timeout` 設定を使用して変更できます。

    Raghuram Srinivasan による寄稿。

  • RPCバックエンドの結果キューがデフォルトで自動削除されるようになりました(問題#2001)。

  • RPCバックエンド:例外がjsonシリアライザーで適切に逆シリアル化されなかった問題を修正しました(問題#2518)。

    Allard Hoeve によって提供された修正。

  • CouchDB:結果をdouble-jsonエンコードするために使用されるバックエンド。

    Andrew Stewart によって提供された修正。

  • CouchDB:バックエンドが見つからない原因となるタイプミスを修正しました(問題#3287)。

    Andrew Stewart によって提供された修正。

  • MongoDB::setting: `result_serialzier` 設定をbsonに設定して、MongoDBライブラリ独自のシリアライザーを使用できるようになりました。

    Davide Quarta による寄稿。

  • MongoDB:URI処理が改善されて使用できるようになりました

    提供されている場合は、URIからのデータベース名、ユーザー、およびパスワード。

    Samuel Jaillet による寄稿。

  • SQLAlchemy結果バックエンド:NullPoolを使用するときにすべての結果エンジンオプションを無視するようになりました(問題#1930)。

  • SQLAlchemyの結果バックエンド:脳に損傷を受けたMySQL Unicode実装を処理するために、最大文字サイズを155に設定するようになりました(問題#1748)。

  • General :すべてのCelery例外/警告は、共通のCeleryError / CeleryWarningから継承するようになりました。 (問題#2643)。


ドキュメントの改善

寄稿者:

  • アダム・チェインズ
  • アミール・ルスタムザデ
  • アーサー・ヴュイヤール
  • Batiste Bieler
  • バーカーペクサグ
  • ブライス・グロフ
  • ダニエル・ディヴァイン
  • エドワード・ベッツ
  • ジェイソンビーチ
  • ジェフ・ウィドマン
  • Maciej Obuchowski
  • マヌエルカウフマン
  • マキシムボーケミン
  • ミッチェルハンファリー
  • パブロ・カピシン
  • ピエール・ファーシング
  • リック
  • スティーブン・スクラール
  • Tayfun Sen
  • ウィーランドホフマン


再編成、非推奨、および削除

互換性のない変更

  • プリフォーク:result.get()を呼び出すか、タスク内から結果を結合すると、RuntimeErrorが発生するようになりました。

    以前のバージョンでは、これは警告を発していました。

  • celery.worker.consumerは、モジュールではなくパッケージになりました。

  • モジュールcelery.worker.jobの名前がcelery.worker.requestに変更されました。

  • ビート:Scheduler.Publisher / .publisher.Producer / .producerに名前が変更されました。

  • 結果:@AsyncResultのtask_name引数/属性が削除されました。

    これは歴史的にpickle互換性のために使用されていたフィールドでしたが、現在は必要ありません。

  • バックエンド:statusという名前の引数の名前がstateに変更されました。

  • バックエンド:backend.get_status()の名前がbackend.get_state()に変更されました。

  • バックエンド:backend.maybe_reraise()の名前が.maybe_throw()に変更されました

    promise APIは.throw()を使用するため、この変更は一貫性を高めるために行われました。

    使用可能なエイリアスがあるため、Celery5.0までmaybe_reraiseを引き続き使用できます。


予定外の削除

  • 実験的なcelery.contrib.methods機能は削除されました。これは、実装に役立つバグが非常に多かったためです。

  • CentOSのinitスクリプトは削除されました。

    これらは実際には一般的なinit-scriptsに機能を追加しなかったので、代わりにそれらを使用するか、:pypi: `supervisor` のようなものを使用することをお勧めします。


再編成の非推奨

これらのシンボルの名前は変更されており、下位互換性のためにこのバージョンで使用できるエイリアスがありますが、Celery 5.0で削除されるため、これらのシンボルの名前をできるだけ早く変更して、そのリリースで壊れないようにしてください。

このリストの最初のリストのみを使用する可能性がありますが、次のことはわかりません。

  • celery.utils.worker_direct-> celery.utils.nodenames.worker_direct()
  • celery.utils.nodename-> celery.utils.nodenames.nodename()
  • celery.utils.anon_nodename-> celery.utils.nodenames.anon_nodename()
  • celery.utils.nodesplit-> celery.utils.nodenames.nodesplit()
  • celery.utils.default_nodename-> celery.utils.nodenames.default_nodename()
  • celery.utils.node_format-> celery.utils.nodenames.node_format()
  • celery.utils.host_format-> celery.utils.nodenames.host_format()


予定された削除

モジュール

  • モジュールcelery.worker.jobcelery.worker.requestに名前が変更されました。

    これは内部モジュールであったため、効果はありません。 現在はパブリックAPIの一部であるため、再度変更しないでください。

  • モジュールcelery.task.traceは、celery.taskパッケージが段階的に廃止されるため、celery.app.traceに名前が変更されました。 モジュールはバージョン5.0で削除されるため、インポートを次の場所から変更してください。

    from celery.task.trace import X

    に:

    from celery.app.trace import X
  • celery.loadersモジュールの古い互換性エイリアスは削除されました。

    • celery.loaders.current_loader()を削除し、次を使用します:current_app.loader

    • celery.loaders.load_settings()を削除し、次を使用します:current_app.conf



結果

  • AsyncResult.serializable()およびcelery.result.from_serializable

    削除されました:

    代わりに使用してください:

    >>> tup = result.as_tuple()
    >>> from celery.result import result_from_tuple
    >>> result = result_from_tuple(tup)
  • BaseAsyncResultを削除し、代わりにAsyncResultをインスタンスチェックに使用します。

  • TaskSetResultを削除し、代わりにGroupResultを使用してください。

    • TaskSetResult.total-> len(GroupResult)

    • TaskSetResult.taskset_id-> GroupResult.id


  • ResultSet.subtasksを削除し、代わりにResultSet.resultsを使用してください。


TaskSet

TaskSetは、Celery3.0でgroup構造に置き換えられたため、削除されました。

このようなコードがある場合:

>>> from celery.task import TaskSet

>>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()

これを次のように置き換える必要があります。

>>> from celery import group
>>> group(add.s(i, i) for i in xrange(10))()

イベント

  • クラスcelery.events.state.Workerの削除:

    • Worker._defaults属性。

      {k: getattr(worker, k) for k in worker._fields}を使用してください。

    • Worker.update_heartbeat

      Worker.event(None, timestamp, received)を使用する

    • Worker.on_online

      Worker.event('online', timestamp, received, fields)を使用する

    • Worker.on_offline

      Worker.event('offline', timestamp, received, fields)を使用する

    • Worker.on_heartbeat

      Worker.event('heartbeat', timestamp, received, fields)を使用する


  • クラスcelery.events.state.Taskの削除:

    • Task._defaults属性。

      {k: getattr(task, k) for k in task._fields}を使用してください。

    • Task.on_sent

      Worker.event('sent', timestamp, received, fields)を使用する

    • Task.on_received

      Task.event('received', timestamp, received, fields)を使用する

    • Task.on_started

      Task.event('started', timestamp, received, fields)を使用する

    • Task.on_failed

      Task.event('failed', timestamp, received, fields)を使用する

    • Task.on_retried

      Task.event('retried', timestamp, received, fields)を使用する

    • Task.on_succeeded

      Task.event('succeeded', timestamp, received, fields)を使用する

    • Task.on_revoked

      Task.event('revoked', timestamp, received, fields)を使用する

    • Task.on_unknown_event

      Task.event(short_type, timestamp, received, fields)を使用する

    • Task.update

      Task.event(short_type, timestamp, received, fields)を使用する

    • Task.merge

      これが必要な場合はお問い合わせください。



魔法のキーワード引数

タスクによって受け入れられる非常に古い魔法のキーワード引数のサポートは、このバージョンで最終的に削除されました。

これらをまだ使用している場合は、古いcelery.decoratorsモジュールを使用し、タスクに渡されるキーワード引数に応じて、タスクを書き直す必要があります。次に例を示します。

from celery.decorators import task

@task()
def add(x, y, task_id=None):
    print('My task id is %r' % (task_id,))

次のように書き直す必要があります。

from celery import task

@task(bind=True)
def add(self, x, y):
    print('My task id is {0.request.id}'.format(self))

削除された設定

次の設定は削除され、サポートされなくなりました。

ロギング設定

設定名 と置換する
CELERYD_LOG_LEVEL celery worker --loglevel
CELERYD_LOG_FILE celery worker --logfile
CELERYBEAT_LOG_LEVEL celery beat --loglevel
CELERYBEAT_LOG_FILE celery beat --logfile
CELERYMON_LOG_LEVEL セレリモンは非推奨です、花を使用してください
CELERYMON_LOG_FILE セレリモンは非推奨です、花を使用してください
CELERYMON_LOG_FORMAT セレリモンは非推奨です、花を使用してください


タスク設定

設定名 と置換する
CELERY_CHORD_PROPAGATES 該当なし


内部APIの変更

  • モジュールcelery.datastructuresの名前がcelery.utils.collectionsに変更されました。

  • モジュールcelery.utils.timeutilsの名前がcelery.utils.timeに変更されました。

  • celery.utils.datastructures.DependencyGraphcelery.utils.graphに移動しました。

  • celery.utils.jsonifycelery.utils.serialization.jsonify()になりました。

  • celery.utils.strtoboolcelery.utils.serialization.strtobool()になりました。

  • celery.utils.is_iterableは削除されました。

    代わりに以下を使用してください:

    isinstance(x, collections.Iterable)
  • celery.utils.lpmergecelery.utils.collections.lpmerge()になりました。

  • celery.utils.crycelery.utils.debug.cry()になりました。

  • celery.utils.isattycelery.platforms.isatty()になりました。

  • celery.utils.gen_task_namecelery.utils.imports.gen_task_name()になりました。

  • celery.utils.deprecatedcelery.utils.deprecated.Callable()になりました

  • celery.utils.deprecated_propertycelery.utils.deprecated.Property()になりました。

  • celery.utils.warn_deprecatedcelery.utils.deprecated.warn()になりました


非推奨のタイムラインの変更

Celery Deprecation Time-line を参照してください。