「ビッグインスタンス」リファクタリング—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/internals/app-overview
移動先:案内検索

「ビッグインスタンス」リファクタリング

app ブランチは、Celeryでのグローバル構成の使用を削除するための進行中の作業です。

Celeryをインスタンス化できるようになり、Celeryの複数のインスタンスが同じプロセススペースに存在する可能性があります。 また、モンキーパッチを使わずに大きなパーツをカスタマイズできます。

Celeryインスタンスの作成:

>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')

タスクの作成:

@app.task
def add(x, y):
    return x + y

カスタムタスクサブクラスの作成:

Task = celery.create_task_cls()

class DebugTask(Task):

    def on_failure(self, *args, **kwargs):
        import pdb
        pdb.set_trace()

@app.task(base=DebugTask)
def add(x, y):
    return x + y

ワーカーの開始:

worker = celery.Worker(loglevel='INFO')

構成へのアクセスの取得:

celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True

労働者の管理:

>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()

その他の興味深い属性:

# Establish broker connection.
>>> celery.broker_connection()

# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()

# Loader
>>> celery.loader

# Default backend
>>> celery.backend

おそらくお分かりのように、これはカスタマイズ機能の別の側面を実際に開きます。


非推奨

  • celery.task.ping celery.task.PingTask

    pingリモートコントロールコマンドより劣っています。 Celery2.3で削除されます。


エイリアス(保留中の非推奨)

  • celery.execute
    • .send_task-> {app.send_task}

    • .delay_task-> 代替手段なし

  • celery.log
    • .get_default_logger-> {app.log.get_default_logger}

    • .setup_logger-> {app.log.setup_logger}

    • .get_task_logger-> {app.log.get_task_logger}

    • .setup_task_logger-> {app.log.setup_task_logger}

    • .setup_logging_subsystem-> {app.log.setup_logging_subsystem}

    • .redirect_stdouts_to_logger-> {app.log.redirect_stdouts_to_logger}

  • celery.messaging
    • .establish_connection-> {app.broker_connection}

    • .with_connection-> {app.with_connection}

    • .get_consumer_set-> {app.amqp.get_task_consumer}

    • .TaskPublisher-> {app.amqp.TaskPublisher}

    • .TaskConsumer-> {app.amqp.TaskConsumer}

    • .ConsumerSet-> {app.amqp.ConsumerSet}

  • celery.conf.*-> {app.conf}

    :すべての構成キーに、構成と同じ名前が付けられるようになりました。 したがって、キーtask_always_eagerは次のようにアクセスされます。

    >>> app.conf.task_always_eager

    それ以外の:

    >>> from celery import conf
    >>> conf.always_eager
    • .get_queues-> {app.amqp.get_queues}


  • celery.utils.info
    • .humanize_seconds-> celery.utils.time.humanize_seconds

    • .textindent-> celery.utils.textindent

    • .get_broker_info-> {app.amqp.get_broker_info}

    • .format_broker_info-> {app.amqp.format_broker_info}

    • .format_queues-> {app.amqp.format_queues}


デフォルトのアプリの使用法

下位互換性を保つには、明示的なアプリインスタンスを渡さずにすべてのクラス/関数を使用できる必要があります。

これは、アプリインスタンスが欠落している場合に、すべてのアプリ依存オブジェクトにdefault_appを使用させることで実現されます。

from celery.app import app_or_default

class SomeClass:

    def __init__(self, app=None):
        self.app = app_or_default(app)

このアプローチの問題は、途中でアプリインスタンスが失われる可能性があり、すべてが正常に機能しているように見えることです。 アプリインスタンスのリークをテストするのは困難です。 環境変数 CELERY_TRACE_APPを使用できます。これを有効にすると、celery.app.app_or_default()は、デフォルトのアプリインスタンスに戻らなければならないときに例外を発生させます。

アプリの依存関係ツリー

  • *; {app}
    *;* celery.loaders.base.BaseLoader
    • celery.backends.base.BaseBackend
    • *;*; {app.TaskSet}
    • *;*;* celery.task.sets.TaskSetapp.TaskSet
  • *;*; [app.TaskSetResult]
  • *;*;* celery.result.TaskSetResultapp.TaskSetResult
  • *; {app.AsyncResult}
    *;* celery.result.BaseAsyncResult / celery.result.AsyncResult
  • *; celery.bin.worker.WorkerCommand
    *;* *;*; celery.apps.worker.Worker
    • *;*;* *;*;*; celery.worker.WorkerController
      • *;*;*;* *;*;*;*; celery.worker.consumer.Consumer
        • *;*;*;*;* celery.worker.request.Request
          • celery.events.EventDispatcher
          • *;*;*;*;*; celery.worker.control.ControlDispatch
          • *;*;*;*;*;* celery.worker.control.registry.Panel
            • celery.pidbox.BroadcastPublisher
          • celery.pidbox.BroadcastConsumer
        • celery.beat.EmbeddedService
  • *; celery.bin.events.EvCommand
    *;* *;*; celery.events.snapshot.evcam
    • *;*;* celery.events.snapshot.Polaroid
      • celery.events.EventReceiver
    • *;*; celery.events.cursesmon.evtop
    • *;*;* celery.events.EventReceiver
      • celery.events.cursesmon.CursesMonitor
    • *;*; celery.events.dumper
    • *;*;* celery.events.EventReceiver
  • celery.bin.amqp.AMQPAdmin
  • *; celery.bin.beat.BeatCommand
    *;* *;*; celery.apps.beat.Beat
    • *;*;* *;*;*; celery.beat.Service
      • *;*;*;* celery.beat.Scheduler