キュー
ソースコード: :source: `Lib / asyncio / queues.py`
asyncioキューは、 queue モジュールのクラスと同様に設計されています。 asyncioキューはスレッドセーフではありませんが、特にasync / awaitコードで使用するように設計されています。
asyncioキューのメソッドには、 timeout パラメーターがないことに注意してください。 asyncio.wait_for()
関数を使用して、タイムアウト付きのキュー操作を実行します。
以下の例セクションも参照してください。
列
- class asyncio.Queue(maxsize=0, \*, loop=None)
先入れ先出し(FIFO)キュー。
maxsize がゼロ以下の場合、キューサイズは無限大です。
0
より大きい整数の場合、キューが maxsize に達すると、get()
によってアイテムが削除されるまで、await put()
がブロックされます。標準ライブラリスレッドキューとは異なり、キューのサイズは常にわかっており、 qsize()メソッドを呼び出すことで返すことができます。
このクラスはスレッドセーフではありません。
- maxsize
キューで許可されているアイテムの数。
- empty()
キューが空の場合は
True
を返し、それ以外の場合はFalse
を返します。
- get_nowait()
アイテムがすぐに利用できる場合はアイテムを返します。それ以外の場合は、 QueueEmpty を上げます。
- put_nowait(item)
ブロックせずにアイテムをキューに入れます。
すぐに利用できる空きスロットがない場合は、 QueueFull を上げます。
- qsize()
キュー内のアイテムの数を返します。
- task_done()
以前にキューに入れられたタスクが完了したことを示します。
キューコンシューマーによって使用されます。 タスクのフェッチに使用される
get()
ごとに、 task_done()への後続の呼び出しは、タスクの処理が完了したことをキューに通知します。join()
が現在ブロックしている場合、すべてのアイテムが処理されると再開されます(つまり、put()
待ち行列)。キューに配置されたアイテムよりも多く呼び出された場合、 ValueError を発生させます。
優先キュー
- class asyncio.PriorityQueue
キューのバリアント。 優先順位の高い順にエントリを取得します(最初に低いものから)。
エントリは通常、
(priority_number, data)
の形式のタプルです。
例外
- exception asyncio.QueueEmpty
- この例外は、 get_nowait()メソッドが空のキューで呼び出されたときに発生します。
- exception asyncio.QueueFull
- maxsize に達したキューで put_nowait()メソッドが呼び出されたときに例外が発生しました。
例
キューを使用して、複数の同時タスク間でワークロードを分散できます。
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())