アプリケーション—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/application
移動先:案内検索

応用

Celeryライブラリは、使用する前にインスタンス化する必要があります。このインスタンスは、アプリケーション(または略して app )と呼ばれます。

このアプリケーションはスレッドセーフであるため、構成、コンポーネント、およびタスクが異なる複数のCeleryアプリケーションを同じプロセススペースに共存させることができます。

今すぐ作成しましょう:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最後の行は、アプリケーションのテキスト表現を示しています。これには、アプリクラスの名前(Celery)、現在のメインモジュールの名前(__main__)、およびオブジェクトのメモリアドレスが含まれます。 (0x100469fd0)。

メインネーム

これらのうち1つだけが重要であり、それがメインモジュール名です。 その理由を見てみましょう。

Celeryでタスクメッセージを送信すると、そのメッセージにはソースコードは含まれず、実行するタスクの名前のみが含まれます。 これは、インターネット上でのホスト名の動作と同様に機能します。すべてのワーカーは、タスクレジストリと呼ばれる実際の機能へのタスク名のマッピングを維持します。

タスクを定義するたびに、そのタスクはローカルレジストリにも追加されます。

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

そこに__main__が再び表示されます。 Celeryが関数が属するモジュールを検出できない場合は常に、メインモジュール名を使用してタスク名の先頭を生成します。

これは、限られた一連のユースケースでのみ問題になります。

  1. タスクが定義されているモジュールがプログラムとして実行されている場合。
  2. アプリケーションがPythonシェル(REPL)で作成されている場合。


たとえば、ここでは、タスクモジュールを使用して@worker_main()でワーカーを開始します。

tasks.py

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

このモジュールが実行されると、タスクには「__main__」で始まる名前が付けられますが、モジュールが別のプロセスによってインポートされる場合、たとえばタスクを呼び出す場合、タスクには「 [X195X」で始まる名前が付けられます。 ]」(モジュールの本名):

>>> from tasks import add
>>> add.name
tasks.add

メインモジュールに別の名前を指定できます。

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

も参照してください

名前


構成

セロリの動作を変更する設定可能なオプションがいくつかあります。 これらのオプションは、アプリインスタンスで直接設定することも、専用の構成モジュールを使用することもできます。

構成は@confとして利用できます。

>>> app.conf.timezone
'Europe/London'

ここで、構成値を直接設定することもできます。

>>> app.conf.enable_utc = True

または、updateメソッドを使用して、一度に複数のキーを更新します。

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

構成オブジェクトは、次の順序で参照される複数のディクショナリで構成されます。

  1. 実行時に行われた変更。
  2. 構成モジュール(存在する場合)
  3. デフォルト構成(celery.app.defaults)。


@add_defaults()メソッドを使用して、新しいデフォルトソースを追加することもできます。

も参照してください

使用可能なすべての設定とそのデフォルト値の完全なリストについては、構成リファレンスにアクセスしてください。


config_from_object

@config_from_object()メソッドは、構成オブジェクトから構成をロードします。

これは、構成モジュール、または構成属性を持つ任意のオブジェクトにすることができます。

@config_from_object()が呼び出されると、以前に設定された構成がリセットされることに注意してください。 追加の構成を設定する場合は、後で行う必要があります。

例1:モジュールの名前を使用する

@config_from_object()メソッドは、Pythonモジュールの完全修飾名、またはPython属性の名前を使用できます(例:"celeryconfig""myproj.config.celery"、 [ X157X]:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

celeryconfigモジュールは、次のようになります。

celeryconfig.py

enable_utc = True
timezone = 'Europe/London'

import celeryconfigが可能な限り、アプリはそれを使用できます。


例2:実際のモジュールオブジェクトを渡す

インポート済みのモジュールオブジェクトを渡すこともできますが、これが常に推奨されるとは限りません。

ヒント

モジュールの名前を使用することをお勧めします。これは、プリフォークプールを使用するときにモジュールをシリアル化する必要がないことを意味します。 構成の問題やpickleエラーが発生している場合は、代わりにモジュールの名前を使用してみてください。


import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

例3:構成クラス/オブジェクトの使用

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

config_from_envvar

@config_from_envvar()は、環境変数から構成モジュール名を取得します

例– CELERY_CONFIG_MODULEという名前の環境変数で指定されたモジュールから構成をロードするには:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

次に、環境を介して使用する構成モジュールを指定できます。

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO

打ち切り構成

デバッグ情報などとして構成を印刷する場合は、パスワードやAPIキーなどの機密情報を除外することもできます。

Celeryには、構成の表示に役立ついくつかのユーティリティが付属しています。1つはhumanize()です。

>>> app.conf.humanize(with_defaults=False, censored=True)

このメソッドは、構成を表形式の文字列として返します。 これには、デフォルトで構成への変更のみが含まれますが、with_defaults引数を有効にすることで、組み込みのデフォルトのキーと値を含めることができます。

代わりに、構成を辞書として操作する場合は、table()メソッドを使用できます。

>>> app.conf.table(with_defaults=False, censored=True)

Celeryは、正規表現を使用して一般的な名前のキーを検索するだけなので、すべての機密情報を削除できるわけではないことに注意してください。 機密情報を含むカスタム設定を追加する場合は、Celeryが秘密として識別する名前を使用してキーに名前を付ける必要があります。

名前に次のサブ文字列のいずれかが含まれている場合、構成設定は打ち切られます。

APITOKENKEYSECRETPASSSIGNATUREDATABASE


怠惰

アプリケーションインスタンスはレイジーです。つまり、実際に必要になるまで評価されません。

@Celeryインスタンスを作成すると、次のことのみが実行されます。

  1. イベントに使用される論理クロックインスタンスを作成します。
  2. タスクレジストリを作成します。
  3. 自分自身を現在のアプリとして設定します(ただし、set_as_current引数が無効になっている場合は設定しません)
  4. @on_init()コールバックを呼び出します(デフォルトでは何もしません)。


@task()デコレータは、タスクが定義された時点ではタスクを作成しません。代わりに、タスクが使用されたとき、またはアプリケーションがになった後に、タスクの作成を延期します。 ]最終化

