拡張機能とブートステップ—Pythonドキュメント

提供:Dev Guides
Celery/docs/latest/userguide/extending
移動先:案内検索

拡張機能とブートステップ

カスタムメッセージコンシューマー

メッセージを手動で処理するために、カスタムの昆布消費者を埋め込むことをお勧めします。

そのために、特別なConsumerStepブートステップクラスが存在します。このクラスでは、get_consumersメソッドを定義するだけで、接続が確立されるたびに開始するkombu.Consumerオブジェクトのリストを返す必要があります。 :

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')

ノート

昆布消費者は、2つの異なるメッセージコールバックディスパッチングメカニズムを利用できます。 1つ目は(body, message)署名付きのコールバックのリストを受け入れるcallbacks引数で、2つ目は(message,)署名。 後者は、ペイロードを自動的にデコードおよび逆シリアル化しません。

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body),
    ))
    message.ack()

設計図

ブートステップは、ワーカーに機能を追加するための手法です。 ブートステップは、ワーカーのさまざまな段階でカスタムアクションを実行するためのフックを定義するカスタムクラスです。 すべてのブートステップはブループリントに属し、ワーカーは現在、 WorkerConsumer の2つのブループリントを定義しています。



図A:ワーカーとコンシューマーのブループリントのブートステップ。 起動
ワーカーブループリントの最初のステップは下から順にタイマーであり、最後のステップはコンシューマーブループリントを開始することです。これにより、ブローカー接続が確立され、メッセージの消費が開始されます。

thumb|none




ワーカー

ワーカーは最初に開始する青写真であり、イベントループ、処理プール、ETAタスクやその他の時間指定イベントに使用されるタイマーなどの主要なコンポーネントを開始します。

ワーカーが完全に開始されると、タスクの実行方法を設定し、ブローカーに接続してメッセージコンシューマーを開始するコンシューマーブループリントを続行します。

WorkControllerはコアワーカーの実装であり、ブートステップで使用できるいくつかのメソッドと属性が含まれています。

属性

app
現在のアプリインスタンス。

