アプリケーション—Pythonドキュメント
応用
Celeryライブラリは、使用する前にインスタンス化する必要があります。このインスタンスは、アプリケーション(または略して app )と呼ばれます。
このアプリケーションはスレッドセーフであるため、構成、コンポーネント、およびタスクが異なる複数のCeleryアプリケーションを同じプロセススペースに共存させることができます。
今すぐ作成しましょう:
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
最後の行は、アプリケーションのテキスト表現を示しています。これには、アプリクラスの名前(Celery
)、現在のメインモジュールの名前(__main__
)、およびオブジェクトのメモリアドレスが含まれます。 (0x100469fd0
)。
メインネーム
これらのうち1つだけが重要であり、それがメインモジュール名です。 その理由を見てみましょう。
Celeryでタスクメッセージを送信すると、そのメッセージにはソースコードは含まれず、実行するタスクの名前のみが含まれます。 これは、インターネット上でのホスト名の動作と同様に機能します。すべてのワーカーは、タスクレジストリと呼ばれる実際の機能へのタスク名のマッピングを維持します。
タスクを定義するたびに、そのタスクはローカルレジストリにも追加されます。
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.name
__main__.add
>>> app.tasks['__main__.add']
<@task: __main__.add>
そこに__main__
が再び表示されます。 Celeryが関数が属するモジュールを検出できない場合は常に、メインモジュール名を使用してタスク名の先頭を生成します。
これは、限られた一連のユースケースでのみ問題になります。
- タスクが定義されているモジュールがプログラムとして実行されている場合。
- アプリケーションがPythonシェル(REPL)で作成されている場合。
たとえば、ここでは、タスクモジュールを使用して@worker_main()
でワーカーを開始します。
tasks.py
:
from celery import Celery
app = Celery()
@app.task
def add(x, y): return x + y
if __name__ == '__main__':
app.worker_main()
このモジュールが実行されると、タスクには「__main__
」で始まる名前が付けられますが、モジュールが別のプロセスによってインポートされる場合、たとえばタスクを呼び出す場合、タスクには「 [X195X」で始まる名前が付けられます。 ]」(モジュールの本名):
>>> from tasks import add
>>> add.name
tasks.add
メインモジュールに別の名前を指定できます。
>>> app = Celery('tasks')
>>> app.main
'tasks'
>>> @app.task
... def add(x, y):
... return x + y
>>> add.name
tasks.add
構成
セロリの動作を変更する設定可能なオプションがいくつかあります。 これらのオプションは、アプリインスタンスで直接設定することも、専用の構成モジュールを使用することもできます。
構成は@conf
として利用できます。
>>> app.conf.timezone
'Europe/London'
ここで、構成値を直接設定することもできます。
>>> app.conf.enable_utc = True
または、update
メソッドを使用して、一度に複数のキーを更新します。
>>> app.conf.update(
... enable_utc=True,
... timezone='Europe/London',
...)
構成オブジェクトは、次の順序で参照される複数のディクショナリで構成されます。
- 実行時に行われた変更。
- 構成モジュール(存在する場合)
- デフォルト構成(
celery.app.defaults
)。
@add_defaults()
メソッドを使用して、新しいデフォルトソースを追加することもできます。
config_from_object
@config_from_object()
メソッドは、構成オブジェクトから構成をロードします。
これは、構成モジュール、または構成属性を持つ任意のオブジェクトにすることができます。
@config_from_object()
が呼び出されると、以前に設定された構成がリセットされることに注意してください。 追加の構成を設定する場合は、後で行う必要があります。
例1:モジュールの名前を使用する
@config_from_object()
メソッドは、Pythonモジュールの完全修飾名、またはPython属性の名前を使用できます(例:"celeryconfig"
、"myproj.config.celery"
、 [ X157X]:
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
celeryconfig
モジュールは、次のようになります。
celeryconfig.py
:
enable_utc = True
timezone = 'Europe/London'
import celeryconfig
が可能な限り、アプリはそれを使用できます。
例2:実際のモジュールオブジェクトを渡す
インポート済みのモジュールオブジェクトを渡すこともできますが、これが常に推奨されるとは限りません。
ヒント
モジュールの名前を使用することをお勧めします。これは、プリフォークプールを使用するときにモジュールをシリアル化する必要がないことを意味します。 構成の問題やpickleエラーが発生している場合は、代わりにモジュールの名前を使用してみてください。
import celeryconfig
from celery import Celery
app = Celery()
app.config_from_object(celeryconfig)
例3:構成クラス/オブジェクトの使用
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
config_from_envvar
@config_from_envvar()
は、環境変数から構成モジュール名を取得します
例– CELERY_CONFIG_MODULE
という名前の環境変数で指定されたモジュールから構成をロードするには:
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
次に、環境を介して使用する構成モジュールを指定できます。
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO
打ち切り構成
デバッグ情報などとして構成を印刷する場合は、パスワードやAPIキーなどの機密情報を除外することもできます。
Celeryには、構成の表示に役立ついくつかのユーティリティが付属しています。1つはhumanize()
です。
>>> app.conf.humanize(with_defaults=False, censored=True)
このメソッドは、構成を表形式の文字列として返します。 これには、デフォルトで構成への変更のみが含まれますが、with_defaults
引数を有効にすることで、組み込みのデフォルトのキーと値を含めることができます。
代わりに、構成を辞書として操作する場合は、table()
メソッドを使用できます。
>>> app.conf.table(with_defaults=False, censored=True)
Celeryは、正規表現を使用して一般的な名前のキーを検索するだけなので、すべての機密情報を削除できるわけではないことに注意してください。 機密情報を含むカスタム設定を追加する場合は、Celeryが秘密として識別する名前を使用してキーに名前を付ける必要があります。
名前に次のサブ文字列のいずれかが含まれている場合、構成設定は打ち切られます。
API
、TOKEN
、KEY
、SECRET
、PASS
、SIGNATURE
、DATABASE
怠惰
アプリケーションインスタンスはレイジーです。つまり、実際に必要になるまで評価されません。
@Celery
インスタンスを作成すると、次のことのみが実行されます。
- イベントに使用される論理クロックインスタンスを作成します。
- タスクレジストリを作成します。
- 自分自身を現在のアプリとして設定します(ただし、
set_as_current
引数が無効になっている場合は設定しません)@on_init()
コールバックを呼び出します(デフォルトでは何もしません)。
@task()
デコレータは、タスクが定義された時点ではタスクを作成しません。代わりに、タスクが使用されたとき、またはアプリケーションがになった後に、タスクの作成を延期します。 ]最終化、
この例は、タスクを使用するか、属性(この場合はrepr()
)にアクセスするまでタスクが作成されない方法を示しています。
>>> @app.task
>>> def add(x, y):
... return x + y
>>> type(add)
<class 'celery.local.PromiseProxy'>
>>> add.__evaluated__()
False
>>> add # <-- causes repr(add) to happen
<@task: __main__.add>
>>> add.__evaluated__()
True
アプリの Finalization は、@finalize()
を呼び出すことによって明示的に、または@tasks
属性にアクセスすることによって暗黙的に行われます。
オブジェクトを完成させると、次のようになります。
アプリ間で共有する必要のあるタスクをコピーする
タスクはデフォルトで共有されますが、タスクデコレータへの
shared
引数が無効になっている場合、タスクはバインドされているアプリに対してプライベートになります。保留中のすべてのタスクデコレータを評価します。
すべてのタスクが現在のアプリにバインドされていることを確認してください。
タスクはアプリにバインドされているため、構成からデフォルト値を読み取ることができます。
「デフォルトアプリ」
Celeryには常にアプリケーションがあるとは限りませんでした。以前は、モジュールベースのAPIしかありませんでした。 互換性APIは、Celery 5.0がリリースされるまで古い場所で利用可能でしたが、削除されました。
Celeryは常に特別なアプリ(「デフォルトアプリ」)を作成します。これは、カスタムアプリケーションがインスタンス化されていない場合に使用されます。
celery.task
モジュールは使用できなくなりました。 モジュールベースのAPIではなく、アプリインスタンスのメソッドを使用します。
from celery.task import Task # << OLD Task base class.
from celery import Task # << NEW base class.
チェーンを壊す
現在設定されているアプリに依存することは可能ですが、ベストプラクティスは、アプリインスタンスを必要なものに常に渡すことです。
渡されるアプリに応じてインスタンスのチェーンを作成するため、これを「アプリチェーン」と呼びます。
次の例は悪い習慣と見なされます。
from celery import current_app
class Scheduler:
def run(self):
app = current_app
代わりに、app
を引数として取る必要があります。
class Scheduler:
def __init__(self, app):
self.app = app
内部的にCeleryはcelery.app.app_or_default()
関数を使用しているため、すべてがモジュールベースの互換性APIでも機能します。
from celery.app import app_or_default
class Scheduler:
def __init__(self, app=None):
self.app = app_or_default(app)
開発では、 CELERY_TRACE_APP
環境変数を設定して、アプリチェーンが壊れた場合に例外を発生させることができます。
$ CELERY_TRACE_APP=1 celery worker -l INFO
APIの進化
セロリは、最初に作成されて以来、2009年から大きく変化しました。
たとえば、最初は、任意の呼び出し可能オブジェクトをタスクとして使用することが可能でした。
def hello(to):
return 'hello {0}'.format(to)
>>> from celery.execute import apply_async
>>> apply_async(hello, ('world!',))
または、Task
クラスを作成して特定のオプションを設定したり、他の動作をオーバーライドしたりすることもできます。
from celery import Task
from celery.registry import tasks
class Hello(Task):
queue = 'hipri'
def run(self, to):
return 'hello {0}'.format(to)
tasks.register(Hello)
>>> Hello.delay('world!')
その後、任意の呼び出し可能オブジェクトを渡すことはアンチパターンであると判断されました。これは、ピクルス以外のシリアライザーの使用が非常に困難になるためです。この機能は2.0で削除され、タスクデコレータに置き換えられました。
from celery import app
@app.task(queue='hipri')
def hello(to):
return 'hello {0}'.format(to)
抽象タスク
@task()
デコレータを使用して作成されたすべてのタスクは、アプリケーションの基本@Task
クラスから継承されます。
base
引数を使用して、別の基本クラスを指定できます。
@app.task(base=OtherTask):
def add(x, y):
return x + y
カスタムタスククラスを作成するには、ニュートラル基本クラスcelery.Task
から継承する必要があります。
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return self.run(*args, **kwargs)
ヒント
タスクの__call__
メソッドをオーバーライドする場合は、self.run
も呼び出してタスクの本体を実行することが非常に重要です。 super().__call__
は呼び出さないでください。 ニュートラル基本クラスcelery.Task
の__call__
メソッドは、参照用にのみ存在します。 最適化のために、これはcelery.app.trace.build_tracer.trace_task
に展開され、__call__
メソッドが定義されていない場合、カスタムタスククラスでrun
を直接呼び出します。
ニュートラル基本クラスは、特定のアプリにまだバインドされていないため、特別です。 タスクがアプリにバインドされると、構成を読み取ってデフォルト値を設定します。
基本クラスを実現するには、@task()
デコレータを使用してタスクを作成する必要があります。
@app.task(base=DebugTask)
def add(x, y):
return x + y
@Task()
属性を変更することで、アプリケーションのデフォルトの基本クラスを変更することもできます。
>>> from celery import Celery, Task
>>> app = Celery()
>>> class MyBaseTask(Task):
... queue = 'hipri'
>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
<unbound MyBaseTask>,
<unbound Task>,
<type 'object'>]