Concurrency-in-python-pool-of-threads

提供:Dev Guides
移動先:案内検索

Pythonの同時実行-スレッドのプール

マルチスレッドタスクのために多数のスレッドを作成する必要があるとします。 スレッドが多すぎるためにパフォーマンスの問題が多く発生する可能性があるため、計算コストが最も高くなります。 大きな問題は、スループットが制限されることです。 スレッドのプールを作成することにより、この問題を解決できます。 スレッドプールは、事前にインスタンス化され、アイドル状態のスレッドのグループとして定義できます。これらのスレッドは、作業を行う準備ができています。 スレッドプールの作成は、多数のタスクを実行する必要がある場合に、すべてのタスクの新しいスレッドをインスタンス化するよりも優先されます。 スレッドプールは、次のように多数のスレッドの同時実行を管理できます-

  • スレッドプール内のスレッドが実行を完了すると、そのスレッドを再利用できます。
  • スレッドが終了すると、別のスレッドが作成され、そのスレッドが置き換えられます。

Pythonモジュール– Concurrent.futures

Python標準ライブラリには、 concurrent.futures モジュールが含まれています。 このモジュールは、非同期タスクを起動するための高レベルインターフェイスを開発者に提供するために、Python 3.2で追加されました。 これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェースを提供するためのPythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。

以降のセクションでは、concurrent.futuresモジュールのさまざまなクラスについて学習します。

エグゼキュータークラス

  • Executor は、 *concurrent.futures Pythonモジュールの抽象クラスです。 直接使用することはできません。次の具体的なサブクラスのいずれかを使用する必要があります-
  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor –具体的なサブクラス

これは、Executorクラスの具象サブクラスの1つです。 サブクラスはマルチスレッドを使用し、タスクを送信するためのスレッドのプールを取得します。 このプールは、利用可能なスレッドにタスクを割り当て、実行するようにスケジュールします。

ThreadPoolExecutorを作成する方法は?

*concurrent.futures* モジュールとその具体的なサブクラス *Executor* の助けを借りて、スレッドのプールを簡単に作成できます。 このためには、プールに必要なスレッド数で *ThreadPoolExecutor* を構築する必要があります。 デフォルトでは、数は5です。 その後、タスクをスレッドプールに送信できます。 タスクを* submit()*すると、 *Future* が返されます。 Futureオブジェクトには、* done()*というメソッドがあり、futureが解決したかどうかを通知します。 これにより、その特定の将来のオブジェクトに値が設定されました。 タスクが完了すると、スレッドプールエグゼキューターは値をfutureオブジェクトに設定します。

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

出力

False
True
Completed

上記の例では、5つのスレッドで ThreadPoolExecutor が構築されています。 次に、メッセージを与える前に2秒間待機するタスクがスレッドプールエグゼキューターに送信されます。 出力からわかるように、タスクは2秒まで完了しないため、* done()の最初の呼び出しはFalseを返します。 2秒後、タスクは完了し、 result()*メソッドを呼び出して未来の結果を取得します。

ThreadPoolExecutorのインスタンス化–コンテキストマネージャー

*ThreadPoolExecutor* をインスタンス化する別の方法は、コンテキストマネージャーを使用することです。 上記の例で使用した方法と同様に機能します。 コンテキストマネージャを使用する主な利点は、構文的に見た目が良いことです。 インスタンス化は、次のコードの助けを借りて行うことができます-
with ThreadPoolExecutor(max_workers = 5) as executor

次の例は、Pythonドキュメントから引用したものです。 この例では、まず concurrent.futures モジュールをインポートする必要があります。 次に、* load_url()という名前の関数が作成され、要求されたURLがロードされます。 この関数は、プール内の5つのスレッドで *ThreadPoolExecutor を作成します。 ThreadPoolExecutor はコンテキストマネージャーとして利用されています。 将来の結果を取得するには、* result()*メソッドを呼び出します。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

出力

以下は、上記のPythonスクリプトの出力です。

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Executor.map()関数の使用

Pythonの* map()関数は、多くのタスクで広く使用されています。 そのようなタスクの1つは、特定の関数をiterables内のすべての要素に適用することです。 同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとして *ThreadPoolExecutor に送信できます。 関数の動作を理解するために、次のPythonスクリプトの例を検討してください。

以下のこの例では、map関数を使用して、values配列内のすべての値に* square()*関数を適用しています。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは次の出力を生成します-

4
9
16
25