この例は、タスクを使用するか、属性(この場合はrepr())にアクセスするまでタスクが作成されない方法を示しています。

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

アプリの Finalization は、@finalize()を呼び出すことによって明示的に、または@tasks属性にアクセスすることによって暗黙的に行われます。

オブジェクトを完成させると、次のようになります。

  1. アプリ間で共有する必要のあるタスクをコピーする

    タスクはデフォルトで共有されますが、タスクデコレータへのshared引数が無効になっている場合、タスクはバインドされているアプリに対してプライベートになります。

  2. 保留中のすべてのタスクデコレータを評価します。

  3. すべてのタスクが現在のアプリにバインドされていることを確認してください。

    タスクはアプリにバインドされているため、構成からデフォルト値を読み取ることができます。


「デフォルトアプリ」

Celeryには常にアプリケーションがあるとは限りませんでした。以前は、モジュールベースのAPIしかありませんでした。 互換性APIは、Celery 5.0がリリースされるまで古い場所で利用可能でしたが、削除されました。

Celeryは常に特別なアプリ(「デフォルトアプリ」)を作成します。これは、カスタムアプリケーションがインスタンス化されていない場合に使用されます。

celery.taskモジュールは使用できなくなりました。 モジュールベースのAPIではなく、アプリインスタンスのメソッドを使用します。

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

チェーンを壊す

現在設定されているアプリに依存することは可能ですが、ベストプラクティスは、アプリインスタンスを必要なものに常に渡すことです。

渡されるアプリに応じてインスタンスのチェーンを作成するため、これを「アプリチェーン」と呼びます。

次の例は悪い習慣と見なされます。

from celery import current_app

class Scheduler:

    def run(self):
        app = current_app

代わりに、appを引数として取る必要があります。

class Scheduler:

    def __init__(self, app):
        self.app = app

内部的にCeleryはcelery.app.app_or_default()関数を使用しているため、すべてがモジュールベースの互換性APIでも機能します。

from celery.app import app_or_default

class Scheduler:
    def __init__(self, app=None):
        self.app = app_or_default(app)

開発では、 CELERY_TRACE_APP環境変数を設定して、アプリチェーンが壊れた場合に例外を発生させることができます。

$ CELERY_TRACE_APP=1 celery worker -l INFO

APIの進化

セロリは、最初に作成されて以来、2009年から大きく変化しました。

たとえば、最初は、任意の呼び出し可能オブジェクトをタスクとして使用することが可能でした。

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!',))

または、Taskクラスを作成して特定のオプションを設定したり、他の動作をオーバーライドしたりすることもできます。

from celery import Task
from celery.registry import tasks

class Hello(Task):
    queue = 'hipri'

    def run(self, to):
        return 'hello {0}'.format(to)
tasks.register(Hello)

>>> Hello.delay('world!')

その後、任意の呼び出し可能オブジェクトを渡すことはアンチパターンであると判断されました。これは、ピクルス以外のシリアライザーの使用が非常に困難になるためです。この機能は2.0で削除され、タスクデコレータに置き換えられました。

from celery import app

@app.task(queue='hipri')
def hello(to):
    return 'hello {0}'.format(to)

抽象タスク

@task()デコレータを使用して作成されたすべてのタスクは、アプリケーションの基本@Taskクラスから継承されます。

base引数を使用して、別の基本クラスを指定できます。

@app.task(base=OtherTask):
def add(x, y):
    return x + y

カスタムタスククラスを作成するには、ニュートラル基本クラスcelery.Taskから継承する必要があります。

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return self.run(*args, **kwargs)

ヒント

タスクの__call__メソッドをオーバーライドする場合は、self.runも呼び出してタスクの本体を実行することが非常に重要です。 super().__call__は呼び出さないでください。 ニュートラル基本クラスcelery.Task__call__メソッドは、参照用にのみ存在します。 最適化のために、これはcelery.app.trace.build_tracer.trace_taskに展開され、__call__メソッドが定義されていない場合、カスタムタスククラスでrunを直接呼び出します。


ニュートラル基本クラスは、特定のアプリにまだバインドされていないため、特別です。 タスクがアプリにバインドされると、構成を読み取ってデフォルト値を設定します。

基本クラスを実現するには、@task()デコレータを使用してタスクを作成する必要があります。

@app.task(base=DebugTask)
def add(x, y):
    return x + y

@Task()属性を変更することで、アプリケーションのデフォルトの基本クラスを変更することもできます。

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]