コンカレント.futures—並列タスクの起動—Pythonドキュメント

提供:Dev Guides
< PythonPython/docs/3.8/library/concurrent.futures
移動先:案内検索

concurrent.futures —並列タスクの起動

バージョン3.2の新機能。


ソースコード: :source: `Lib / concurrent / futures / thread.py` and :source: `Lib / concurrent / futures / process.py`



concurrent.futures モジュールは、呼び出し可能オブジェクトを非同期的に実行するための高レベルのインターフェースを提供します。

非同期実行は、 ThreadPoolExecutor を使用してスレッドで実行することも、 ProcessPoolExecutor を使用して個別のプロセスで実行することもできます。 どちらも、抽象 Executor クラスによって定義される同じインターフェイスを実装します。

エグゼキュータオブジェクト

class concurrent.futures.Executor

呼び出しを非同期で実行するメソッドを提供する抽象クラス。 直接使用するのではなく、具体的なサブクラスを介して使用する必要があります。

submit(fn, *args, **kwargs)

呼び出し可能オブジェクト fnfn(*args **kwargs)として実行するようにスケジュールし、呼び出し可能オブジェクトの実行を表す Future オブジェクトを返します。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

map(func、* iterables)と似ていますが、次の点が異なります。

  • iterables は、怠惰ではなくすぐに収集されます。

  • func は非同期で実行され、 func への複数の呼び出しが同時に行われる場合があります。

__ next __()が呼び出され、最初の呼び出しから timeout 秒後に結果が利用できない場合、返されるイテレータは concurrent.futures.TimeoutError を発生させます。 X193X] Executor.map()。 timeout は、intまたはfloatにすることができます。 timeout が指定されていない場合またはNoneの場合、待機時間に制限はありません。

func 呼び出しで例外が発生した場合、その値がイテレーターから取得されるときにその例外が発生します。

ProcessPoolExecutor を使用する場合、このメソッドは iterables をいくつかのチャンクに分割し、個別のタスクとしてプールに送信します。 これらのチャンクの(おおよその)サイズは、 chunksize を正の整数に設定することで指定できます。 非常に長い反復可能オブジェクトの場合、 chunksize に大きな値を使用すると、デフォルトサイズの1と比較してパフォーマンスを大幅に向上させることができます。 ThreadPoolExecutor では、 chunksize は効果がありません。

バージョン3.5で変更: chunksize 引数を追加しました。

shutdown(wait=True)

現在保留中の先物の実行が完了したときに、使用しているリソースを解放する必要があることをエグゼキュータに通知します。 シャットダウン後に Executor.submit()および Executor.map()を呼び出すと、 RuntimeError が発生します。

waitTrueの場合、このメソッドは、保留中のすべての先物の実行が完了し、エグゼキュータに関連付けられたリソースが解放されるまで戻りません。 waitFalseの場合、このメソッドはすぐに戻り、保留中のすべての先物の実行が完了すると、エグゼキュータに関連付けられたリソースが解放されます。 wait の値に関係なく、Pythonプログラム全体は、保留中のすべての先物の実行が完了するまで終了しません。

with ステートメントを使用すると、このメソッドを明示的に呼び出す必要がなくなり、 Executor がシャットダウンされます( Executor.shutdown()が呼び出されたかのように待機します)。 waitTrueに設定されている場合):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')



ThreadPoolExecutor

ThreadPoolExecutor は、スレッドのプールを使用して非同期で呼び出しを実行する Executor サブクラスです。

デッドロックは、 Future に関連付けられたcallableが別の Future の結果を待機しているときに発生する可能性があります。 例えば:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

と:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=, initializer=None, initargs=())

最大で max_workers スレッドのプールを使用して呼び出しを非同期に実行する Executor サブクラス。

initializer は、各ワーカースレッドの開始時に呼び出されるオプションの呼び出し可能オブジェクトです。 initargs は、イニシャライザーに渡される引数のタプルです。 initializer が例外を発生させた場合、現在保留中のすべてのジョブは BrokenThreadPool を発生させ、さらに多くのジョブをプールに送信しようとします。

バージョン3.5で変更: max_workersNoneであるか指定されていない場合、デフォルトでマシン上のプロセッサ数に5を掛けたものになります。 、CPU作業の代わりに ThreadPoolExecutor がI / Oのオーバーラップによく使用され、ワーカーの数が ProcessPoolExecutor のワーカーの数よりも多いと仮定します。

バージョン3.6の新機能: thread_name_prefix 引数が追加され、ユーザーがプールによって作成されたワーカースレッドの threading.Thread 名を制御してデバッグを容易にできるようになりました。

バージョン3.7で変更: initializer および initargs 引数が追加されました。

