Python3でThreadPoolExecutorを使用する方法

提供:Dev Guides
移動先:案内検索

著者はCOVID-19救済基金を選択し、 Write forDOnationsプログラムの一環として寄付を受け取りました。

序章

Python スレッド are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).

I/ Oバウンド操作には、Web要求の作成とファイルからのデータの読み取りが含まれます。 I / Oバウンド操作とは対照的に、 CPUバウンド操作(Python標準ライブラリで数学を実行するなど)は、Pythonスレッドの恩恵をあまり受けません。

Python 3には、スレッドでコードを実行するためのThreadPoolExecutorユーティリティが含まれています。

このチュートリアルでは、ThreadPoolExecutorを使用して、ネットワーク要求を適切に行います。 スレッド内での呼び出しに適した関数を定義し、ThreadPoolExecutorを使用してその関数を実行し、それらの実行の結果を処理します。

このチュートリアルでは、Wikipediaページの存在を確認するためにネットワークリクエストを行います。

注: I / Oバウンド操作がCPUバウンド操作よりもスレッドの恩恵を受けるという事実は、グローバルインタープリターロックと呼ばれるPythonの特異性が原因です。 必要に応じて、Pythonのグローバルインタープリターロックの詳細については、Pythonの公式ドキュメントを参照してください。


前提条件

このチュートリアルを最大限に活用するには、Pythonでのプログラミングと、requestsがインストールされたローカルPythonプログラミング環境にある程度精通していることをお勧めします。

必要な背景情報については、次のチュートリアルを確認できます。

pip install --user requests==2.23.0

ステップ1—スレッドで実行する関数を定義する

スレッドを使用して実行する関数を定義することから始めましょう。

nanoまたはお好みのテキストエディタ/開発環境を使用して、次のファイルを開くことができます。

nano wiki_page_function.py

このチュートリアルでは、ウィキペディアのページが存在するかどうかを判断する関数を記述します。

wiki_page_function.py

import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

get_wiki_page_existence 関数は、ウィキペディアページへのURL(wiki_page_url)とtimeoutからの応答を待機する秒数の2つの引数を受け入れます。そのURL。

get_wiki_page_existenceは、 requests パッケージを使用して、そのURLへのWebリクエストを作成します。 HTTP responseステータスコードに応じて、ページが存在するかどうかを説明する文字列が返されます。 さまざまなステータスコードは、HTTPリクエストのさまざまな結果を表します。 この手順では、200「成功」ステータスコードはウィキペディアページが存在することを意味し、404「見つかりません」ステータスコードはウィキペディアページが存在しないことを意味します。

前提条件のセクションで説明されているように、この機能を実行するには、requestsパッケージがインストールされている必要があります。

urlを追加して関数を実行し、get_wiki_page_existence関数の後に関数呼び出しを実行してみましょう。

wiki_page_function.py

. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

コードを追加したら、ファイルを保存して閉じます。

このコードを実行すると、次のようになります。

python wiki_page_function.py

次のような出力が表示されます。

Outputhttps://en.wikipedia.org/wiki/Ocean - exists

有効なウィキペディアページでget_wiki_page_existence関数を呼び出すと、ページが実際に存在することを確認する文字列が返されます。

警告:一般に、同時実行のバグを回避するために特別な注意を払わずに、Pythonオブジェクトまたは状態をスレッド間で共有することは安全ではありません。 スレッドで実行する関数を定義するときは、単一のジョブを実行し、状態を他のスレッドと共有または公開しない関数を定義するのが最適です。 get_wiki_page_existenceはそのような関数の例です。


ステップ2—ThreadPoolExecutorを使用してスレッドで関数を実行する

スレッドを使用した呼び出しに適した関数ができたので、ThreadPoolExecutorを使用して、その関数の複数の呼び出しを適切に実行できます。

次の強調表示されたコードをwiki_page_function.pyのプログラムに追加しましょう。

wiki_page_function.py

import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

このコードがどのように機能するかを見てみましょう。

  • concurrent.futuresは、ThreadPoolExecutorへのアクセスを提供するためにインポートされます。
  • withステートメントは、ThreadPoolExecutorインスタンスexecutorを作成するために使用され、完了時にスレッドを迅速にクリーンアップします。
  • submittedからexecutorまでの4つのジョブがあります。wiki_page_urlsリストの各URLに1つずつです。
  • submitを呼び出すたびに、futuresリストに格納されているFutureインスタンスが返されます。
  • as_completed関数は、各Future get_wiki_page_existence呼び出しが完了するのを待って、結果を出力できるようにします。

