「ビッグインスタンス」リファクタリング—Pythonドキュメント
「ビッグインスタンス」リファクタリング
app ブランチは、Celeryでのグローバル構成の使用を削除するための進行中の作業です。
Celeryをインスタンス化できるようになり、Celeryの複数のインスタンスが同じプロセススペースに存在する可能性があります。 また、モンキーパッチを使わずに大きなパーツをカスタマイズできます。
例
Celeryインスタンスの作成:
>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')
タスクの作成:
@app.task
def add(x, y):
return x + y
カスタムタスクサブクラスの作成:
Task = celery.create_task_cls()
class DebugTask(Task):
def on_failure(self, *args, **kwargs):
import pdb
pdb.set_trace()
@app.task(base=DebugTask)
def add(x, y):
return x + y
ワーカーの開始:
worker = celery.Worker(loglevel='INFO')
構成へのアクセスの取得:
celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True
労働者の管理:
>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()
その他の興味深い属性:
# Establish broker connection.
>>> celery.broker_connection()
# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()
# Loader
>>> celery.loader
# Default backend
>>> celery.backend
おそらくお分かりのように、これはカスタマイズ機能の別の側面を実際に開きます。
非推奨
celery.task.ping
celery.task.PingTask
pingリモートコントロールコマンドより劣っています。 Celery2.3で削除されます。
エイリアス(保留中の非推奨)
celery.execute
.send_task
-> {app.send_task
}.delay_task
-> 代替手段なし
celery.log
.get_default_logger
-> {app.log.get_default_logger
}.setup_logger
-> {app.log.setup_logger
}.get_task_logger
-> {app.log.get_task_logger
}.setup_task_logger
-> {app.log.setup_task_logger
}.setup_logging_subsystem
-> {app.log.setup_logging_subsystem
}.redirect_stdouts_to_logger
-> {app.log.redirect_stdouts_to_logger
}
celery.messaging
.establish_connection
-> {app.broker_connection
}.with_connection
-> {app.with_connection
}.get_consumer_set
-> {app.amqp.get_task_consumer
}.TaskPublisher
-> {app.amqp.TaskPublisher
}.TaskConsumer
-> {app.amqp.TaskConsumer
}.ConsumerSet
-> {app.amqp.ConsumerSet
}
celery.conf.*
-> {app.conf
}注:すべての構成キーに、構成と同じ名前が付けられるようになりました。 したがって、キー
task_always_eager
は次のようにアクセスされます。>>> app.conf.task_always_eager
それ以外の:
>>> from celery import conf >>> conf.always_eager
.get_queues
-> {app.amqp.get_queues
}
celery.utils.info
.humanize_seconds
->celery.utils.time.humanize_seconds
.textindent
->celery.utils.textindent
.get_broker_info
-> {app.amqp.get_broker_info
}.format_broker_info
-> {app.amqp.format_broker_info
}.format_queues
-> {app.amqp.format_queues
}
デフォルトのアプリの使用法
下位互換性を保つには、明示的なアプリインスタンスを渡さずにすべてのクラス/関数を使用できる必要があります。
これは、アプリインスタンスが欠落している場合に、すべてのアプリ依存オブジェクトにdefault_app
を使用させることで実現されます。
from celery.app import app_or_default
class SomeClass:
def __init__(self, app=None):
self.app = app_or_default(app)
このアプローチの問題は、途中でアプリインスタンスが失われる可能性があり、すべてが正常に機能しているように見えることです。 アプリインスタンスのリークをテストするのは困難です。 環境変数 CELERY_TRACE_APP
を使用できます。これを有効にすると、celery.app.app_or_default()
は、デフォルトのアプリインスタンスに戻らなければならないときに例外を発生させます。
アプリの依存関係ツリー
- *; {
app
}- *;*
celery.loaders.base.BaseLoader
celery.backends.base.BaseBackend
- *;*; {
app.TaskSet
}
- *;*;*
celery.task.sets.TaskSet
(app.TaskSet
)
- *;*;*
- *;*
- *;*; [
app.TaskSetResult
]
- *;*; [
- *;*;*
celery.result.TaskSetResult
(app.TaskSetResult
)
- *;*;*
app.AsyncResult
}
- *;*
celery.result.BaseAsyncResult
/celery.result.AsyncResult
celery.bin.worker.WorkerCommand
- *;* *;*;
celery.apps.worker.Worker
- *;*;* *;*;*;
celery.worker.WorkerController
- *;*;* *;*;*;
- *;*;*;* *;*;*;*;
celery.worker.consumer.Consumer
- *;*;*;* *;*;*;*;
- *;*;*;*;*
celery.worker.request.Request
- *;*;*;*;*
celery.events.EventDispatcher
- *;*;*;*;*;
celery.worker.control.ControlDispatch
- *;*;*;*;*;*
celery.worker.control.registry.Panel
- *;*;*;*;*;*
celery.pidbox.BroadcastPublisher
celery.pidbox.BroadcastConsumer
celery.beat.EmbeddedService
celery.bin.events.EvCommand
- *;* *;*;
celery.events.snapshot.evcam
- *;*;*
celery.events.snapshot.Polaroid
- *;*;*
celery.events.EventReceiver
- *;*;
celery.events.cursesmon.evtop
- *;*;
- *;*;*
celery.events.EventReceiver
- *;*;*
celery.events.cursesmon.CursesMonitor
- *;*;
celery.events.dumper
- *;*;
- *;*;*
celery.events.EventReceiver
- *;*;*
celery.bin.amqp.AMQPAdmin
celery.bin.beat.BeatCommand
- *;* *;*;
celery.apps.beat.Beat
- *;*;* *;*;*;
celery.beat.Service
- *;*;* *;*;*;
- *;*;*;*
celery.beat.Scheduler
- *;*;*;*