バージョン3.8で変更: max_workers のデフォルト値がmin(32, os.cpu_count() + 4)に変更されました。 このデフォルト値は、I / Oバウンドタスク用に少なくとも5つのワーカーを保持します。 GILを解放するCPUバウンドタスクに最大32個のCPUコアを利用します。 また、メニーコアマシンで暗黙的に非常に大きなリソースを使用することを回避します。

ThreadPoolExecutorは、 max_workers ワーカースレッドも開始する前に、アイドル状態のワーカースレッドを再利用するようになりました。

ThreadPoolExecutorの例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor クラスは、プロセスのプールを使用して非同期で呼び出しを実行する Executor サブクラスです。 ProcessPoolExecutor は、 multiprocessing モジュールを使用します。これにより、 Global Interpreter Lock を回避できますが、実行して返すことができるのは、選択可能なオブジェクトのみです。

__main__モジュールは、ワーカーサブプロセスによってインポート可能である必要があります。 これは、 ProcessPoolExecutor がインタラクティブインタープリターで機能しないことを意味します。

ProcessPoolExecutor に送信された呼び出し可能オブジェクトから Executor または Future メソッドを呼び出すと、デッドロックが発生します。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

最大で max_workers プロセスのプールを使用して非同期で呼び出しを実行する Executor サブクラス。 max_workersNoneであるか指定されていない場合、デフォルトでマシン上のプロセッサの数になります。 max_workers0以下の場合、 ValueError が発生します。 Windowsでは、 max_workers61以下である必要があります。 そうでない場合は、 ValueError が発生します。 max_workersNoneの場合、より多くのプロセッサが使用可能であっても、選択されるデフォルトは最大で61になります。 mp_context は、マルチプロセッシングコンテキストまたはNoneにすることができます。 ワーカーを起動するために使用されます。 mp_contextNoneの場合、または指定されていない場合は、デフォルトのマルチプロセッシングコンテキストが使用されます。

initializer は、各ワーカープロセスの開始時に呼び出されるオプションの呼び出し可能オブジェクトです。 initargs は、イニシャライザーに渡される引数のタプルです。 initializer で例外が発生した場合、現在保留中のすべてのジョブで BrokenProcessPool が発生し、さらに多くのジョブをプールに送信しようとします。

バージョン3.3で変更:ワーカープロセスの1つが突然終了すると、BrokenProcessPoolエラーが発生するようになりました。 以前は、動作は定義されていませんでしたが、エグゼキュータまたはその先物に対する操作は、フリーズまたはデッドロックすることがよくありました。

バージョン3.7で変更: mp_context 引数が追加され、ユーザーがプールによって作成されたワーカープロセスのstart_methodを制御できるようになりました。

initializer および initargs 引数を追加しました。

ProcessPoolExecutorの例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

将来のオブジェクト

Future クラスは、呼び出し可能オブジェクトの非同期実行をカプセル化します。 Future インスタンスは、 Executor.submit()によって作成されます。

class concurrent.futures.Future

呼び出し可能オブジェクトの非同期実行をカプセル化します。 Future インスタンスは Executor.submit()によって作成され、テスト以外は直接作成しないでください。

cancel()

通話のキャンセルを試みます。 呼び出しが現在実行中または実行を終了し、キャンセルできない場合、メソッドはFalseを返します。それ以外の場合、呼び出しはキャンセルされ、メソッドはTrueを返します。

cancelled()

通話が正常にキャンセルされた場合は、Trueを返します。

running()

呼び出しが現在実行中であり、キャンセルできない場合は、Trueを返します。

done()

通話が正常にキャンセルされた場合、または実行が終了した場合は、Trueを返します。

result(timeout=None)

呼び出しによって返された値を返します。 呼び出しがまだ完了していない場合、このメソッドは最大 timeout 秒待機します。 timeout 秒以内に呼び出しが完了しなかった場合、 concurrent.futures.TimeoutError が発生します。 timeout はintまたはfloatにすることができます。 timeout が指定されていない場合またはNoneの場合、待機時間に制限はありません。

完了する前にfutureがキャンセルされると、 CancelledError が発生します。

呼び出しが発生した場合、このメソッドは同じ例外を発生させます。

exception(timeout=None)

呼び出しによって発生した例外を返します。 呼び出しがまだ完了していない場合、このメソッドは最大 timeout 秒待機します。 timeout 秒以内に呼び出しが完了しなかった場合、 concurrent.futures.TimeoutError が発生します。 timeout はintまたはfloatにすることができます。 timeout が指定されていない場合またはNoneの場合、待機時間に制限はありません。

完了する前にfutureがキャンセルされると、 CancelledError が発生します。

呼び出しが発生せずに完了した場合、Noneが返されます。

add_done_callback(fn)

呼び出し可能 fn をfutureにアタッチします。 fn は、futureがキャンセルされるか実行が終了すると、futureを唯一の引数として呼び出されます。