次のコマンドを使用して、このプログラムを再度実行すると、次のようになります。

python wiki_page_function.py

次のような出力が表示されます。

Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists

この出力は理にかなっています。3つのURLは有効なウィキペディアページであり、そのうちの1つthis_page_does_not_existはそうではありません。 出力の順序がこの出力と異なる場合があることに注意してください。 この例のconcurrent.futures.as_completed関数は、ジョブが送信された順序に関係なく、結果が利用可能になるとすぐに結果を返します。

ステップ3—スレッドで実行される関数からの例外の処理

前の手順で、get_wiki_page_existenceはすべての呼び出しの値を正常に返しました。 このステップでは、ThreadPoolExecutorがスレッド化された関数の呼び出しで生成された例外を発生させる可能性があることもわかります。

次のサンプルコードブロックを考えてみましょう。

wiki_page_function.py

import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

このコードブロックは、手順2で使用したものとほぼ同じですが、2つの重要な違いがあります。

  • ここで、timeout=0.00001get_wiki_page_existenceに渡します。 requestsパッケージは0.00001秒以内にウィキペディアへのWebリクエストを完了できないため、ConnectTimeout例外が発生します。
  • future.result()によって発生したConnectTimeout例外をキャッチし、そのたびに文字列を出力します。

プログラムを再度実行すると、次の出力が表示されます。

OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.

4つのConnectTimeoutメッセージが出力されます。0.00001秒と4つのget_wiki_page_existenceのそれぞれで完了できなかったため、4つのwiki_page_urlsのそれぞれに1つです。 ]呼び出しにより、ConnectTimeout例外が発生しました。

これで、ThreadPoolExecutorに送信された関数呼び出しで例外が発生した場合、Future.resultを呼び出すことでその例外を正常に発生させることができることがわかりました。 送信されたすべての呼び出しでFuture.resultを呼び出すと、プログラムはスレッド化された関数から発生した例外を見逃すことはありません。

ステップ4—スレッドがある場合とない場合の実行時間の比較

次に、ThreadPoolExecutorを使用すると実際にプログラムが高速になることを確認しましょう。

まず、スレッドなしで実行する場合は、get_wiki_page_existenceの時間を計りましょう。

wiki_page_function.py

import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

コード例では、50個の異なるウィキペディアページのURLを1つずつ使用してget_wiki_page_existence関数を呼び出します。 time.time()関数を使用して、プログラムの実行にかかる秒数を出力します。

以前と同じようにこのコードを再度実行すると、次のような出力が表示されます。

OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182

この出力のエントリ2〜47は、簡潔にするために省略されています。

Without threads timeの後に印刷される秒数は、マシンで実行すると異なります。これで、ThreadPoolExecutorを使用するソリューションと比較するためのベースライン数を取得できます。 この場合、~5.803秒でした。

同じ50のウィキペディアURLをget_wiki_page_existenceで実行しましょう。ただし、今回はThreadPoolExecutorを使用します。

wiki_page_function.py

import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

このコードは、ステップ2で作成したコードと同じですが、コードの実行にかかる秒数を示すいくつかのprintステートメントが追加されています。

プログラムを再度実行すると、次のように表示されます。

OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543

繰り返しになりますが、Threaded timeの後に印刷される秒数は、コンピューターによって異なります(出力の順序も異なります)。

スレッドがある場合とない場合で、50個のウィキペディアページのURLを取得するための実行時間を比較できるようになりました。

このチュートリアルで使用したマシンでは、スレッドなしで~5.803秒、スレッドありで~1.220秒かかりました。 私たちのプログラムは、スレッドを使用すると大幅に高速に実行されました。

結論

このチュートリアルでは、Python3でThreadPoolExecutorユーティリティを使用して、I/Oバウンドのコードを効率的に実行する方法を学習しました。 スレッド内での呼び出しに適した関数を作成し、その関数のスレッド実行から出力と例外の両方を取得する方法を学び、スレッドを使用することで得られるパフォーマンスの向上を観察しました。

ここから、concurrent.futuresモジュールによって提供される他の同時実行機能について詳しく知ることができます。