拡張機能とブートステップ—Pythonドキュメント
拡張機能とブートステップ
カスタムメッセージコンシューマー
メッセージを手動で処理するために、カスタムの昆布消費者を埋め込むことをお勧めします。
そのために、特別なConsumerStep
ブートステップクラスが存在します。このクラスでは、get_consumers
メソッドを定義するだけで、接続が確立されるたびに開始するkombu.Consumer
オブジェクトのリストを返す必要があります。 :
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://')
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=['json'])]
def handle_message(self, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
def send_me_a_message(who, producer=None):
with app.producer_or_acquire(producer) as producer:
producer.publish(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
if __name__ == '__main__':
send_me_a_message('world!')
ノート
昆布消費者は、2つの異なるメッセージコールバックディスパッチングメカニズムを利用できます。 1つ目は(body, message)
署名付きのコールバックのリストを受け入れるcallbacks
引数で、2つ目は(message,)
署名。 後者は、ペイロードを自動的にデコードおよび逆シリアル化しません。
def get_consumers(self, channel):
return [Consumer(channel, queues=[my_queue],
on_message=self.on_message)]
def on_message(self, message):
payload = message.decode()
print(
'Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body),
))
message.ack()
設計図
ブートステップは、ワーカーに機能を追加するための手法です。 ブートステップは、ワーカーのさまざまな段階でカスタムアクションを実行するためのフックを定義するカスタムクラスです。 すべてのブートステップはブループリントに属し、ワーカーは現在、 Worker と Consumer の2つのブループリントを定義しています。
- 図A:ワーカーとコンシューマーのブループリントのブートステップ。 起動
- ワーカーブループリントの最初のステップは下から順にタイマーであり、最後のステップはコンシューマーブループリントを開始することです。これにより、ブローカー接続が確立され、メッセージの消費が開始されます。
ワーカー
ワーカーは最初に開始する青写真であり、イベントループ、処理プール、ETAタスクやその他の時間指定イベントに使用されるタイマーなどの主要なコンポーネントを開始します。
ワーカーが完全に開始されると、タスクの実行方法を設定し、ブローカーに接続してメッセージコンシューマーを開始するコンシューマーブループリントを続行します。
WorkController
はコアワーカーの実装であり、ブートステップで使用できるいくつかのメソッドと属性が含まれています。
属性
- app
- 現在のアプリインスタンス。
- hostname
- ワーカーノード名(例: [email protected] )
- blueprint
- これはワーカー
Blueprint
です。
- hub
イベントループオブジェクト(
Hub
)。 これを使用して、イベントループにコールバックを登録できます。これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。
これを使用するには、ワーカーブートステップでハブブートステップが必要です。
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Hub'}
- pool
現在のプロセス/イベントレット/ gevent /スレッドプール。
celery.concurrency.base.BasePool
を参照してください。ワーカーブートステップでは、これを使用するためにプールブートステップが必要です。
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Pool'}
- timer
Timer
は機能のスケジュールに使用されます。ワーカーブートステップでは、これを使用するためにタイマーブートステップが必要です。
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Timer'}
- statedb
Database <celery.worker.state.Persistent>`
は、ワーカーの再起動間で状態を保持します。これは、
statedb
引数が有効になっている場合にのみ定義されます。これを使用するには、ワーカーブートステップで
Statedb
ブートステップが必要です。class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Statedb'}
- autoscaler
Autoscaler
は、プール内のプロセスの数を自動的に増減するために使用されます。これは、
autoscale
引数が有効になっている場合にのみ定義されます。これを使用するには、ワーカーのブートステップで Autoscaler ブートステップが必要です。
class WorkerStep(bootsteps.StartStopStep): requires = ('celery.worker.autoscaler:Autoscaler',)
- autoreloader
Autoreloader
は、ファイルシステムが変更されたときに使用コードを自動的に再読み込みするために使用されます。これは、
autoreload
引数が有効になっている場合にのみ定義されます。 これを使用するには、ワーカーのブートステップで Autoreloader ブートステップが必要です。class WorkerStep(bootsteps.StartStopStep): requires = ('celery.worker.autoreloader:Autoreloader',)
ワーカーのブートステップの例
ワーカーのブートステップの例は次のとおりです。
from celery import bootsteps
class ExampleWorkerStep(bootsteps.StartStopStep):
requires = {'celery.worker.components:Pool'}
def __init__(self, worker, **kwargs):
print('Called when the WorkController instance is constructed')
print('Arguments to WorkController: {0!r}'.format(kwargs))
def create(self, worker):
# this method can be used to delegate the action methods
# to another object that implements ``start`` and ``stop``.
return self
def start(self, worker):
print('Called when the worker is started.')
def stop(self, worker):
print('Called when the worker shuts down.')
def terminate(self, worker):
print('Called when the worker terminates')
すべてのメソッドには、現在のWorkController
インスタンスが最初の引数として渡されます。
別の例では、タイマーを使用して定期的にウェイクアップできます。
from celery import bootsteps
class DeadlockDetection(bootsteps.StartStopStep):
requires = {'celery.worker.components:Timer'}
def __init__(self, worker, deadlock_timeout=3600):
self.timeout = deadlock_timeout
self.requests = []
self.tref = None
def start(self, worker):
# run every 30 seconds.
self.tref = worker.timer.call_repeatedly(
30.0, self.detect, (worker,), priority=10,
)
def stop(self, worker):
if self.tref:
self.tref.cancel()
self.tref = None
def detect(self, worker):
# update active requests
for req in worker.active_requests:
if req.time_start and time() - req.time_start > self.timeout:
raise SystemExit()
タスク処理ログのカスタマイズ
Celeryワーカーは、タスクのライフサイクル全体を通じてさまざまなイベントのメッセージをPythonロギングサブシステムに送信します。 これらのメッセージは、celery/app/trace.py
で定義されているLOG_<TYPE>
形式の文字列を上書きすることでカスタマイズできます。 例えば:
import celery.app.trace
celery.app.trace.LOG_SUCCESS = "This is a custom message"
さまざまなフォーマット文字列はすべて、%
フォーマットのタスク名とIDで提供され、それらの一部は、タスクが失敗する原因となった戻り値や例外などの追加フィールドを受け取ります。 これらのフィールドは、次のようなカスタム形式の文字列で使用できます。
import celery.app.trace
celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"
消費者
コンシューマーブループリントはブローカーへの接続を確立し、この接続が失われるたびに再起動されます。 コンシューマーのブートステップには、ワーカーハートビート、リモートコントロールコマンドコンシューマー、そして重要なことに、タスクコンシューマーが含まれます。
コンシューマブートステップを作成するときは、ブループリントを再開できる必要があることを考慮に入れる必要があります。 追加の「shutdown」メソッドがコンシューマーブートステップ用に定義されています。このメソッドは、ワーカーがシャットダウンされたときに呼び出されます。
属性
- app
- 現在のアプリインスタンス。
- controller
- このコンシューマーを作成した親
@WorkController
オブジェクト。
- hostname
- ワーカーノード名(例: [email protected] )
- blueprint
- これはワーカー
Blueprint
です。
- hub
イベントループオブジェクト(
Hub
)。 これを使用して、イベントループにコールバックを登録できます。これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。
これを使用するには、ワーカーブートステップでハブブートステップが必要です。
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Hub'}
- connection
現在のブローカー接続(
kombu.Connection
)。コンシューマーブートステップでは、これを使用するために「接続」ブートステップが必要です。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.connection:Connection'}
- event_dispatcher
イベントの送信に使用できる
@events.Dispatcher
オブジェクト。コンシューマーブートステップでは、これを使用するために Events ブートステップが必要です。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.events:Events'}
- gossip
ワーカー間ブロードキャスト通信(
Gossip
)。コンシューマーブートステップでは、これを使用するために Gossip ブートステップが必要です。
class RatelimitStep(bootsteps.StartStopStep): """Rate limit tasks based on the number of workers in the cluster.""" requires = {'celery.worker.consumer.gossip:Gossip'} def start(self, c): self.c = c self.c.gossip.on.node_join.add(self.on_cluster_size_change) self.c.gossip.on.node_leave.add(self.on_cluster_size_change) self.c.gossip.on.node_lost.add(self.on_node_lost) self.tasks = [ self.app.tasks['proj.tasks.add'] self.app.tasks['proj.tasks.mul'] ] self.last_size = None def on_cluster_size_change(self, worker): cluster_size = len(list(self.c.gossip.state.alive_workers())) if cluster_size != self.last_size: for task in self.tasks: task.rate_limit = 1.0 / cluster_size self.c.reset_rate_limits() self.last_size = cluster_size def on_node_lost(self, worker): # may have processed heartbeat too late, so wake up soon # in order to see if the worker recovered. self.c.timer.call_after(10.0, self.on_cluster_size_change)
コールバック
<set> gossip.on.node_join
新しいノードがクラスターに参加するたびに呼び出され、
Worker
インスタンスを提供します。<set> gossip.on.node_leave
新しいノードがクラスターを離れる(シャットダウンする)たびに呼び出され、
Worker
インスタンスを提供します。<set> gossip.on.node_lost
クラスター内のワーカーインスタンスのハートビートが失われた場合(ハートビートが時間内に受信または処理されなかった場合)に呼び出され、
Worker
インスタンスを提供します。これは必ずしもワーカーが実際にオフラインであることを意味するわけではないため、デフォルトのハートビートタイムアウトでは不十分な場合は、タイムアウトメカニズムを使用してください。
- pool
- 現在のプロセス/イベントレット/ gevent /スレッドプール。
celery.concurrency.base.BasePool
を参照してください。
- timer
Timer <celery.utils.timer2.Schedule
は機能のスケジュールに使用されます。
- heart
ワーカーイベントのハートビート(
Heart
)の送信を担当します。これを使用するには、コンシューマーブートステップで Heart ブートステップが必要です。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.heart:Heart'}
- task_consumer
タスクメッセージを消費するために使用される
kombu.Consumer
オブジェクト。これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.tasks:Tasks'}
- strategies
登録されたすべてのタスクタイプには、このマッピングにエントリがあり、この値は、このタスクタイプの着信メッセージを実行するために使用されます(タスク実行戦略)。 このマッピングは、コンシューマーが開始したときにタスクのブートステップによって生成されます。
for name, task in app.tasks.items(): strategies[name] = task.start_strategy(app, consumer) task.__trace__ = celery.app.trace.build_tracer( name, task, loader, hostname )
これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.tasks:Tasks'}
- task_buckets
タイプごとにタスクのレート制限を検索するために使用される
defaultdict
。 このdictのエントリは、None(制限なし)またはconsume(tokens)
およびexpected_time(tokens)
を実装するTokenBucket
インスタンスの場合があります。TokenBucketはトークンバケットアルゴリズムを実装しますが、同じインターフェースに準拠し、上記の2つの方法を定義する限り、任意のアルゴリズムを使用できます。
- qos
QoS
オブジェクトを使用して、タスクチャネルの現在のprefetch_count値を変更できます。# increment at next cycle consumer.qos.increment_eventually(1) # decrement at next cycle consumer.qos.decrement_eventually(1) consumer.qos.set(10)
メソッド
- consumer.reset_rate_limits()
- 登録されているすべてのタスクタイプの
task_buckets
マッピングを更新します。
- consumer.bucket_for_task(type, Bucket=TokenBucket)
task.rate_limit
属性を使用して、タスクのレート制限バケットを作成します。
- consumer.add_task_queue(name, exchange=None, exchange_type=None,
routing_key=None, \*\*options):
- 消費する新しいキューを追加します。 これは、接続を再開しても持続します。
- consumer.cancel_task_queue(name)
- 名前によるキューからの消費を停止します。 これは、接続を再開しても持続します。
- apply_eta_task(request)
request.eta
属性に基づいて実行するETAタスクをスケジュールします。 (Request
)
ブートステップのインストール
app.steps['worker']
およびapp.steps['consumer']
を変更して、新しいブートステップを追加できます。
>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep) # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)
>>> app.steps['consumer'].update([StepA, StepB])
>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
ステップの順序は、結果の依存関係グラフ(Step.requires
)によって決定されるため、ここでは重要ではありません。
ブートステップをインストールする方法とその動作を説明するために、これはいくつかの役に立たないデバッグ情報を出力するステップの例です。 ワーカーとコンシューマーの両方のブートステップとして追加できます。
from celery import Celery
from celery import bootsteps
class InfoStep(bootsteps.Step):
def __init__(self, parent, **kwargs):
# here we can prepare the Worker/Consumer object
# in any way we want, set attribute defaults, and so on.
print('{0!r} is in init'.format(parent))
def start(self, parent):
# our step is started together with all other Worker/Consumer
# bootsteps.
print('{0!r} is starting'.format(parent))
def stop(self, parent):
# the Consumer calls stop every time the consumer is
# restarted (i.e., connection is lost) and also at shutdown.
# The Worker will call stop at shutdown only.
print('{0!r} is stopping'.format(parent))
def shutdown(self, parent):
# shutdown is called by the Consumer at shutdown, it's not
# called by Worker.
print('{0!r} is shutting down'.format(parent))
app = Celery(broker='amqp://')
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)
このステップをインストールしてワーカーを起動すると、次のログが表示されます。
<Worker: [email protected] (initializing)> is in init
<Consumer: [email protected] (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
<Worker: [email protected] (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
<Consumer: [email protected] (running)> is starting
<Consumer: [email protected] (closing)> is stopping
<Worker: [email protected] (closing)> is stopping
<Consumer: [email protected] (terminating)> is shutting down
print
ステートメントは、ワーカーが初期化された後にロギングサブシステムにリダイレクトされるため、「開始中」の行にはタイムスタンプが付けられます。 これは、シャットダウン時に発生しなくなったことに気付くかもしれません。これは、stop
メソッドとshutdown
メソッドがシグナルハンドラー内で呼び出され、ロギングを使用するのは安全ではないためです。そのようなハンドラーの内部。 Pythonロギングモジュールを使用したロギングは reentrant ではありません。つまり、関数を中断して後で再度呼び出すことはできません。 作成するstop
およびshutdown
メソッドもリエントラントであることが重要です。
--loglevel=debug
でワーカーを起動すると、起動プロセスに関する詳細情報が表示されます。
[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
{Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
{Connection, Mingle, Events, Gossip, InfoStep, Agent,
Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
<celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://[email protected]:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] [email protected] ready.
コマンドラインプログラム
新しいコマンドラインオプションの追加
コマンド固有のオプション
アプリケーションインスタンスの@user_options
属性を変更することにより、worker
、beat
、およびevents
コマンドにコマンドラインオプションを追加できます。
Celeryコマンドは、click
モジュールを使用してコマンドライン引数を解析するため、カスタム引数を追加するには、click.Option
インスタンスを関連するセットに追加する必要があります。
celeryworker コマンドにカスタムオプションを追加する例:
from celery import Celery
from click import Option
app = Celery(broker='amqp://')
app.user_options['worker'].add(Option(('--enable-my-option',),
is_flag=True,
help='Enable custom option.'))
すべてのブートステップは、Bootstep.__init__
へのキーワード引数としてこの引数を受け取るようになります。
from celery import bootsteps
class MyBootstep(bootsteps.Step):
def __init__(self, parent, enable_my_option=False, **options):
super().__init__(parent, **options)
if enable_my_option:
party()
app.steps['worker'].add(MyBootstep)
プリロードオプション
celery アンブレラコマンドは、「プリロードオプション」の概念をサポートしています。 これらは、すべてのサブコマンドに渡される特別なオプションです。
たとえば、構成テンプレートを指定するために、新しいプリロードオプションを追加できます。
from celery import Celery
from celery import signals
from click import Option
app = Celery()
app.user_options['preload'].add(Option(('-Z', '--template'),
default='default',
help='Configuration template to use.'))
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
use_template(options['template'])
新しいセロリサブコマンドの追加
setuptoolsエントリポイントを使用して、 celery アンブレラコマンドに新しいコマンドを追加できます。
エントリポイントは、パッケージsetup.py
プログラムに追加できる特別なメタデータであり、インストール後、pkg_resources
モジュールを使用してシステムから読み取ることができます。
Celeryは、celery.commands
エントリポイントを認識して、追加のサブコマンドをインストールします。エントリポイントの値は、有効なクリックコマンドを指している必要があります。
これは、:pypi: `Flower` 監視拡張機能が、setup.py
にエントリポイントを追加することにより、 celery flower コマンドを追加する方法です。
setup(
name='flower',
entry_points={
'celery.commands': [
'flower = flower.command:flower',
],
}
)
コマンド定義は、等号で区切られた2つの部分に分かれています。最初の部分はサブコマンド(花)の名前で、2番目の部分はコマンドを実装する関数への完全修飾シンボルパスです。
flower.command:flower
モジュールパスと属性の名前は、上記のようにコロンで区切る必要があります。
モジュールflower/command.py
では、コマンド関数は次のように定義できます。
import click
@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
print('Running our command')
ワーカーAPI
Hub-ワーカー非同期イベントループ
- サポートされているトランスポート
- amqp、redis
バージョン3.0の新機能。
amqpまたはredisブローカートランスポートが使用される場合、ワーカーは非同期I / Oを使用します。 最終的な目標は、すべてのトランスポートがイベントループを使用することですが、それには時間がかかるため、他のトランスポートは引き続きスレッドベースのソリューションを使用します。
- hub.add(fd, callback, flags)
- hub.add_reader(fd, callback, \*args)
fd
が読み取り可能になったときに呼び出されるコールバックを追加します。hub.remove(fd)を使用して明示的に削除されるか、ファイル記述子が無効になったために自動的に破棄されるまで、コールバックは登録されたままになります。
特定のファイル記述子に一度に登録できるコールバックは1つだけであるため、
add
をもう一度呼び出すと、そのファイル記述子に以前に登録されたコールバックが削除されることに注意してください。ファイル記述子は、
fileno
メソッドをサポートするファイルのようなオブジェクトです。または、ファイル記述子番号(int)にすることもできます。
- hub.add_writer(fd, callback, \*args)
fd
が書き込み可能であるときに呼び出されるコールバックを追加します。 上記の hub.add_reader()に関する注記も参照してください。
- hub.remove(fd)
- ファイル記述子
fd
のすべてのコールバックをループから削除します。
タイマー-イベントのスケジュール
- timer.call_after(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_repeatedly(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_at(eta, callback, args=(), kwargs=(),
priority=0)