追加された呼び出し可能オブジェクトは、追加された順序で呼び出され、それらを追加したプロセスに属するスレッドで常に呼び出されます。 呼び出し可能オブジェクトが Exception サブクラスを発生させた場合、それはログに記録され、無視されます。 呼び出し可能オブジェクトが BaseException サブクラスを発生させた場合、動作は未定義です。

フューチャーがすでに完了またはキャンセルされている場合は、 fn がすぐに呼び出されます。


次の Future メソッドは、単体テストおよび Executor 実装での使用を目的としています。

set_running_or_notify_cancel()

このメソッドは、 Future に関連付けられた作業を実行する前に、 Executor 実装によって、および単体テストによってのみ呼び出す必要があります。

メソッドがFalseを返す場合、 Future はキャンセルされました。 Future.cancel()が呼び出され、 True が返されました。 Future の完了を待機しているスレッド(つまり、 as_completed()または wait())を介してウェイクアップされます。

メソッドがTrueを返す場合、 Future はキャンセルされず、実行状態になっています。 Future.running()を呼び出すと、 True が返されます。

このメソッドは1回だけ呼び出すことができ、 Future.set_result()または Future.set_exception()が呼び出された後に呼び出すことはできません。

set_result(result)

Future に関連付けられている作業の結果を result に設定します。

このメソッドは、 Executor の実装と単体テストでのみ使用する必要があります。

バージョン3.8で変更: Future がすでに実行されている場合、このメソッドは concurrent.futures.InvalidStateError を発生させます。

set_exception(exception)

Future に関連付けられた作業の結果を Exception exception に設定します。

このメソッドは、 Executor の実装と単体テストでのみ使用する必要があります。

バージョン3.8で変更: Future がすでに実行されている場合、このメソッドは concurrent.futures.InvalidStateError を発生させます。



モジュール機能

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs によって指定された Future インスタンス(おそらく異なる Executor インスタンスによって作成された)が完了するのを待ちます。 名前付きの2タプルのセットを返します。 doneという名前の最初のセットには、待機が完了する前に完了した先物(完了またはキャンセルされた先物)が含まれています。 not_doneという名前の2番目のセットには、完了しなかった先物(保留中または実行中の先物)が含まれています。

timeout を使用して、戻るまで待機する最大秒数を制御できます。 timeout はintまたはfloatにすることができます。 timeout が指定されていない場合またはNoneの場合、待機時間に制限はありません。

return_when は、この関数がいつ戻るべきかを示します。 次の定数のいずれかである必要があります。

絶え間ない

説明

FIRST_COMPLETED

この関数は、将来終了するかキャンセルされると戻ります。

FIRST_EXCEPTION

この関数は、例外を発生させて将来が終了すると戻ります。 将来的に例外が発生しない場合は、ALL_COMPLETEDと同等です。

ALL_COMPLETED

この関数は、すべての先物が終了するかキャンセルされると戻ります。

concurrent.futures.as_completed(fs, timeout=None)
fs によって指定された Future インスタンス(異なる Executor インスタンスによって作成された可能性があります)に対するイテレーターを返します。 fs によって指定された重複する先物は、1回返されます。 as_completed()が呼び出される前に完了した先物は、最初に生成されます。 __ next __()が呼び出され、元の呼び出しから timeout 秒後に結果が利用できない場合、返されたイテレータは concurrent.futures.TimeoutError を発生させます。 X193X] as_completed()。 timeout はintまたはfloatにすることができます。 timeout が指定されていない場合またはNoneの場合、待機時間に制限はありません。

も参照してください

PEP 3148 –先物–非同期で計算を実行します
Python標準ライブラリに含めるためにこの機能を説明した提案。


例外クラス

exception concurrent.futures.CancelledError
フューチャーがキャンセルされたときに発生します。
exception concurrent.futures.TimeoutError
将来の操作が指定されたタイムアウトを超えたときに発生します。
exception concurrent.futures.BrokenExecutor

RuntimeError から派生したこの例外クラスは、何らかの理由でエグゼキュータが壊れた場合に発生し、新しいタスクの送信や実行には使用できません。

バージョン3.7の新機能。

exception concurrent.futures.InvalidStateError

現在の状態では許可されていない将来の操作が実行されたときに発生します。

バージョン3.8の新機能。

exception concurrent.futures.thread.BrokenThreadPool

BrokenExecutor から派生したこの例外クラスは、ThreadPoolExecutorのワーカーの1つが初期化に失敗したときに発生します。

バージョン3.7の新機能。

exception concurrent.futures.process.BrokenProcessPool

BrokenExecutor (以前の RuntimeError )から派生したこの例外クラスは、ProcessPoolExecutorのワーカーの1つがクリーンでない方法で終了した場合に発生します(たとえば、それが外部から殺された場合)。

バージョン3.3の新機能。