Python3でThreadPoolExecutorを使用する方法
著者はCOVID-19救済基金を選択し、 Write forDOnationsプログラムの一環として寄付を受け取りました。
序章
Python スレッド are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).
例I/ Oバウンド操作には、Web要求の作成とファイルからのデータの読み取りが含まれます。 I / Oバウンド操作とは対照的に、 CPUバウンド操作(Python標準ライブラリで数学を実行するなど)は、Pythonスレッドの恩恵をあまり受けません。
Python 3には、スレッドでコードを実行するためのThreadPoolExecutor
ユーティリティが含まれています。
このチュートリアルでは、ThreadPoolExecutor
を使用して、ネットワーク要求を適切に行います。 スレッド内での呼び出しに適した関数を定義し、ThreadPoolExecutor
を使用してその関数を実行し、それらの実行の結果を処理します。
このチュートリアルでは、Wikipediaページの存在を確認するためにネットワークリクエストを行います。
注: I / Oバウンド操作がCPUバウンド操作よりもスレッドの恩恵を受けるという事実は、グローバルインタープリターロックと呼ばれるPythonの特異性が原因です。 必要に応じて、Pythonのグローバルインタープリターロックの詳細については、Pythonの公式ドキュメントを参照してください。
前提条件
このチュートリアルを最大限に活用するには、Pythonでのプログラミングと、requests
がインストールされたローカルPythonプログラミング環境にある程度精通していることをお勧めします。
必要な背景情報については、次のチュートリアルを確認できます。
- Python3でコーディングする方法
- Ubuntu18.04でPython3をインストールしてローカルプログラミング環境をセットアップする方法
requests
パッケージをローカルPythonプログラミング環境にインストールするには、次のコマンドを実行できます。
pip install --user requests==2.23.0
ステップ1—スレッドで実行する関数を定義する
スレッドを使用して実行する関数を定義することから始めましょう。
nano
またはお好みのテキストエディタ/開発環境を使用して、次のファイルを開くことができます。
nano wiki_page_function.py
このチュートリアルでは、ウィキペディアのページが存在するかどうかを判断する関数を記述します。
wiki_page_function.py
import requests def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status
get_wiki_page_existence
関数は、ウィキペディアページへのURL(wiki_page_url
)とtimeout
からの応答を待機する秒数の2つの引数を受け入れます。そのURL。
get_wiki_page_existence
は、 requests パッケージを使用して、そのURLへのWebリクエストを作成します。 HTTP response
のステータスコードに応じて、ページが存在するかどうかを説明する文字列が返されます。 さまざまなステータスコードは、HTTPリクエストのさまざまな結果を表します。 この手順では、200
「成功」ステータスコードはウィキペディアページが存在することを意味し、404
「見つかりません」ステータスコードはウィキペディアページが存在しないことを意味します。
前提条件のセクションで説明されているように、この機能を実行するには、requests
パッケージがインストールされている必要があります。
url
を追加して関数を実行し、get_wiki_page_existence
関数の後に関数呼び出しを実行してみましょう。
wiki_page_function.py
. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url))
コードを追加したら、ファイルを保存して閉じます。
このコードを実行すると、次のようになります。
python wiki_page_function.py
次のような出力が表示されます。
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
有効なウィキペディアページでget_wiki_page_existence
関数を呼び出すと、ページが実際に存在することを確認する文字列が返されます。
警告:一般に、同時実行のバグを回避するために特別な注意を払わずに、Pythonオブジェクトまたは状態をスレッド間で共有することは安全ではありません。 スレッドで実行する関数を定義するときは、単一のジョブを実行し、状態を他のスレッドと共有または公開しない関数を定義するのが最適です。 get_wiki_page_existence
はそのような関数の例です。
ステップ2—ThreadPoolExecutorを使用してスレッドで関数を実行する
スレッドを使用した呼び出しに適した関数ができたので、ThreadPoolExecutor
を使用して、その関数の複数の呼び出しを適切に実行できます。
次の強調表示されたコードをwiki_page_function.py
のプログラムに追加しましょう。
wiki_page_function.py
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result())
このコードがどのように機能するかを見てみましょう。
concurrent.futures
は、ThreadPoolExecutor
へのアクセスを提供するためにインポートされます。with
ステートメントは、ThreadPoolExecutor
インスタンスexecutor
を作成するために使用され、完了時にスレッドを迅速にクリーンアップします。submitted
からexecutor
までの4つのジョブがあります。wiki_page_urls
リストの各URLに1つずつです。submit
を呼び出すたびに、futures
リストに格納されているFutureインスタンスが返されます。as_completed
関数は、各Future
get_wiki_page_existence
呼び出しが完了するのを待って、結果を出力できるようにします。
次のコマンドを使用して、このプログラムを再度実行すると、次のようになります。
python wiki_page_function.py
次のような出力が表示されます。
Outputhttps://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists
この出力は理にかなっています。3つのURLは有効なウィキペディアページであり、そのうちの1つthis_page_does_not_exist
はそうではありません。 出力の順序がこの出力と異なる場合があることに注意してください。 この例のconcurrent.futures.as_completed
関数は、ジョブが送信された順序に関係なく、結果が利用可能になるとすぐに結果を返します。
ステップ3—スレッドで実行される関数からの例外の処理
前の手順で、get_wiki_page_existence
はすべての呼び出しの値を正常に返しました。 このステップでは、ThreadPoolExecutor
がスレッド化された関数の呼び出しで生成された例外を発生させる可能性があることもわかります。
次のサンプルコードブロックを考えてみましょう。
wiki_page_function.py
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append( executor.submit( get_wiki_page_existence, wiki_page_url=url, timeout=0.00001 ) ) for future in concurrent.futures.as_completed(futures): try: print(future.result()) except requests.ConnectTimeout: print("ConnectTimeout.")
このコードブロックは、手順2で使用したものとほぼ同じですが、2つの重要な違いがあります。
- ここで、
timeout=0.00001
をget_wiki_page_existence
に渡します。requests
パッケージは0.00001
秒以内にウィキペディアへのWebリクエストを完了できないため、ConnectTimeout
例外が発生します。 future.result()
によって発生したConnectTimeout
例外をキャッチし、そのたびに文字列を出力します。
プログラムを再度実行すると、次の出力が表示されます。
OutputConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.
4つのConnectTimeout
メッセージが出力されます。0.00001
秒と4つのget_wiki_page_existence
のそれぞれで完了できなかったため、4つのwiki_page_urls
のそれぞれに1つです。 ]呼び出しにより、ConnectTimeout
例外が発生しました。
これで、ThreadPoolExecutor
に送信された関数呼び出しで例外が発生した場合、Future.result
を呼び出すことでその例外を正常に発生させることができることがわかりました。 送信されたすべての呼び出しでFuture.result
を呼び出すと、プログラムはスレッド化された関数から発生した例外を見逃すことはありません。
ステップ4—スレッドがある場合とない場合の実行時間の比較
次に、ThreadPoolExecutor
を使用すると実際にプログラムが高速になることを確認しましょう。
まず、スレッドなしで実行する場合は、get_wiki_page_existence
の時間を計りましょう。
wiki_page_function.py
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls: print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start)
コード例では、50個の異なるウィキペディアページのURLを1つずつ使用してget_wiki_page_existence
関数を呼び出します。 time.time()関数を使用して、プログラムの実行にかかる秒数を出力します。
以前と同じようにこのコードを再度実行すると、次のような出力が表示されます。
OutputRunning without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182
この出力のエントリ2〜47は、簡潔にするために省略されています。
Without threads time
の後に印刷される秒数は、マシンで実行すると異なります。これで、ThreadPoolExecutor
を使用するソリューションと比較するためのベースライン数を取得できます。 この場合、~5.803
秒でした。
同じ50のウィキペディアURLをget_wiki_page_existence
で実行しましょう。ただし、今回はThreadPoolExecutor
を使用します。
wiki_page_function.py
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) print("Threaded time:", time.time() - threaded_start)
このコードは、ステップ2で作成したコードと同じですが、コードの実行にかかる秒数を示すいくつかのprintステートメントが追加されています。
プログラムを再度実行すると、次のように表示されます。
OutputRunning threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543
繰り返しになりますが、Threaded time
の後に印刷される秒数は、コンピューターによって異なります(出力の順序も異なります)。
スレッドがある場合とない場合で、50個のウィキペディアページのURLを取得するための実行時間を比較できるようになりました。
このチュートリアルで使用したマシンでは、スレッドなしで~5.803
秒、スレッドありで~1.220
秒かかりました。 私たちのプログラムは、スレッドを使用すると大幅に高速に実行されました。
結論
このチュートリアルでは、Python3でThreadPoolExecutor
ユーティリティを使用して、I/Oバウンドのコードを効率的に実行する方法を学習しました。 スレッド内での呼び出しに適した関数を作成し、その関数のスレッド実行から出力と例外の両方を取得する方法を学び、スレッドを使用することで得られるパフォーマンスの向上を観察しました。
ここから、concurrent.futuresモジュールによって提供される他の同時実行機能について詳しく知ることができます。