Concurrency-in-python-pool-of-processes

提供:Dev Guides
移動先:案内検索

Pythonの同時実行-プロセスのプール

プロセスのプールは、スレッドのプールを作成して使用したのと同じ方法で作成して使用できます。 プロセスプールは、事前にインスタンス化されたプロセスとアイドルプロセスのグループとして定義できます。 多数のタスクを実行する必要がある場合、すべてのタスクの新しいプロセスをインスタンス化するよりもプロセスプールを作成することをお勧めします。

Pythonモジュール– Concurrent.futures

Python標準ライブラリには、 concurrent.futures というモジュールがあります。 このモジュールは、非同期タスクを起動するための高レベルインターフェイスを開発者に提供するために、Python 3.2で追加されました。 これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェースを提供するためのPythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。

以降のセクションでは、concurrent.futuresモジュールのさまざまなサブクラスを見ていきます。

エグゼキュータークラス

*Executor* は、 *concurrent.futures* Pythonモジュールの抽象クラスです。 直接使用することはできません。次の具体的なサブクラスのいずれかを使用する必要があります-
  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor –具体的なサブクラス

これは、Executorクラスの具象サブクラスの1つです。 マルチプロセッシングを使用し、タスクを送信するためのプロセスのプールを取得します。 このプールは、利用可能なプロセスにタスクを割り当て、実行するようにスケジュールします。

ProcessPoolExecutorを作成する方法は?

*concurrent.futures* モジュールとその具体的なサブクラス *Executor* を使用して、プロセスのプールを簡単に作成できます。 このために、プールに必要なプロセスの数で *ProcessPoolExecutor* を構築する必要があります。 デフォルトでは、数は5です。 その後、プロセスプールにタスクを送信します。

ここで、スレッドプールの作成時に使用したのと同じ例を検討します。唯一の違いは、 ThreadPoolExecutor の代わりに ProcessPoolExecutor を使用することです。

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

出力

False
False
Completed

上記の例では、Process * PoolExecutor は5つのスレッドで構成されています。 次に、メッセージを出す前に2秒間待機するタスクがプロセスプールエグゼキューターに送信されます。 出力からわかるように、タスクは2秒まで完了しないため、 done()の最初の呼び出しはFalseを返します。 2秒後、タスクは完了し、 result()*メソッドを呼び出して未来の結果を取得します。

ProcessPoolExecutorのインスタンス化–コンテキストマネージャー

ProcessPoolExecutorをインスタンス化する別の方法は、コンテキストマネージャーを使用することです。 上記の例で使用した方法と同様に機能します。 コンテキストマネージャを使用する主な利点は、構文的に見た目が良いことです。 インスタンス化は、次のコードの助けを借りて行うことができます-

with ProcessPoolExecutor(max_workers = 5) as executor

理解を深めるために、スレッドプールの作成時に使用したのと同じ例を取り上げます。 この例では、 concurrent.futures モジュールをインポートすることから始める必要があります。 次に、* load_url()という名前の関数が作成され、要求されたURLがロードされます。 次に、プール内の5つのスレッド数で *ProcessPoolExecutor が作成されます。 Process * PoolExecutor は、コンテキストマネージャーとして利用されています。 将来の結果を取得するには、 result()*メソッドを呼び出します。

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは次の出力を生成します-

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Executor.map()関数の使用

Pythonの* map()関数は、多くのタスクを実行するために広く使用されています。 そのようなタスクの1つは、特定の関数をiterables内のすべての要素に適用することです。 同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとして *ProcessPoolExecutor に送信できます。 これを理解するには、次のPythonスクリプトの例を検討してください。

  • Executor.map()関数を使用してスレッドプールを作成するときに使用したのと同じ例を検討します。 以下の例では、map関数を使用して、values配列のすべての値に square()*関数を適用しています。
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは、次の出力を生成します

4
9
16
25

ProcessPoolExecutorとThreadPoolExecutorを使用する場合

executorクラス(ThreadPoolExecutorとProcessPoolExecutor)の両方について学習したので、どのエクゼキューターをいつ使用するかを知る必要があります。 CPUバウンドワークロードの場合はProcessPoolExecutorを選択し、I/Oバウンドワークロードの場合はThreadPoolExecutorを選択する必要があります。

*ProcessPoolExecutor* を使用する場合、マルチプロセッシングを使用するため、GILを心配する必要はありません。 さらに、 *ThreadPoolExecution* と比較すると、実行時間は短くなります。 これを理解するには、次のPythonスクリプト例を検討してください。

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

出力

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

出力

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

上記の両方のプログラムの出力から、 ProcessPoolExecutorThreadPoolExecutor を使用しているときの実行時間の違いを確認できます。