キュー—Pythonドキュメント

提供:Dev Guides
< PythonPython/docs/3.8/library/asyncio-queue
移動先:案内検索

キュー

ソースコード: :source: `Lib / asyncio / queues.py`



asyncioキューは、 queue モジュールのクラスと同様に設計されています。 asyncioキューはスレッドセーフではありませんが、特にasync / awaitコードで使用するように設計されています。

asyncioキューのメソッドには、 timeout パラメーターがないことに注意してください。 asyncio.wait_for()関数を使用して、タイムアウト付きのキュー操作を実行します。

以下のセクションも参照してください。

class asyncio.Queue(maxsize=0, \*, loop=None)

先入れ先出し(FIFO)キュー。

maxsize がゼロ以下の場合、キューサイズは無限大です。 0より大きい整数の場合、キューが maxsize に達すると、get()によってアイテムが削除されるまで、await put()がブロックされます。

標準ライブラリスレッドキューとは異なり、キューのサイズは常にわかっており、 qsize()メソッドを呼び出すことで返すことができます。

このクラスはスレッドセーフではありません

maxsize

キューで許可されているアイテムの数。

empty()

キューが空の場合はTrueを返し、それ以外の場合はFalseを返します。

full()

キューに maxsize アイテムがある場合は、Trueを返します。

キューがmaxsize=0(デフォルト)で初期化された場合、 full()Trueを返しません。

get_nowait()

アイテムがすぐに利用できる場合はアイテムを返します。それ以外の場合は、 QueueEmpty を上げます。

put_nowait(item)

ブロックせずにアイテムをキューに入れます。

すぐに利用できる空きスロットがない場合は、 QueueFull を上げます。

qsize()

キュー内のアイテムの数を返します。

task_done()

以前にキューに入れられたタスクが完了したことを示します。

キューコンシューマーによって使用されます。 タスクのフェッチに使用されるget()ごとに、 task_done()への後続の呼び出しは、タスクの処理が完了したことをキューに通知します。

join()が現在ブロックしている場合、すべてのアイテムが処理されると再開されます(つまり、put()待ち行列)。

キューに配置されたアイテムよりも多く呼び出された場合、 ValueError を発生させます。


優先キュー

class asyncio.PriorityQueue

キューのバリアント。 優先順位の高い順にエントリを取得します(最初に低いものから)。

エントリは通常、(priority_number, data)の形式のタプルです。


LIFOキュー

class asyncio.LifoQueue
Queue のバリアントで、最後に追加されたエントリを最初に取得します(ラストイン、ファーストアウト)。


例外

exception asyncio.QueueEmpty
この例外は、 get_nowait()メソッドが空のキューで呼び出されたときに発生します。
exception asyncio.QueueFull
maxsize に達したキューで put_nowait()メソッドが呼び出されたときに例外が発生しました。


キューを使用して、複数の同時タスク間でワークロードを分散できます。

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())