hostname
ワーカーノード名(例: [email protected]

blueprint
これはワーカーBlueprintです。

hub

イベントループオブジェクト(Hub)。 これを使用して、イベントループにコールバックを登録できます。

これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。

これを使用するには、ワーカーブートステップでハブブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

pool

現在のプロセス/イベントレット/ gevent /スレッドプール。 celery.concurrency.base.BasePoolを参照してください。

ワーカーブートステップでは、これを使用するためにプールブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

timer

Timerは機能のスケジュールに使用されます。

ワーカーブートステップでは、これを使用するためにタイマーブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

statedb

Database <celery.worker.state.Persistent>`は、ワーカーの再起動間で状態を保持します。

これは、statedb引数が有効になっている場合にのみ定義されます。

これを使用するには、ワーカーブートステップでStatedbブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}

autoscaler

Autoscalerは、プール内のプロセスの数を自動的に増減するために使用されます。

これは、autoscale引数が有効になっている場合にのみ定義されます。

これを使用するには、ワーカーのブートステップで Autoscaler ブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)

autoreloader

Autoreloaderは、ファイルシステムが変更されたときに使用コードを自動的に再読み込みするために使用されます。

これは、autoreload引数が有効になっている場合にのみ定義されます。 これを使用するには、ワーカーのブートステップで Autoreloader ブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)


ワーカーのブートステップの例

ワーカーのブートステップの例は次のとおりです。

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print('Called when the worker shuts down.')

    def terminate(self, worker):
        print('Called when the worker terminates')

すべてのメソッドには、現在のWorkControllerインスタンスが最初の引数として渡されます。

別の例では、タイマーを使用して定期的にウェイクアップできます。

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, deadlock_timeout=3600):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()

タスク処理ログのカスタマイズ

Celeryワーカーは、タスクのライフサイクル全体を通じてさまざまなイベントのメッセージをPythonロギングサブシステムに送信します。 これらのメッセージは、celery/app/trace.pyで定義されているLOG_<TYPE>形式の文字列を上書きすることでカスタマイズできます。 例えば:

import celery.app.trace

celery.app.trace.LOG_SUCCESS = "This is a custom message"

さまざまなフォーマット文字列はすべて、%フォーマットのタスク名とIDで提供され、それらの一部は、タスクが失敗する原因となった戻り値や例外などの追加フィールドを受け取ります。 これらのフィールドは、次のようなカスタム形式の文字列で使用できます。

import celery.app.trace

celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"

消費者

コンシューマーブループリントはブローカーへの接続を確立し、この接続が失われるたびに再起動されます。 コンシューマーのブートステップには、ワーカーハートビート、リモートコントロールコマンドコンシューマー、そして重要なことに、タスクコンシューマーが含まれます。

コンシューマブートステップを作成するときは、ブループリントを再開できる必要があることを考慮に入れる必要があります。 追加の「shutdown」メソッドがコンシューマーブートステップ用に定義されています。このメソッドは、ワーカーがシャットダウンされたときに呼び出されます。

属性

app
現在のアプリインスタンス。

controller
このコンシューマーを作成した親@WorkControllerオブジェクト。

hostname
ワーカーノード名(例: [email protected]

blueprint
これはワーカーBlueprintです。

hub

イベントループオブジェクト(Hub)。 これを使用して、イベントループにコールバックを登録できます。

これは、非同期I / O対応のトランスポート(amqp、redis)でのみサポートされます。この場合、 worker.use_eventloop 属性を設定する必要があります。

これを使用するには、ワーカーブートステップでハブブートステップが必要です。

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

connection

現在のブローカー接続(kombu.Connection)。

コンシューマーブートステップでは、これを使用するために「接続」ブートステップが必要です。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}

event_dispatcher

イベントの送信に使用できる@events.Dispatcherオブジェクト。

コンシューマーブートステップでは、これを使用するために Events ブートステップが必要です。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}

gossip

ワーカー間ブロードキャスト通信(Gossip)。

コンシューマーブートステップでは、これを使用するために Gossip ブートステップが必要です。

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            self.app.tasks['proj.tasks.add']
            self.app.tasks['proj.tasks.mul']
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(10.0, self.on_cluster_size_change)

コールバック

  • <set> gossip.on.node_join

    新しいノードがクラスターに参加するたびに呼び出され、Workerインスタンスを提供します。

  • <set> gossip.on.node_leave

    新しいノードがクラスターを離れる(シャットダウンする)たびに呼び出され、Workerインスタンスを提供します。

  • <set> gossip.on.node_lost

    クラスター内のワーカーインスタンスのハートビートが失われた場合(ハートビートが時間内に受信または処理されなかった場合)に呼び出され、Workerインスタンスを提供します。

    これは必ずしもワーカーが実際にオフラインであることを意味するわけではないため、デフォルトのハートビートタイムアウトでは不十分な場合は、タイムアウトメカニズムを使用してください。

pool
現在のプロセス/イベントレット/ gevent /スレッドプール。 celery.concurrency.base.BasePoolを参照してください。

timer
Timer <celery.utils.timer2.Scheduleは機能のスケジュールに使用されます。

heart

ワーカーイベントのハートビート(Heart)の送信を担当します。

これを使用するには、コンシューマーブートステップで Heart ブートステップが必要です。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}

task_consumer

タスクメッセージを消費するために使用されるkombu.Consumerオブジェクト。

これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

strategies

登録されたすべてのタスクタイプには、このマッピングにエントリがあり、この値は、このタスクタイプの着信メッセージを実行するために使用されます(タスク実行戦略)。 このマッピングは、コンシューマーが開始したときにタスクのブートステップによって生成されます。

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )

これを使用するには、コンシューマブートステップで Tasks ブートステップが必要です。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

task_buckets

タイプごとにタスクのレート制限を検索するために使用されるdefaultdict。 このdictのエントリは、None(制限なし)またはconsume(tokens)およびexpected_time(tokens)を実装するTokenBucketインスタンスの場合があります。

TokenBucketはトークンバケットアルゴリズムを実装しますが、同じインターフェースに準拠し、上記の2つの方法を定義する限り、任意のアルゴリズムを使用できます。

qos

QoSオブジェクトを使用して、タスクチャネルの現在のprefetch_count値を変更できます。

# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)


メソッド

consumer.reset_rate_limits()
登録されているすべてのタスクタイプのtask_bucketsマッピングを更新します。
consumer.bucket_for_task(type, Bucket=TokenBucket)
task.rate_limit属性を使用して、タスクのレート制限バケットを作成します。
consumer.add_task_queue(name, exchange=None, exchange_type=None,

routing_key=None, \*\*options):

消費する新しいキューを追加します。 これは、接続を再開しても持続します。
consumer.cancel_task_queue(name)
名前によるキューからの消費を停止します。 これは、接続を再開しても持続します。
apply_eta_task(request)
request.eta属性に基づいて実行するETAタスクをスケジュールします。 (Request


ブートステップのインストール

app.steps['worker']およびapp.steps['consumer']を変更して、新しいブートステップを追加できます。

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}

ステップの順序は、結果の依存関係グラフ(Step.requires)によって決定されるため、ここでは重要ではありません。

ブートステップをインストールする方法とその動作を説明するために、これはいくつかの役に立たないデバッグ情報を出力するステップの例です。 ワーカーとコンシューマーの両方のブートステップとして追加できます。

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

    app = Celery(broker='amqp://')
    app.steps['worker'].add(InfoStep)
    app.steps['consumer'].add(InfoStep)

このステップをインストールしてワーカーを起動すると、次のログが表示されます。

<Worker: [email protected] (initializing)> is in init
<Consumer: [email protected] (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: [email protected] (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: [email protected] (running)> is starting
<Consumer: [email protected] (closing)> is stopping
<Worker: [email protected] (closing)> is stopping
<Consumer: [email protected] (terminating)> is shutting down

printステートメントは、ワーカーが初期化された後にロギングサブシステムにリダイレクトされるため、「開始中」の行にはタイムスタンプが付けられます。 これは、シャットダウン時に発生しなくなったことに気付くかもしれません。これは、stopメソッドとshutdownメソッドがシグナルハンドラー内で呼び出され、ロギングを使用するのは安全ではないためです。そのようなハンドラーの内部。 Pythonロギングモジュールを使用したロギングは reentrant ではありません。つまり、関数を中断して後で再度呼び出すことはできません。 作成するstopおよびshutdownメソッドもリエントラントであることが重要です。

--loglevel=debugでワーカーを起動すると、起動プロセスに関する詳細情報が表示されます。

[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://[email protected]:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] [email protected] ready.

コマンドラインプログラム

新しいコマンドラインオプションの追加

コマンド固有のオプション

アプリケーションインスタンスの@user_options属性を変更することにより、workerbeat、およびeventsコマンドにコマンドラインオプションを追加できます。

Celeryコマンドは、clickモジュールを使用してコマンドライン引数を解析するため、カスタム引数を追加するには、click.Optionインスタンスを関連するセットに追加する必要があります。

celeryworker コマンドにカスタムオプションを追加する例:

from celery import Celery
from click import Option

app = Celery(broker='amqp://')

app.user_options['worker'].add(Option(('--enable-my-option',),
                                      is_flag=True,
                                      help='Enable custom option.'))

すべてのブートステップは、Bootstep.__init__へのキーワード引数としてこの引数を受け取るようになります。

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)

プリロードオプション

celery アンブレラコマンドは、「プリロードオプション」の概念をサポートしています。 これらは、すべてのサブコマンドに渡される特別なオプションです。

たとえば、構成テンプレートを指定するために、新しいプリロードオプションを追加できます。

from celery import Celery
from celery import signals
from click import Option

app = Celery()

app.user_options['preload'].add(Option(('-Z', '--template'),
                                       default='default',
                                       help='Configuration template to use.'))

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])

新しいセロリサブコマンドの追加

setuptoolsエントリポイントを使用して、 celery アンブレラコマンドに新しいコマンドを追加できます。

エントリポイントは、パッケージsetup.pyプログラムに追加できる特別なメタデータであり、インストール後、pkg_resourcesモジュールを使用してシステムから読み取ることができます。

Celeryは、celery.commandsエントリポイントを認識して、追加のサブコマンドをインストールします。エントリポイントの値は、有効なクリックコマンドを指している必要があります。

これは、:pypi: `Flower` 監視拡張機能が、setup.pyにエントリポイントを追加することにより、 celery flower コマンドを追加する方法です。

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:flower',
        ],
    }
)

コマンド定義は、等号で区切られた2つの部分に分かれています。最初の部分はサブコマンド(花)の名前で、2番目の部分はコマンドを実装する関数への完全修飾シンボルパスです。

flower.command:flower

モジュールパスと属性の名前は、上記のようにコロンで区切る必要があります。

モジュールflower/command.pyでは、コマンド関数は次のように定義できます。

import click

@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
    print('Running our command')

ワーカーAPI

Hub-ワーカー非同期イベントループ

サポートされているトランスポート
amqp、redis

バージョン3.0の新機能。


amqpまたはredisブローカートランスポートが使用される場合、ワーカーは非同期I / Oを使用します。 最終的な目標は、すべてのトランスポートがイベントループを使用することですが、それには時間がかかるため、他のトランスポートは引き続きスレッドベースのソリューションを使用します。

hub.add(fd, callback, flags)
hub.add_reader(fd, callback, \*args)

fdが読み取り可能になったときに呼び出されるコールバックを追加します。

hub.remove(fd)を使用して明示的に削除されるか、ファイル記述子が無効になったために自動的に破棄されるまで、コールバックは登録されたままになります。

特定のファイル記述子に一度に登録できるコールバックは1つだけであるため、addをもう一度呼び出すと、そのファイル記述子に以前に登録されたコールバックが削除されることに注意してください。

ファイル記述子は、filenoメソッドをサポートするファイルのようなオブジェクトです。または、ファイル記述子番号(int)にすることもできます。

hub.add_writer(fd, callback, \*args)
fdが書き込み可能であるときに呼び出されるコールバックを追加します。 上記の hub.add_reader()に関する注記も参照してください。
hub.remove(fd)
ファイル記述子fdのすべてのコールバックをループから削除します。


タイマー-イベントのスケジュール

timer.call_after(secs, callback, args=(), kwargs=(),

priority=0)

timer.call_repeatedly(secs, callback, args=(), kwargs=(),

priority=0)

timer.call_at(eta, callback, args=(), kwargs=(),

priority=0)