Concurrency-in-python-quick-guide

提供:Dev Guides
移動先:案内検索

Pythonの同時実行性-はじめに

この章では、Pythonの並行性の概念を理解し、さまざまなスレッドとプロセスについて学習します。

並行性とは何ですか?

簡単に言えば、並行性とは、同時に2つ以上のイベントが発生することです。 多くのイベントが同時に発生するため、同時実行は自然現象です。

プログラミングに関して言えば、並行性とは、実行時に2つのタスクが重複することです。 並行プログラミングを使用すると、以前の要求が完了するのを待たずに要求を同時に処理できるため、アプリケーションとソフトウェアシステムのパフォーマンスを向上させることができます。

同時実行の履歴レビュー

次のポイントは、並行性の簡単な歴史的レビューを提供します-

鉄道の概念から

並行性は、鉄道の概念と密接に関連しています。 鉄道では、すべての列車が目的地に安全に到着するように、同じ鉄道システムで複数の列車を処理する必要がありました。

学界でのコンカレントコンピューティング

コンピューターサイエンスの並行性への関心は、エドガーWが発行した研究論文から始まりました 1965年のダイクストラ。 この論文では、同時実行制御の特性である相互排除の問題を特定し、解決しました。

高レベルの並行処理プリミティブ

最近では、プログラマーは、高レベルの並行処理プリミティブの導入により、並行ソリューションを改善しています。

プログラミング言語との並行性の改善

GoogleのGolang、Rust、Pythonなどのプログラミング言語は、より優れた並行ソリューションを実現するのに役立つ分野で驚くべき開発を行ってきました。

スレッドとマルチスレッドとは何ですか?

  • スレッド*は、オペレーティングシステムで実行できる実行の最小単位です。 それ自体はプログラムではなく、プログラム内で実行されます。 つまり、スレッドは互いに独立していません。 各スレッドはコードセクション、データセクションなどを共有します。 他のスレッドと。 軽量プロセスとも呼ばれます。

スレッドは、次のコンポーネントで構成されています-

  • 次の実行可能命令のアドレスで構成されるプログラムカウンター
  • スタック
  • レジスタのセット
  • 一意のID

一方、*マルチスレッド*は、複数のスレッドを同時に実行することでオペレーティングシステムの使用を管理するCPUの機能です。 マルチスレッドの主なアイデアは、プロセスを複数のスレッドに分割することで並列処理を実現することです。 マルチスレッドの概念は、次の例を使用して理解できます。

MS Wordを開いてコンテンツを入力する特定のプロセスを実行しているとします。 MS Wordを開くために1つのスレッドが割り当てられ、その中にコンテンツを入力するために別のスレッドが必要になります。 そして今、既存のものを編集したい場合、編集タスクなどを行うために別のスレッドが必要になります。

プロセスとマルチプロセッシングとは何ですか?

A プロセスは、システムに実装される基本的な作業単位を表すエンティティとして定義されます。 簡単に言えば、コンピュータープログラムをテキストファイルに記述し、このプログラムを実行すると、プログラムで言及されているすべてのタスクを実行するプロセスになります。 プロセスのライフサイクル中に、開始、準備完了、実行中、待機、終了のさまざまな段階を通過します。

次の図は、プロセスのさまざまな段階を示しています-

マルチプロセッシング

プロセスには、プライマリスレッドと呼ばれる1つのスレッドのみ、または独自のレジスタセット、プログラムカウンター、スタックを持つ複数のスレッドを含めることができます。 次の図は、違いを示しています-

Multiprocessing One

一方、*マルチプロセッシング*は、単一のコンピューターシステム内で2つ以上のCPUユニットを使用することです。 私たちの主な目標は、ハードウェアから最大限の可能性を引き出すことです。 これを実現するには、コンピューターシステムで利用可能なCPUコアをすべて利用する必要があります。 そのためには、マルチプロセッシングが最適なアプローチです。

Multiprocessing Two

Pythonは最も人気のあるプログラミング言語の1つです。 以下は、同時アプリケーションに適したいくつかの理由です-

構文糖

構文シュガーは、物を読みやすくしたり表現したりしやすくするように設計されたプログラミング言語内の構文です。 これにより、人間が使用する言語が「甘く」なります。物事をより明確に、より簡潔に、または好みに基づいた代替スタイルで表現できます。 Pythonには、オブジェクトに作用するように定義できるMagicメソッドが付属しています。 これらのMagicメソッドは構文糖として使用され、よりわかりやすいキーワードにバインドされています。

大規模なコミュニティ

Python言語は、AI、機械学習、深層学習、および定量分析の分野で働いており、データサイエンティストや数学者の間で大規模な採用率を示しています。

並行プログラミングに役立つAPI

Python 2および3には、並列/並行プログラミング専用のAPIが多数あります。 それらの中で最も人気があるのは、*スレッド、concurrent.features、マルチプロセッシング、asyncio、geventおよびgreenlets *などです。

並行アプリケーションの実装におけるPythonの制限

Pythonには、並行アプリケーションに対する制限があります。 この制限は* GIL(Global Interpreter Lock)*と呼ばれ、Python内に存在します。 GILでは、CPUの複数のコアを使用することはできません。したがって、Pythonには真のスレッドは存在しないと言えます。 私たちは次のようにGILの概念を理解することができます-

GIL(グローバルインタープリターロック)

Pythonの世界で最も物議を醸すトピックの1つです。 CPythonでは、GILは相互排他ロックです。これは、物事をスレッドセーフにします。 言い換えれば、GILは、複数のスレッドがPythonコードを並行して実行するのを防ぐと言えます。 ロックは一度に1つのスレッドのみが保持でき、スレッドを実行する場合は、最初にロックを取得する必要があります。 以下に示す図は、GILの動作を理解するのに役立ちます。

制限

ただし、Pythonには Numpy、JpythonIronPytbhon などのライブラリと実装があります。これらのライブラリはGILとの対話なしで機能します。

並行性と並列性

マルチスレッドプログラムに関しては、並行性と並列性の両方が使用されますが、それらの類似点と相違点については多くの混乱があります。 これに関する大きな問題は、並行性の並列性かどうかです。 両方の用語は非常に似ているように見えますが、上記の質問に対する答えは「いいえ」ですが、並行性と並列性は同じではありません。 今、それらが同じでない場合、それらの間の基本的な違いは何ですか?

簡単に言えば、並行性は異なるスレッドからの共有状態へのアクセスの管理を扱い、反対側は並列処理は複数のCPUまたはそのコアを利用してハードウェアのパフォーマンスを改善することを扱います。

並行性の詳細

同時実行とは、実行時に2つのタスクが重複する場合です。 アプリケーションが複数のタスクで同時に進行している場合があります。 図式的に理解できます。次のように、複数のタスクが同時に進行しています-

同時実行性

並行性のレベル

このセクションでは、プログラミングの観点から並行性の3つの重要なレベルについて説明します-

低レベルの並行性

このレベルの並行性では、アトミック操作の明示的な使用があります。 非常にエラーが発生しやすく、デバッグが難しいため、このような種類の同時実行をアプリケーションの構築に使用することはできません。 Pythonでさえ、このような並行性をサポートしていません。

中レベルの並行性

この並行性では、明示的なアトミック操作の使用はありません。 明示的なロックを使用します。 Pythonおよび他のプログラミング言語は、このような並行性をサポートしています。 ほとんどの場合、アプリケーションプログラマはこの並行性を使用します。

高レベルの並行性

この並行性では、明示的なアトミック操作も明示的なロックも使用されません。 Pythonには、このような種類の同時実行性をサポートする concurrent.futures モジュールがあります。

並行システムのプロパティ

プログラムまたは並行システムが正しいためには、いくつかのプロパティがそれによって満たされる必要があります。 システムの終了に関連するプロパティは次のとおりです-

正確性プロパティ

正しさプロパティは、プログラムまたはシステムが目的の正解を提供する必要があることを意味します。 単純にするために、システムは開始プログラムの状態を最終状態に正しくマッピングする必要があると言えます。

安全性

安全プロパティとは、プログラムまたはシステムが*「良い」または「安全な」状態のままでなければならず、「悪い」*を実行しないことを意味します。

活力プロパティ

このプロパティは、プログラムまたはシステムが「進行」する必要があり、望ましい状態に達することを意味します。

並行システムのアクター

これは、複数のプロセスとスレッドが同時に実行できる並行システムの一般的なプロパティの1つであり、それらは同時に実行されて、独自のタスクを進行させます。 これらのプロセスとスレッドは、並行システムのアクターと呼ばれます。

並行システムのリソース

アクターは、メモリ、ディスク、プリンターなどのリソースを利用する必要があります。 タスクを実行するため。

特定のルールセット

すべての並行システムには、アクターによって実行されるタスクの種類とそれぞれのタイミングを定義する一連のルールが必要です。 タスクには、ロックの取得、メモリ共有、状態の変更などがあります。

並行システムの障壁

データの共有

並行システムを実装する際の重要な問題は、複数のスレッドまたはプロセス間でデータを共有することです。 実際、プログラマーはロックが共有データを保護し、ロックへのすべてのアクセスがシリアル化され、一度に1つのスレッドまたはプロセスのみが共有データにアクセスできるようにする必要があります。 複数のスレッドまたはプロセスがすべて同じ共有データにアクセスしようとすると、少なくとも1つではなくすべてがブロックされ、アイドル状態のままになります。 つまり、ロックが有効になっているときに一度に使用できるプロセスまたはスレッドは1つだけだと言えます。 上記の障壁を取り除くためのいくつかの簡単な解決策があります-

データ共有の制限

最も簡単な解決策は、可変データを共有しないことです。 この場合、明示的なロックを使用する必要はなく、相互データによる並行性の障壁は解決されます。

データ構造支援

多くの場合、並行プロセスは同じデータに同時にアクセスする必要があります。 明示的なロックの使用よりも別の解決策は、同時アクセスをサポートするデータ構造を使用することです。 たとえば、スレッドセーフキューを提供する queue モジュールを使用できます。 マルチプロセッシングベースの同時実行性のために multiprocessing.JoinableQueue クラスを使用することもできます。

不変のデータ転送

並行キューなど、使用しているデータ構造が適切でない場合は、ロックせずに不変データを渡すことができます。

可変データ転送

上記のソリューションの続きとして、不変データではなく可変データのみを渡す必要がある場合、読み取り専用の可変データを渡すことができると仮定します。

I/Oリソースの共有

並行システムを実装する際のもう1つの重要な問題は、スレッドまたはプロセスによるI/Oリソースの使用です。 この問題は、1つのスレッドまたはプロセスがこのような長時間I/Oを使用しており、他のスレッドまたはプロセスがアイドル状態になっている場合に発生します。 I/Oの重いアプリケーションで作業しているときに、このような障壁を確認できます。 これは、Webブラウザーからページを要求する例の助けを借りて理解できます。 重いアプリケーションです。 ここで、データが要求される速度が消費される速度よりも遅い場合、並行システムにI/Oバリアがあります。

次のPythonスクリプトは、Webページを要求し、ネットワークが要求されたページを取得するのにかかった時間を取得するためのものです-

import urllib.request

import time

ts = time.time()

req = urllib.request.urlopen('http://www.finddevguides.com')

pageHtml = req.read()

te = time.time()

print("Page Fetching Time : {} Seconds".format (te-ts))

上記のスクリプトを実行すると、次のようにページ取得時間を取得できます。

出力

Page Fetching Time: 1.0991398811340332 Seconds

ページを取得するのに1秒以上かかることがわかります。 数千の異なるWebページを取得したい場合、ネットワークにどれだけ時間がかかるかを理解できます。

並列処理とは何ですか?

並列処理は、タスクを同時に処理できるサブタスクに分割する技術として定義できます。 上記で説明したように、2つ以上のイベントが同時に発生する並行性とは反対です。 図式的に理解できます。タスクは、次のように、並行して処理できるいくつかのサブタスクに分割されます-

平行度

並行性と並列性の違いについてより多くのアイデアを得るには、次の点を考慮してください-

同時だが並列ではない

アプリケーションは並行することができますが、並行ではない場合、複数のタスクを同時に処理しますが、タスクはサブタスクに分割されません。

並列だが同時ではない

アプリケーションは並列にできますが、同時ではありません。つまり、一度に1つのタスクでのみ動作し、サブタスクに分割されたタスクは並列に処理できます。

並列でも同時でもない

アプリケーションは、並列にも並行にもできません。 つまり、一度に1つのタスクのみで機能し、タスクがサブタスクに分割されることはありません。

並列と同時の両方

アプリケーションは、並行と並行の両方になります。つまり、同時に複数のタスクで動作し、タスクをサブタスクに分割して、それらを並行して実行します。

並列処理の必要性

サブタスクを単一のCPUの異なるコア間、またはネットワーク内で接続された複数のコンピューター間で分散することにより、並列処理を実現できます。

並列性を実現する必要がある理由を理解するには、次の重要な点を考慮してください-

効率的なコード実行

並列処理の助けを借りて、コードを効率的に実行できます。 パーツ内の同じコードが並行して実行されるため、時間を節約できます。

シーケンシャルコンピューティングよりも高速

シーケンシャルコンピューティングは物理的および実用的な要因によって制約されているため、高速なコンピューティング結果を得ることができません。 一方、この問題は並列計算によって解決され、順次計算よりも高速な計算結果が得られます。

実行時間の短縮

並列処理により、プログラムコードの実行時間が短縮されます。

実際の並列処理の例について話すと、コンピューターのグラフィックスカードは、独立して動作し同時に実行できる数百の処理コアを備えているため、並列処理の真の力を強調する例です。 このため、ハイエンドのアプリケーションやゲームも実行できます。

実装するプロセッサの理解

並行性、並列性、およびそれらの違いについては知っていますが、それが実装されるシステムについてはどうでしょうか。 ソフトウェアの設計中に十分な情報に基づいて意思決定を行うことが有益であるため、実装するシステムを理解する必要があります。 次の2種類のプロセッサがあります-

シングルコアプロセッサ

シングルコアプロセッサは、常に1つのスレッドを実行できます。 これらのプロセッサは、*コンテキストスイッチング*を使用して、特定の時間にスレッドに必要なすべての情報を保存し、後で情報を復元します。 コンテキストスイッチングメカニズムは、1秒以内に多数のスレッドを進行させるのに役立ち、システムが複数のことを処理しているように見えます。

シングルコアプロセッサには多くの利点があります。 これらのプロセッサは必要な電力が少なく、複数のコア間に複雑な通信プロトコルはありません。 一方、シングルコアプロセッサの速度は制限されており、大規模なアプリケーションには適していません。

マルチコアプロセッサ

マルチコアプロセッサには、*コア*とも呼ばれる複数の独立した処理ユニットがあります。

各コアには、格納された一連の命令を実行するために必要なものがすべて含まれているため、このようなプロセッサにはコンテキスト切り替えメカニズムは必要ありません。

フェッチ-デコード-実行サイクル

マルチコアプロセッサのコアは、実行サイクルに従います。 このサイクルは Fetch-Decode-Execute サイクルと呼ばれます。 それには、次の手順が含まれます-

フェッチ

これはサイクルの最初のステップで、プログラムメモリからの命令のフェッチが含まれます。

デコード

最近フェッチされた命令は、CPUの他の部分をトリガーする一連の信号に変換されます。

実行する

これは、フェッチおよびデコードされた命令が実行される最終ステップです。 実行結果はCPUレジスタに保存されます。

ここでの利点の1つは、マルチコアプロセッサでの実行がシングルコアプロセッサの実行よりも速いことです。 大規模なアプリケーションに適しています。 一方、複数のコア間の複雑な通信プロトコルは問題です。 マルチコアには、シングルコアプロセッサよりも多くの電力が必要です。

システムとメモリのアーキテクチャ

プログラムまたはコンカレントシステムを設計する際には、さまざまなシステムおよびメモリアーキテクチャスタイルを考慮する必要があります。 1つのシステムとメモリスタイルは1つのタスクに適しているが、他のタスクにエラーが発生しやすいため、非常に必要です。

並行性をサポートするコンピューターシステムアーキテクチャ

マイケルフリンは1972年に、さまざまなスタイルのコンピューターシステムアーキテクチャを分類するための分類法を提供しました。 この分類法は、次の4つの異なるスタイルを定義します-

  • 単一命令ストリーム、単一データストリーム(SISD)
  • 単一命令ストリーム、複数データストリーム(SIMD)
  • 複数の命令ストリーム、単一のデータストリーム(MISD)
  • 複数の命令ストリーム、複数のデータストリーム(MIMD)。

単一命令ストリーム、単一データストリーム(SISD)

名前が示すように、このような種類のシステムには、1つの連続する着信データストリームと、データストリームを実行するための単一の処理ユニットがあります。 それらは、並列コンピューティングアーキテクチャを備えたユニプロセッサシステムのようなものです。 以下は、SISDのアーキテクチャです-

SSID

SISDの利点

SISDアーキテクチャの利点は次のとおりです-

  • 必要な電力が少なくなります。
  • 複数のコア間の複雑な通信プロトコルの問題はありません。

SISDの欠点

SISDアーキテクチャの欠点は次のとおりです-

  • SISDアーキテクチャの速度は、シングルコアプロセッサと同様に制限されています。
  • 大規模なアプリケーションには適していません。

単一命令ストリーム、複数データストリーム(SIMD)

名前が示すように、そのような種類のシステムには、複数の受信データストリームと任意の時点で単一の命令で動作できる処理ユニットの数があります。 それらは、並列コンピューティングアーキテクチャを備えたマルチプロセッサシステムのようなものです。 以下はSIMDのアーキテクチャです-

simd

SIMDの最良の例は、グラフィックカードです。 これらのカードには、何百もの個別の処理ユニットがあります。 SISDとSIMDの計算の違いについて説明する場合、追加配列 [5、15、20] および* [15、25、10]、*に対して、SISDアーキテクチャは3つの異なる追加操作を実行する必要があります。 一方、SIMDアーキテクチャでは、1回の追加操作で追加できます。

SIMDの利点

SIMDアーキテクチャの利点は次のとおりです-

  • 1つの命令のみを使用して、複数の要素で同じ操作を実行できます。
  • システムのスループットは、プロセッサのコアの数を増やすことで増やすことができます。
  • 処理速度は、SISDアーキテクチャよりも高速です。

SIMDの欠点

SIMDアーキテクチャの欠点は次のとおりです-

  • プロセッサのコアの数の間には複雑な通信があります。
  • コストはSISDアーキテクチャよりも高くなります。

複数命令単一データ(MISD)ストリーム

MISDストリームを使用するシステムには、同じデータセットで異なる命令を実行することにより、異なる操作を実行する多数の処理ユニットがあります。 以下はMISDのアーキテクチャです-

MISD

MISDアーキテクチャの代表はまだ商業的に存在しません。

複数命令複数データ(MIMD)ストリーム

MIMDアーキテクチャを使用するシステムでは、マルチプロセッサシステムの各プロセッサは、異なるデータセットのセットに対して独立して、異なる命令セットを並列に実行できます。 これは、単一の操作が複数のデータセットで実行されるSIMDアーキテクチャとは反対です。 以下はMIMDのアーキテクチャです-

MIMD

通常のマルチプロセッサはMIMDアーキテクチャを使用します。 これらのアーキテクチャは、基本的に、コンピューター支援設計/コンピューター支援製造、シミュレーション、モデリング、通信スイッチなどの多くのアプリケーション分野で使用されます。

同時実行性をサポートするメモリアーキテクチャ

並行性や並列処理などの概念を使用しながら、プログラムを高速化する必要が常にあります。 コンピューター設計者が見つけた解決策の1つは、共有メモリマルチコンピューター、つまり単一の物理アドレス空間を持つコンピューターを作成することです。 このシナリオでは、さまざまなスタイルのアーキテクチャが存在する可能性がありますが、次の3つの重要なアーキテクチャスタイルがあります-

UMA(Uniform Memory Access)

このモデルでは、すべてのプロセッサが物理メモリを均一に共有します。 すべてのプロセッサは、すべてのメモリワードへのアクセス時間が等しくなっています。 各プロセッサにはプライベートキャッシュメモリがあります。 周辺機器は一連のルールに従います。

すべてのプロセッサがすべての周辺機器に同等にアクセスできる場合、システムは*対称型マルチプロセッサ*と呼ばれます。 周辺機器にアクセスできるプロセッサが1つまたは少数の場合、システムは「非対称マルチプロセッサ」と呼ばれます。

UMA

不均一メモリアクセス(NUMA)

NUMAマルチプロセッサモデルでは、アクセス時間はメモリワードの場所によって異なります。 ここでは、共有メモリはローカルメモリと呼ばれるすべてのプロセッサに物理的に分散されています。 すべてのローカルメモリのコレクションは、すべてのプロセッサがアクセスできるグローバルアドレス空間を形成します。

NUMA

キャッシュオンリーメモリアーキテクチャ(COMA)

COMAモデルは、NUMAモデルの特殊バージョンです。 ここでは、すべての分散メインメモリがキャッシュメモリに変換されます。

coma Concurrency-in-python-threads

スレッドの実装

この章では、Pythonでスレッドを実装する方法を学びます。

スレッド実装用のPythonモジュール

Pythonスレッドは、プロセスよりもはるかに少ないメモリを占有するため、軽量プロセスと呼ばれることもあります。 スレッドを使用すると、複数のタスクを一度に実行できます。 Pythonでは、プログラムにスレッドを実装する次の2つのモジュールがあります-

  • * <_ thread> *モジュール
  • * <threading> *モジュール

これら2つのモジュールの主な違いは、 <_ thread> モジュールはスレッドを関数として扱うのに対して、 <threading> モジュールはすべてのスレッドをオブジェクトとして扱い、オブジェクト指向の方法で実装することです。 さらに、* <_ thread> モジュールは低レベルのスレッド化で効果的であり、 *<threading> モジュールよりも機能が少なくなります。

<_thread>モジュール

Pythonの以前のバージョンでは、 <thread> モジュールがありましたが、かなり長い間「非推奨」と見なされてきました。 代わりに <threading> モジュールを使用することをお勧めします。 したがって、Python 3では、モジュール「スレッド」は使用できなくなりました。 Python3の後方非互換性のために、「 <_ thread> 」に名前が変更されました。

*<_ thread>* モジュールの助けを借りて新しいスレッドを生成するには、その *start_new_thread* メソッドを呼び出す必要があります。 この方法の動作は、次の構文の助けを借りて理解することができます-
_thread.start_new_thread ( function, args[, kwargs] )

ここに-

  • args は引数のタプルです
  • kwargs は、キーワード引数のオプションの辞書です

引数を渡さずに関数を呼び出したい場合、 args で引数の空のタプルを使用する必要があります。

このメソッド呼び出しはすぐに戻り、子スレッドが開始され、引数のリスト(存在する場合)を使用して関数を呼び出します。 関数が戻ると、スレッドは終了します。

以下は、 <_ thread> モジュールを使用して新しいスレッドを生成する例です。 ここではstart_new_thread()メソッドを使用しています。

import _thread
import time

def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

try:
   _thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   _thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print ("Error: unable to start thread")
while 1:
   pass

出力

次の出力は、 <_ thread> モジュールの助けを借りて、新しいスレッドの生成を理解するのに役立ちます。

Thread-1: Mon Apr 23 10:03:33 2018
Thread-2: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:37 2018
Thread-2: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:41 2018
Thread-2: Mon Apr 23 10:03:43 2018
Thread-2: Mon Apr 23 10:03:47 2018
Thread-2: Mon Apr 23 10:03:51 2018

<threading>モジュール

*<threading>* モジュールはオブジェクト指向の方法で実装し、すべてのスレッドをオブジェクトとして扱います。 したがって、<_ thread>モジュールよりもはるかに強力で高レベルのスレッドサポートを提供します。 このモジュールはPython 2.4に含まれています。

<threading>モジュールの追加メソッド

*<threading>* モジュールには *<_ thread>* モジュールのすべてのメソッドが含まれますが、追加のメソッドも提供します。 追加の方法は次のとおりです-
  • threading.activeCount()*-このメソッドは、アクティブなスレッドオブジェクトの数を返します
  • threading.currentThread()*-このメソッドは、呼び出し元のスレッドコントロール内のスレッドオブジェクトの数を返します。
  • threading.enumerate()*-このメソッドは、現在アクティブなすべてのスレッドオブジェクトのリストを返します。

スレッドを実装するために、 <threading> モジュールには、次のメソッドを提供する Thread クラスがあります-

  • * run()*-run()メソッドは、スレッドのエントリポイントです。
  • * start()*-start()メソッドは、runメソッドを呼び出してスレッドを開始します。
  • * join([time])*-join()はスレッドが終了するのを待ちます。
  • * isAlive()*-isAlive()メソッドは、スレッドがまだ実行中かどうかを確認します。
  • * getName()*-getName()メソッドは、スレッドの名前を返します。
  • * setName()*-setName()メソッドは、スレッドの名前を設定します。

<threading>モジュールを使用してスレッドを作成する方法は?

このセクションでは、 <threading> モジュールを使用してスレッドを作成する方法を学習します。 次の手順に従って、<threading>モジュールを使用して新しいスレッドを作成します-

  • ステップ1 *-このステップでは、 *Thread クラスの新しいサブクラスを定義する必要があります。
  • ステップ2 *-さらに引数を追加するには、 init(self [、args])*メソッドをオーバーライドする必要があります。
  • *ステップ3 *-このステップでは、run(self [、args])メソッドをオーバーライドして、開始時にスレッドが行うべきことを実装する必要があります。

*<threading>* モジュールを使用して新しいスレッドを生成する方法を学習するには、この例を検討してください。
import threading
import time
exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print ("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print ("Exiting " + self.name)
def print_time(threadName, delay, counter):
   while counter:
      if exitFlag:
         threadName.exit()
      time.sleep(delay)
      print ("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("Exiting Main Thread")
Starting Thread-1
Starting Thread-2

出力

今、次の出力を考慮してください-

Thread-1: Mon Apr 23 10:52:09 2018
Thread-1: Mon Apr 23 10:52:10 2018
Thread-2: Mon Apr 23 10:52:10 2018
Thread-1: Mon Apr 23 10:52:11 2018
Thread-1: Mon Apr 23 10:52:12 2018
Thread-2: Mon Apr 23 10:52:12 2018
Thread-1: Mon Apr 23 10:52:13 2018
Exiting Thread-1
Thread-2: Mon Apr 23 10:52:14 2018
Thread-2: Mon Apr 23 10:52:16 2018
Thread-2: Mon Apr 23 10:52:18 2018
Exiting Thread-2
Exiting Main Thread

さまざまなスレッド状態のPythonプログラム

スレッドの状態には、新規、実行可能、実行中、待機中、デッドの5つがあります。 これらの5つのうち、主に3つの状態(実行中、待機中、および死亡)に焦点を当てます。 スレッドは、実行状態のリソースを取得し、待機状態のリソースを待機します。実行および取得されたリソースの最終リリースは、デッド状態です。

start()、sleep()、およびjoin()メソッドの助けを借りた次のPythonプログラムは、スレッドがそれぞれ実行状態、待機状態、およびデッド状態に入った方法を示します。

  • ステップ1 *-必要なモジュール、<threading>および<time>をインポートします
import threading
import time
  • ステップ2 *-スレッドの作成中に呼び出される関数を定義します。
def thread_states():
   print("Thread entered in running state")
  • ステップ3 *-timeモジュールのsleep()メソッドを使用して、スレッドを2秒待機させます。
time.sleep(2)
  • ステップ4 *-ここで、上記で定義された関数の引数を取るT1という名前のスレッドを作成しています。
T1 = threading.Thread(target=thread_states)
  • ステップ5 *-これで、start()関数の助けを借りて、スレッドを開始できます。 関数を定義するときに設定されたメッセージが生成されます。
T1.start()
Thread entered in running state
  • ステップ6 *-実行が完了した後、ようやくjoin()メソッドでスレッドを強制終了できます。
T1.join()

Pythonでスレッドを開始する

Pythonでは、さまざまな方法で新しいスレッドを開始できますが、その中で最も簡単な方法は、単一の関数として定義することです。 関数を定義した後、これを新しい threading.Thread オブジェクトのターゲットとして渡すことができます。 次のPythonコードを実行して、関数の仕組みを理解します-

import threading
import time
import random
def Thread_execution(i):
   print("Execution of Thread {} started\n".format(i))
   sleepTime = random.randint(1,4)
   time.sleep(sleepTime)
   print("Execution of Thread {} finished".format(i))
for i in range(4):
   thread = threading.Thread(target=Thread_execution, args=(i,))
   thread.start()
   print("Active Threads:" , threading.enumerate())

出力

Execution of Thread 0 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>]

Execution of Thread 1 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>]

Execution of Thread 2 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>,
      <Thread(Thread-3578, started 2268)>]

Execution of Thread 3 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>,
      <Thread(Thread-3578, started 2268)>,
      <Thread(Thread-3579, started 4520)>]
Execution of Thread 0 finished
Execution of Thread 1 finished
Execution of Thread 2 finished
Execution of Thread 3 finished

Pythonのデーモンスレッド

Pythonでデーモンスレッドを実装する前に、デーモンスレッドとその使用法を知る必要があります。 コンピューティングに関しては、デーモンは、データ送信、ファイル転送などのさまざまなサービスの要求を処理するバックグラウンドプロセスです。 不要になった場合は休止状態になります。 同じタスクは、非デーモンスレッドの助けを借りて行うこともできます。 ただし、この場合、メインスレッドは非デーモンスレッドを手動で追跡する必要があります。 一方、デーモンスレッドを使用している場合、メインスレッドはこれを完全に忘れることがあり、メインスレッドが終了すると強制終了されます。 デーモンスレッドに関するもう1つの重要な点は、デーモンスレッドが完了しないか、途中で強制終了された場合に影響を与えない重要でないタスクにのみ使用することを選択できることです。 以下は、Pythonのデーモンスレッドの実装です-

import threading
import time

def nondaemonThread():
   print("starting my thread")
   time.sleep(8)
   print("ending my thread")
def daemonThread():
   while True:
   print("Hello")
   time.sleep(2)
if __name__ == '__main__':
   nondaemonThread = threading.Thread(target = nondaemonThread)
   daemonThread = threading.Thread(target = daemonThread)
   daemonThread.setDaemon(True)
   daemonThread.start()
   nondaemonThread.start()

上記のコードには、> nondaemonThread()*および> daemonThread()*という2つの関数があります。 最初の関数はその状態を出力し、8秒後にスリープしますが、deamonThread()関数は2秒ごとに無期限にHelloを出力します。 私たちは次の出力の助けを借りて、非デーモンとデーモンスレッドの違いを理解することができます-

Hello

starting my thread
Hello
Hello
Hello
Hello
ending my thread
Hello
Hello
Hello
Hello
Hello

Concurrency-in-python-synchronizing-threads

スレッド相互通信

現実には、人々のチームが共通のタスクに取り組んでいる場合、タスクを適切に終了するためにコミュニケーションが必要です。 同じ類推はスレッドにも適用できます。 プログラミングでは、プロセッサの理想的な時間を短縮するために、複数のスレッドを作成し、すべてのスレッドに異なるサブタスクを割り当てます。 したがって、通信機能が必要であり、相互に対話して、同期された方法でジョブを終了する必要があります。

スレッド相互通信に関連する次の重要な点を考慮してください-

  • パフォーマンス向上なし-スレッドとプロセス間の適切な通信を達成できない場合、同時実行性と並列処理によるパフォーマンス向上は役に立ちません。
  • タスクを適切に実行-スレッド間の適切な相互通信メカニズムがないと、割り当てられたタスクを適切に完了できません。
  • プロセス間通信よりも効率的-プロセス内のすべてのスレッドが同じアドレス空間を共有し、共有メモリを使用する必要がないため、スレッド間通信はプロセス間通信よりも効率的で使いやすいです。

スレッドセーフな通信のためのPythonデータ構造

マルチスレッドコードでは、あるスレッドから別のスレッドに情報を渡すという問題が発生します。 標準の通信プリミティブはこの問題を解決しません。 したがって、スレッド間でオブジェクトを共有して通信をスレッドセーフにするために、独自の複合オブジェクトを実装する必要があります。 以下は、いくつかのデータ構造であり、それらにいくつかの変更を加えた後にスレッドセーフな通信を提供します-

Sets

スレッドセーフな方法でsetデータ構造を使用するには、setクラスを拡張して独自のロックメカニズムを実装する必要があります。

ここにクラスを拡張するPythonの例があります-

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

上記の例では、 extend_class という名前のクラスオブジェクトが定義されており、Python set class からさらに継承されています。 ロックオブジェクトは、このクラスのコンストラクター内で作成されます。 現在、2つの関数-* add() delete()があります。 これらの関数は定義されており、スレッドセーフです。 どちらも *super クラスの機能に依存していますが、1つの重要な例外があります。

デコレータ

これは、スレッドセーフ通信のもう1つの重要な方法であり、デコレータを使用します。

デコレータ&mminus;の使用方法を示すPythonの例を考えてみましょう。

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

上記の例では、lock_decoratorという名前のデコレータメソッドが定義されており、Pythonメソッドクラスからさらに継承されています。 次に、このクラスのコンストラクター内にロックオブジェクトが作成されます。 現在、add()とdelete()の2つの関数があります。 これらの関数は定義されており、スレッドセーフです。 どちらもスーパークラスの機能に依存していますが、1つの重要な例外があります。

リスト

リストのデータ構造は、スレッドセーフで、迅速で、一時的なメモリ内ストレージの簡単な構造です。 Cpythonでは、GILはそれらへの同時アクセスから保護します。 リストがスレッドセーフであることを知るようになりましたが、リストにあるデータについてはどうでしょう。 実際、リストのデータは保護されていません。 たとえば、* L.append(x)は、別のスレッドが同じことをしようとしている場合に期待される結果を返すことを保証しません。 これは、 append()*はアトミック操作でスレッドセーフですが、他のスレッドがリストのデータを並行して変更しようとしているため、出力に競合状態の副作用が見られるためです。

この種の問題を解決し、データを安全に変更するには、適切なロックメカニズムを実装する必要があります。これにより、複数のスレッドが競合状態になる可能性がさらになくなります。 適切なロックメカニズムを実装するために、前の例で行ったようにクラスを拡張できます。

リスト上の他のいくつかのアトミック操作は次のとおりです-

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

ここに-

  • L、L1、L2はすべてリストです
  • D、D1、D2は辞書です
  • x、yはオブジェクト
  • i、jはintです

キュー

リストのデータが保護されていない場合、結果に直面する必要があります。 競合状態の間違ったデータ項目を取得または削除する場合があります。 そのため、キューデータ構造を使用することをお勧めします。 キューの実際の例は、車線が最初に出て最初に出る単一車線の片道です。 チケットウィンドウとバス停でのキューのより現実的な例が見られます。

キュー

キューはデフォルトでスレッドセーフなデータ構造であり、複雑なロックメカニズムの実装について心配する必要はありません。 Pythonは、アプリケーションでさまざまなタイプのキューを使用するモジュールを提供します。

キューの種類

このセクションでは、さまざまなタイプのキューについて獲得します。 Pythonは、 <queue> モジュールから使用するキューの3つのオプションを提供します-

  • 通常のキュー(FIFO、先入れ先出し)
  • LIFO、後入れ先出し
  • 優先度

後続のセクションでさまざまなキューについて学習します。

通常のキュー(FIFO、先入れ先出し)

Pythonが提供する最も一般的に使用されるキュー実装です。 このキューイングメカニズムでは、誰でも最初に来る人が最初にサービスを取得します。 FIFOは通常のキューとも呼ばれます。 FIFOキューは次のように表すことができます-

FIFO

FIFOキューのPython実装

Pythonでは、FIFOキューはシングルスレッドとマルチスレッドで実装できます。

シングルスレッドのFIFOキュー

単一スレッドでFIFOキューを実装するために、 Queue クラスは基本的な先入れ先出しコンテナーを実装します。 要素は、* put()を使用してシーケンスの一方の「端」に追加され、 get()*を使用してもう一方の端から削除されます。

以下は、シングルスレッドでFIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

出力

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

出力は、上記のプログラムが単一のスレッドを使用して、エレメントが挿入されたのと同じ順序でキューから削除されることを示していることを示しています。

複数のスレッドを持つFIFOキュー

複数のスレッドでFIFOを実装するには、キューモジュールから拡張されたmyqueue()関数を定義する必要があります。 get()およびput()メソッドの動作は、シングルスレッドでFIFOキューを実装する際の上記と同じです。 次に、マルチスレッド化するには、スレッドを宣言してインスタンス化する必要があります。 これらのスレッドは、FIFO方式でキューを消費します。

以下は、複数のスレッドでFIFOキューを実装するためのPythonプログラムです

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO、先入れ先出しキュー

このキューは、FIFO(先入れ先出し)キューとはまったく逆のアナロジーを使用します。 このキューイングメカニズムでは、最後に来た人が最初にサービスを取得します。 これは、スタックデータ構造の実装に似ています。 LIFOキューは、人工知能のアルゴリズムのような深さ優先の検索を実装する際に役立ちます。

LIFOキューのPython実装

Pythonでは、LIFOキューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドのLIFOキュー

単一スレッドでLIFOキューを実装するため、 Queue クラスは、構造 Queue .LifoQueueを使用して、基本的な後入れ先出しコンテナーを実装します。 これで、* put()を呼び出すと、コンテナのヘッドに要素が追加され、 get()*を使用してもヘッドから削除されます。

以下は、シングルスレッドでLIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

出力は、上記のプログラムが単一のスレッドを使用して、要素が挿入された順序とは逆の順序でキューから削除されることを示していることを示しています。

複数のスレッドを持つLIFOキュー

実装は、複数のスレッドでFIFOキューの実装を行ったのと同様です。 唯一の違いは、構造 Queue.LifoQueue を使用して、基本的な後入れ先出しコンテナーを実装する Queue クラスを使用する必要があることです。

以下は、複数のスレッドでLIFOキューを実装するためのPythonプログラムです-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先キュー

FIFOおよびLIFOキューでは、アイテムの順序は挿入の順序に関連しています。 ただし、挿入の順序よりも優先順位が重要な場合が多くあります。 実世界の例を考えてみましょう。 空港のセキュリティがさまざまなカテゴリの人々をチェックしているとします。 VVIPの人々、航空会社のスタッフ、税関職員、カテゴリは、一般人のように到着に基づいてチェックされるのではなく、優先的にチェックされる場合があります。

優先度キューで考慮する必要があるもう1つの重要な側面は、タスクスケジューラの開発方法です。 一般的な設計の1つは、キュー内で優先度に基づいて最も多くのエージェントタスクを処理することです。 このデータ構造を使用して、優先度値に基づいてキューからアイテムを取得できます。

優先度キューのPython実装

Pythonでは、優先度キューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドの優先キュー

単一スレッドで優先度キューを実装するために、 Queue クラスは、構造 Queue .PriorityQueueを使用して優先度コンテナにタスクを実装します。 現在、* put()を呼び出すと、最低値が最高の優先度を持つ値が要素に追加されるため、 get()*を使用して最初に取得されます。

シングルスレッドで優先度キューを実装するための次のPythonプログラムを検討してください-

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

出力

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

上記の出力では、優先度に基づいてアイテムがキューに保存されていることがわかります。値が小さいほど優先度が高くなります。

マルチスレッドの優先キュー

実装は、複数のスレッドを持つFIFOおよびLIFOキューの実装に似ています。 唯一の違いは、構造 Queue.PriorityQueue を使用して優先度を初期化するために Queue クラスを使用する必要があることです。 別の違いは、キューの生成方法にあります。 以下の例では、2つの同一のデータセットで生成されます。

次のPythonプログラムは、複数のスレッドで優先度キューの実装に役立ちます-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue

スレッドアプリケーションのテスト

この章では、スレッドアプリケーションのテストについて学習します。 また、テストの重要性も学びます。

テストする理由

テストの重要性について議論する前に、テストとは何かを知る必要があります。 一般的に、テストとは、何かがうまく機能しているかどうかを調べる手法です。 一方、特にコンピュータープログラムまたはソフトウェアについて話す場合、テストはソフトウェアプログラムの機能にアクセスする手法です。

このセクションでは、ソフトウェアテストの重要性について説明します。 ソフトウェア開発では、クライアントにソフトウェアをリリースする前に二重チェックが必要です。 そのため、経験豊富なテストチームがソフトウェアをテストすることが非常に重要です。 ソフトウェアテストの重要性を理解するために、次の点を考慮してください-

ソフトウェア品質の改善

確かに、低品質のソフトウェアを提供したい企業や低品質のソフトウェアを購入したいクライアントはいません。 テストは、その中のバグを見つけて修正することにより、ソフトウェアの品質を向上させます。

顧客満足度

ビジネスの最も重要な部分は、顧客の満足度です。 バグのない良質のソフトウェアを提供することにより、企業は顧客満足度を達成できます。

新機能の影響を軽減

10000行のソフトウェアシステムを作成し、新しい機能を追加する必要があると仮定すると、開発チームはこの新しい機能がソフトウェア全体に及ぼす影響について懸念を持つことになります。 ここでも、テストが重要な役割を果たします。テストチームが適切なテストスイートを作成した場合、壊滅的な中断から私たちを救うことができるからです。

ユーザー体験

ビジネスのもう1つの最も重要な部分は、その製品のユーザーのエクスペリエンスです。 エンドユーザーが製品をシンプルかつ簡単に使用できることを確認できるのは、テストのみです。

経費削減

テストでは、配布後に修正するのではなく、開発のテスト段階でバグを見つけて修正することにより、ソフトウェアの総コストを削減できます。 ソフトウェアの配信後に大きなバグがある場合、費用の観点から見た具体的なコストと、顧客の不満、会社の否定的な評判などの観点から見た無形コストが増加します。

何をテストしますか?

テスト対象について適切な知識を持つことが常に推奨されます。 このセクションでは、まずソフトウェアをテストする際のテスターの主な動機であることを理解します。 コードカバレッジ、つまり、テスト中にテストスイートがヒットするコードの行数は避ける必要があります。 これは、テスト中にコードの行数だけに焦点を合わせると、システムに実質的な価値が追加されないためです。 いくつかのバグが残っている場合があり、それらは展開後でも後の段階で反映されます。

テスト対象に関連する次の重要な点を考慮してください-

  • コードカバレッジではなく、コードの機能のテストに集中する必要があります。
  • 最初にコードの最も重要な部分をテストし、次にコードの重要度の低い部分に向かって移動する必要があります。 それは間違いなく時間を節約します。
  • テスターは、ソフトウェアを限界までプッシュできるさまざまなテストを行う必要があります。

並行ソフトウェアプログラムをテストするためのアプローチ

マルチコアアーキテクチャの真の機能を利用できるため、コンカレントソフトウェアシステムがシーケンシャルシステムに取って代わります。 最近では、携帯電話から洗濯機、車から飛行機など、すべてで並行システムプログラムが使用されています。 すでにバグが存在する単一のスレッドアプリケーションに複数のスレッドを追加した場合、複数のバグが発生するため、並行ソフトウェアプログラムのテストにはさらに注意する必要があります。

並行ソフトウェアプログラムのテスト手法は、競合状態、デッドロック、アトミック性の違反などの潜在的に有害なパターンを公開するインターリーブの選択に重点を置いています。 以下は、並行ソフトウェアプログラムをテストするための2つのアプローチです-

体系的な調査

このアプローチは、インターリーブのスペースをできるだけ広く探索することを目的としています。 そのようなアプローチはブルートフォース技術を採用でき、他のアプローチは半順序縮小技術またはヒューリスティック技術を採用してインターリーブの空間を探索できます。

プロパティ駆動

プロパティ駆動型のアプローチは、疑わしいメモリアクセスパターンなどの特定のプロパティを公開するインターリーブで並行性障害が発生する可能性が高いという観察に依存しています。 さまざまなプロパティ駆動型のアプローチは、競合状態、デッドロック、原子性の違反など、さまざまな障害を対象としています。これらは、1つまたは他の特定のプロパティにさらに依存します。

テスト戦略

テスト戦略は、テストアプローチとしても知られています。 この戦略は、テストの実行方法を定義します。 テストアプローチには2つのテクニックがあります-

積極的

ビルドを作成する前に欠陥を見つけて修正するために、テスト設計プロセスをできるだけ早く開始するアプローチ。

リアクティブ

開発プロセスが完了するまでテストを開始しないアプローチ。

pythonプログラムにテスト戦略またはアプローチを適用する前に、ソフトウェアプログラムに発生する可能性のあるエラーの種類についての基本的な考え方が必要です。 エラーは次のとおりです-

構文エラー

プログラム開発中に、多くの小さなエラーが発生する可能性があります。 エラーは主に入力ミスによるものです。 たとえば、コロンの欠落やキーワードのつづりの誤りなど。 このようなエラーは、ロジックではなくプログラム構文の誤りによるものです。 したがって、これらのエラーは構文エラーと呼ばれます。

セマンティックエラー

セマンティックエラーは、論理エラーとも呼ばれます。 ソフトウェアプログラムに論理的または意味的なエラーがある場合、ステートメントは正しくコンパイルおよび実行されますが、ロジックが正しくないため、目的の出力が得られません。

単体テスト

これは、Pythonプログラムをテストするために最も使用されるテスト戦略の1つです。 この戦略は、コードのユニットまたはコンポーネントのテストに使用されます。 ユニットまたはコンポーネントとは、コードのクラスまたは機能を意味します。 ユニットテストは、「小さな」ユニットをテストすることにより、大規模なプログラミングシステムのテストを簡素化します。 上記の概念の助けを借りて、単体テストは、ソースコードの個々のユニットがテストされて、目的の出力が返されるかどうかを判断する方法として定義できます。

以降のセクションでは、単体テスト用のさまざまなPythonモジュールについて学習します。

unittestモジュール

ユニットテストの最初のモジュールはユニットテストモジュールです。 JUnitに触発され、デフォルトでPython3.6に含まれています。 テストの自動化、テストのセットアップおよびシャットダウンコードの共有、テストのコレクションへの集約、レポートフレームワークからのテストの独立性をサポートします。

以下は、unittestモジュールでサポートされるいくつかの重要な概念です。

テキストフィクスチャ

テストを開始する前に実行し、テストの終了後に解体できるように、テストを設定するために使用されます。 一時データベース、ディレクトリなどの作成が含まれる場合があります。 テストを開始する前に必要です。

テストケース

テストケースは、必要な応答が特定の入力セットから来ているかどうかをチェックします。 unittestモジュールには、新しいテストケースの作成に使用できるTestCaseという名前の基本クラスが含まれています。 デフォルトで2つの方法が含まれています-

  • * setUp()*-テストフィクスチャを実行する前にセットアップするフックメソッド。 これは、実装されたテストメソッドを呼び出す前に呼び出されます。
  • * tearDown(*-クラス内のすべてのテストを実行した後、クラスフィクスチャを分解するためのフックメソッド。

テストスイート

これは、テストスイート、テストケース、またはその両方のコレクションです。

テストランナー

テストケースまたはスーツの実行を制御し、結果をユーザーに提供します。 結果を提供するために、GUIまたはシンプルなテキストインターフェイスを使用できます。

次のPythonプログラムは、unittestモジュールを使用して Fibonacci というモジュールをテストします。 このプログラムは、数値のフィボナッチ数列の計算に役立ちます。 この例では、異なるメソッドを使用してテストケースを定義するために、Fibo_testという名前のクラスを作成しました。 これらのメソッドは、unittest.TestCaseから継承されます。 デフォルトでは、setUp()とtearDown()の2つのメソッドを使用しています。 testfibocalメソッドも定義します。 テストの名前はレターテストで始まる必要があります。 最後のブロックでは、unittest.main()がテストスクリプトへのコマンドラインインターフェイスを提供します。

import unittest
def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a
class Fibo_Test(unittest.TestCase):
   def setUp(self):
   print("This is run before our tests would be executed")
   def tearDown(self):
   print("This is run after the completion of execution of our tests")

   def testfibocal(self):
   self.assertEqual(fib(0), 0)
   self.assertEqual(fib(1), 1)
   self.assertEqual(fib(5), 5)
   self.assertEqual(fib(10), 55)
   self.assertEqual(fib(20), 6765)

if __name__ == "__main__":
   unittest.main()

コマンドラインから実行すると、上記のスクリプトは次のような出力を生成します-

出力

This runs before our tests would be executed.
This runs after the completion of execution of our tests.
.
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK

今、それを明確にするために、フィボナッチモジュールの定義に役立つコードを変更しています。

例として次のコードブロックを考慮してください-

def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a

以下に示すように、コードブロックにいくつかの変更が加えられます-

def fibonacci(n):
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a

今、変更されたコードでスクリプトを実行した後、次の出力を取得します-

This runs before our tests would be executed.
This runs after the completion of execution of our tests.
F
======================================================================
FAIL: testCalculation (__main__.Fibo_Test)
----------------------------------------------------------------------
Traceback (most recent call last):
File "unitg.py", line 15, in testCalculation
self.assertEqual(fib(0), 0)
AssertionError: 1 != 0
----------------------------------------------------------------------
Ran 1 test in 0.007s

FAILED (failures = 1)

上記の出力は、モジュールが目的の出力を提供できなかったことを示しています。

ドックテストモジュール

docktestモジュールは単体テストにも役立ちます。 また、Pythonがあらかじめパッケージ化されています。 unittestモジュールよりも使いやすいです。 unittestモジュールは、複雑なテストにより適しています。 doctestモジュールを使用するには、インポートする必要があります。 対応する関数のdocstringには、出力とともにインタラクティブなPythonセッションが必要です。

コードのすべてが正常であれば、docktestモジュールからの出力はありません。それ以外の場合は、出力が提供されます。

次のPythonの例では、docktestモジュールを使用してFibonacciというモジュールをテストします。これは、数値のフィボナッチ数列の計算に役立ちます。

import doctest
def fibonacci(n):
   """
   Calculates the Fibonacci number

   >>> fibonacci(0)
   0
   >>> fibonacci(1)
   1
   >>> fibonacci(10)
   55
   >>> fibonacci(20)
   6765
   >>>

   """
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a
      if __name__ == "__main__":
   doctest.testmod()

fibという名前の対応する関数のdocstringには、出力とともにインタラクティブなpythonセッションがあったことがわかります。 コードが正常であれば、doctestモジュールからの出力はありません。 しかし、それがどのように機能するかを確認するには、-vオプションを使用して実行します。

(base) D:\ProgramData>python dock_test.py -v
Trying:
   fibonacci(0)
Expecting:
   0
ok
Trying:
   fibonacci(1)
Expecting:
   1
ok
Trying:
   fibonacci(10)
Expecting:
   55
ok
Trying:
   fibonacci(20)
Expecting:
   6765
ok
1 items had no tests:
   __main__
1 items passed all tests:
4 tests in __main__.fibonacci
4 tests in 2 items.
4 passed and 0 failed.
Test passed.

ここで、フィボナッチモジュールの定義に役立つコードを変更します

例として次のコードブロックを考慮してください-

def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a

次のコードブロックは、変更に役立ちます-

def fibonacci(n):
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a

変更されたコードを使用して、-vオプションがなくてもスクリプトを実行すると、次のように出力が得られます。

出力

(base) D:\ProgramData>python dock_test.py
**********************************************************************
File "unitg.py", line 6, in __main__.fibonacci
Failed example:
   fibonacci(0)
Expected:
   0
Got:
   1
**********************************************************************
File "unitg.py", line 10, in __main__.fibonacci
Failed example:
   fibonacci(10)
Expected:
   55
Got:
   89
**********************************************************************
File "unitg.py", line 12, in __main__.fibonacci
Failed example:
   fibonacci(20)
Expected:
   6765
Got:
   10946
**********************************************************************
1 items had failures:
   3 of 4 in __main__.fibonacci
***Test Failed*** 3 failures.

上記の出力では、3つのテストが失敗したことがわかります。

スレッドアプリケーションのデバッグ

この章では、スレッドアプリケーションのデバッグ方法を学習します。 また、デバッグの重要性も学びます。

デバッグとは何ですか?

コンピュータープログラミングでは、デバッグとは、コンピュータープログラムのバグ、エラー、および異常を見つけて削除するプロセスです。 このプロセスは、コードが記述されるとすぐに開始され、コードがプログラミングの他のユニットと組み合わされてソフトウェア製品が形成されると、連続した段階で継続されます。 デバッグはソフトウェアテストプロセスの一部であり、ソフトウェア開発ライフサイクル全体の不可欠な部分です。

Pythonデバッガー

Pythonデバッガーまたは pdb は、Python標準ライブラリの一部です。 見つけにくいバグを追跡するための優れたフォールバックツールであり、障害のあるコードを迅速かつ確実に修正できます。 以下は、 pdp デバッガーの2つの最も重要なタスクです-

  • これにより、実行時に変数の値を確認できます。
  • コードをステップ実行して、ブレークポイントを設定することもできます。

私たちは次の2つの方法でpdbで作業することができます-

  • コマンドラインから;これは事後デバッグとも呼ばれます。
  • 対話的にpdbを実行します。

pdbでの作業

Pythonデバッガで作業するには、デバッガに侵入したい場所で次のコードを使用する必要があります-

import pdb;
pdb.set_trace()

コマンドラインからpdbを操作するには、次のコマンドを検討してください。

  • h(ヘルプ)
  • d(下)
  • u(up)
  • b(ブレーク)
  • cl(クリア)
  • l(リスト))
  • n(次))
  • c(続行)
  • s(ステップ)
  • r(リターン))
  • b(ブレーク)

以下は、Pythonデバッガーのh(help)コマンドのデモです-

import pdb

pdb.set_trace()
--Call--
>d:\programdata\lib\site-packages\ipython\core\displayhook.py(247)__call__()
-> def __call__(self, result = None):
(Pdb) h

Documented commands (type help <topic>):
========================================
EOF   c         d       h        list     q       rv      undisplay
a     cl        debug   help     ll       quit    s       unt
alias clear     disable ignore   longlist r       source  until
args  commands  display interact n        restart step    up
b     condition down    j        next     return  tbreak  w
break cont      enable  jump     p        retval  u       whatis
bt    continue  exit    l        pp       run     unalias where

Miscellaneous help topics:
==========================
exec pdb

Pythonデバッガーでの作業中に、次の行を使用してスクリプト内の任意の場所にブレークポイントを設定できます-

import pdb;
pdb.set_trace()

ブレークポイントを設定した後、スクリプトを正常に実行できます。 スクリプトは特定のポイントまで実行されます。行が設定されるまで。 スクリプト内のさまざまな場所で上記の行を使用してスクリプトを実行する次の例を検討してください-

import pdb;
a = "aaa"
pdb.set_trace()
b = "bbb"
c = "ccc"
final = a + b + c
print (final)

上記のスクリプトを実行すると、a =“ aaa”までプログラムが実行されます。これは、次の出力で確認できます。

出力

--Return--
> <ipython-input-7-8a7d1b5cc854>(3)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
*** NameError: name 'b' is not defined
(Pdb) p c
*** NameError: name 'c' is not defined

pdbでコマンド「p(print)」を使用すると、このスクリプトは「aaa」のみを印刷します。 a = "aaa"までブレークポイントを設定しているため、これに続いてエラーが発生します。

同様に、ブレークポイントを変更してスクリプトを実行し、出力の違いを確認できます-

import pdb
a = "aaa"
b = "bbb"
c = "ccc"
pdb.set_trace()
final = a + b + c
print (final)

出力

--Return--
> <ipython-input-9-a59ef5caf723>(5)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
*** NameError: name 'final' is not defined
(Pdb) exit

次のスクリプトでは、プログラムの最後の行にブレークポイントを設定しています-

import pdb
a = "aaa"
b = "bbb"
c = "ccc"
final = a + b + c
pdb.set_trace()
print (final)

出力は次のとおりです-

--Return--
> <ipython-input-11-8019b029997d>(6)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
'aaabbbccc'
(Pdb)

ベンチマークとプロファイリング

この章では、ベンチマークとプロファイリングがパフォーマンスの問題にどのように役立つかを学習します。

コードを作成して、目的の結果が得られたとしても、ニーズが変わったためにこのコードを少し速く実行したい場合はどうでしょうか。 この場合、コードのどの部分がプログラム全体を遅くしているのかを知る必要があります。 この場合、ベンチマークとプロファイリングが役立ちます。

ベンチマークとは何ですか?

ベンチマークの目的は、標準と比較して何かを評価することです。 ただし、ここで生じる問題は、ベンチマークとなるものと、ソフトウェアプログラミングの場合にベンチマークが必要な理由です。 コードのベンチマークとは、コードの実行速度とボトルネックの場所を意味します。 ベンチマークの主な理由の1つは、コードを最適化することです。

ベンチマークはどのように機能しますか?

ベンチマークの仕組みについて話す場合、プログラム全体を1つの現在の状態としてベンチマークすることから始めてから、マイクロベンチマークを組み合わせて、プログラムをより小さなプログラムに分解することができます。 プログラム内のボトルネックを見つけて最適化するため。 言い換えれば、大きくて難しい問題を、それらを最適化するための一連の小さくて少し簡単な問題に分割することとして理解できます。

ベンチマーク用のPythonモジュール

Pythonには、 timeit と呼ばれるベンチマーク用のデフォルトモジュールがあります。 timeit モジュールの助けを借りて、メインプログラム内の小さなPythonコードのパフォーマンスを測定できます。

次のPythonスクリプトでは、 timeit モジュールをインポートしています。このモジュールは、2つの関数( functionA および functionB )の実行にかかる時間をさらに測定します-

import timeit
import time
def functionA():
   print("Function A starts the execution:")
   print("Function A completes the execution:")
def functionB():
   print("Function B starts the execution")
   print("Function B completes the execution")
start_time = timeit.default_timer()
functionA()
print(timeit.default_timer() - start_time)
start_time = timeit.default_timer()
functionB()
print(timeit.default_timer() - start_time)

上記のスクリプトを実行した後、次のように両方の関数の実行時間を取得します。

出力

Function A starts the execution:
Function A completes the execution:
0.0014599495514175942
Function B starts the execution
Function B completes the execution
0.0017024724827479076

デコレータ関数を使用して独自のタイマーを作成する

Pythonでは、 timeit モジュールのように動作する独自のタイマーを作成できます。 decorator 関数を使用して実行できます。 以下は、カスタムタイマーの例です-

import random
import time

def timer_func(func):

   def function_timer(*args, **kwargs):
   start = time.time()
   value = func(*args, **kwargs)
   end = time.time()
   runtime = end - start
   msg = "{func} took {time} seconds to complete its execution."
      print(msg.format(func = func.__name__,time = runtime))
   return value
   return function_timer

@timer_func
def Myfunction():
   for x in range(5):
   sleep_time = random.choice(range(1,3))
   time.sleep(sleep_time)

if __name__ == '__main__':
   Myfunction()

上記のPythonスクリプトは、ランダム時間モジュールのインポートに役立ちます。 timer_func()デコレータ関数を作成しました。 これには、その中にfunction_timer()関数があります。 これで、ネストされた関数は、渡された関数を呼び出す前に時間を取得します。 次に、関数が戻るまで待機し、終了時間を取得します。 このようにして、最終的にpythonスクリプトに実行時間を出力させることができます。 スクリプトは、次のように出力を生成します。

出力

Myfunction took 8.000457763671875 seconds to complete its execution.

プロファイリングとは

プログラマーは、メモリの使用、時間の複雑さ、またはプログラムに関する特定の命令の使用などの属性を測定して、そのプログラムの実際の能力を測定したいことがあります。 このようなプログラムに関する測定は、プロファイリングと呼ばれます。 プロファイリングでは、動的プログラム分析を使用してこのような測定を行います。

後続のセクションでは、プロファイリング用のさまざまなPythonモジュールについて学習します。

cProfile –組み込みモジュール

*cProfile* は、プロファイリング用のPython組み込みモジュールです。 このモジュールは、妥当なオーバーヘッドを持つC拡張機能であり、長時間実行されるプログラムのプロファイリングに適しています。 実行後、すべての機能と実行時間が記録されます。 それは非常に強力ですが、時には解釈して行動するのが少し難しいです。 次の例では、以下のコードでcProfileを使用しています-

def increment_global():

   global x
   x += 1

def taskofThread(lock):

   for _ in range(50000):
   lock.acquire()
   increment_global()
   lock.release()

def main():
   global x
   x = 0

   lock = threading.Lock()

   t1 = threading.Thread(target=taskofThread, args=(lock,))
   t2 = threading.Thread(target= taskofThread, args=(lock,))

   t1.start()
   t2.start()

   t1.join()
   t2.join()

if __name__ == "__main__":
   for i in range(5):
      main()
   print("x = {1} after Iteration {0}".format(i,x))

上記のコードは thread_increment.py ファイルに保存されます。 今、次のようにコマンドラインでcProfileでコードを実行します-

(base) D:\ProgramData>python -m cProfile thread_increment.py
x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4
      3577 function calls (3522 primitive calls) in 1.688 seconds

   Ordered by: standard name

   ncalls tottime percall cumtime percall filename:lineno(function)

   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:103(release)
   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:143(__init__)
   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:147(__enter__)
   … … … …

上記の出力から、cProfileは呼び出されたすべての3577関数を、それぞれに費やした時間と呼び出された回数とともに出力することが明らかです。 以下は、出力で得られた列です-

  • ncalls -これは行われた呼び出しの数です。
  • tottime -指定された関数で費やされた合計時間です。
  • percall -tottimeをncallsで割った商を指します。
  • cumtime -これは、このサブ機能およびすべてのサブ機能で費やされた累積時間です。 再帰関数に対しても正確です。
  • percall -cumtimeをプリミティブコールで割った商です。
  • * filename:lineno(function)*-基本的に各関数のそれぞれのデータを提供します。

Pythonの同時実行-スレッドのプール

マルチスレッドタスクのために多数のスレッドを作成する必要があるとします。 スレッドが多すぎるためにパフォーマンスの問題が多く発生する可能性があるため、計算コストが最も高くなります。 大きな問題は、スループットが制限されることです。 スレッドのプールを作成することにより、この問題を解決できます。 スレッドプールは、事前にインスタンス化され、アイドル状態のスレッドのグループとして定義できます。これらのスレッドは、作業を行う準備ができています。 スレッドプールの作成は、多数のタスクを実行する必要がある場合に、すべてのタスクの新しいスレッドをインスタンス化するよりも優先されます。 スレッドプールは、次のように多数のスレッドの同時実行を管理できます-

  • スレッドプール内のスレッドが実行を完了すると、そのスレッドを再利用できます。
  • スレッドが終了すると、別のスレッドが作成され、そのスレッドが置き換えられます。

Pythonモジュール– Concurrent.futures

Python標準ライブラリには、 concurrent.futures モジュールが含まれています。 このモジュールは、非同期タスクを起動するための高レベルインターフェイスを開発者に提供するために、Python 3.2で追加されました。 これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェースを提供するためのPythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。

以降のセクションでは、concurrent.futuresモジュールのさまざまなクラスについて学習します。

エグゼキュータークラス

  • Executor は、 *concurrent.futures Pythonモジュールの抽象クラスです。 直接使用することはできません。次の具体的なサブクラスのいずれかを使用する必要があります-
  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor –具体的なサブクラス

これは、Executorクラスの具象サブクラスの1つです。 サブクラスはマルチスレッドを使用し、タスクを送信するためのスレッドのプールを取得します。 このプールは、利用可能なスレッドにタスクを割り当て、実行するようにスケジュールします。

ThreadPoolExecutorを作成する方法は?

*concurrent.futures* モジュールとその具体的なサブクラス *Executor* の助けを借りて、スレッドのプールを簡単に作成できます。 このためには、プールに必要なスレッド数で *ThreadPoolExecutor* を構築する必要があります。 デフォルトでは、数は5です。 その後、タスクをスレッドプールに送信できます。 タスクを* submit()*すると、 *Future* が返されます。 Futureオブジェクトには、* done()*というメソッドがあり、futureが解決したかどうかを通知します。 これにより、その特定の将来のオブジェクトに値が設定されました。 タスクが完了すると、スレッドプールエグゼキューターは値をfutureオブジェクトに設定します。

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

出力

False
True
Completed

上記の例では、5つのスレッドで ThreadPoolExecutor が構築されています。 次に、メッセージを与える前に2秒間待機するタスクがスレッドプールエグゼキューターに送信されます。 出力からわかるように、タスクは2秒まで完了しないため、* done()の最初の呼び出しはFalseを返します。 2秒後、タスクは完了し、 result()*メソッドを呼び出して未来の結果を取得します。

ThreadPoolExecutorのインスタンス化–コンテキストマネージャー

*ThreadPoolExecutor* をインスタンス化する別の方法は、コンテキストマネージャーを使用することです。 上記の例で使用した方法と同様に機能します。 コンテキストマネージャを使用する主な利点は、構文的に見た目が良いことです。 インスタンス化は、次のコードの助けを借りて行うことができます-
with ThreadPoolExecutor(max_workers = 5) as executor

次の例は、Pythonドキュメントから引用したものです。 この例では、まず concurrent.futures モジュールをインポートする必要があります。 次に、* load_url()という名前の関数が作成され、要求されたURLがロードされます。 この関数は、プール内の5つのスレッドで *ThreadPoolExecutor を作成します。 ThreadPoolExecutor はコンテキストマネージャーとして利用されています。 将来の結果を取得するには、* result()*メソッドを呼び出します。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

出力

以下は、上記のPythonスクリプトの出力です。

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Executor.map()関数の使用

Pythonの* map()関数は、多くのタスクで広く使用されています。 そのようなタスクの1つは、特定の関数をiterables内のすべての要素に適用することです。 同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとして *ThreadPoolExecutor に送信できます。 関数の動作を理解するために、次のPythonスクリプトの例を検討してください。

以下のこの例では、map関数を使用して、values配列内のすべての値に* square()*関数を適用しています。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは次の出力を生成します-

4
9
16
25

Pythonの同時実行-プロセスのプール

プロセスのプールは、スレッドのプールを作成して使用したのと同じ方法で作成して使用できます。 プロセスプールは、事前にインスタンス化されたプロセスとアイドルプロセスのグループとして定義できます。 多数のタスクを実行する必要がある場合、すべてのタスクの新しいプロセスをインスタンス化するよりもプロセスプールを作成することをお勧めします。

Pythonモジュール– Concurrent.futures

Python標準ライブラリには、 concurrent.futures というモジュールがあります。 このモジュールは、非同期タスクを起動するための高レベルインターフェイスを開発者に提供するために、Python 3.2で追加されました。 これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェースを提供するためのPythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。

以降のセクションでは、concurrent.futuresモジュールのさまざまなサブクラスを見ていきます。

エグゼキュータークラス

*Executor* は、 *concurrent.futures* Pythonモジュールの抽象クラスです。 直接使用することはできません。次の具体的なサブクラスのいずれかを使用する必要があります-
  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor –具体的なサブクラス

これは、Executorクラスの具象サブクラスの1つです。 マルチプロセッシングを使用し、タスクを送信するためのプロセスのプールを取得します。 このプールは、利用可能なプロセスにタスクを割り当て、実行するようにスケジュールします。

ProcessPoolExecutorを作成する方法は?

*concurrent.futures* モジュールとその具体的なサブクラス *Executor* を使用して、プロセスのプールを簡単に作成できます。 このために、プールに必要なプロセスの数で *ProcessPoolExecutor* を構築する必要があります。 デフォルトでは、数は5です。 その後、プロセスプールにタスクを送信します。

ここで、スレッドプールの作成時に使用したのと同じ例を検討します。唯一の違いは、 ThreadPoolExecutor の代わりに ProcessPoolExecutor を使用することです。

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

出力

False
False
Completed

上記の例では、Process * PoolExecutor は5つのスレッドで構成されています。 次に、メッセージを出す前に2秒間待機するタスクがプロセスプールエグゼキューターに送信されます。 出力からわかるように、タスクは2秒まで完了しないため、 done()の最初の呼び出しはFalseを返します。 2秒後、タスクは完了し、 result()*メソッドを呼び出して未来の結果を取得します。

ProcessPoolExecutorのインスタンス化–コンテキストマネージャー

ProcessPoolExecutorをインスタンス化する別の方法は、コンテキストマネージャーを使用することです。 上記の例で使用した方法と同様に機能します。 コンテキストマネージャを使用する主な利点は、構文的に見た目が良いことです。 インスタンス化は、次のコードの助けを借りて行うことができます-

with ProcessPoolExecutor(max_workers = 5) as executor

理解を深めるために、スレッドプールの作成時に使用したのと同じ例を取り上げます。 この例では、 concurrent.futures モジュールをインポートすることから始める必要があります。 次に、* load_url()という名前の関数が作成され、要求されたURLがロードされます。 次に、プール内の5つのスレッド数で *ProcessPoolExecutor が作成されます。 Process * PoolExecutor は、コンテキストマネージャーとして利用されています。 将来の結果を取得するには、 result()*メソッドを呼び出します。

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは次の出力を生成します-

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Executor.map()関数の使用

Pythonの* map()関数は、多くのタスクを実行するために広く使用されています。 そのようなタスクの1つは、特定の関数をiterables内のすべての要素に適用することです。 同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとして *ProcessPoolExecutor に送信できます。 これを理解するには、次のPythonスクリプトの例を検討してください。

  • Executor.map()関数を使用してスレッドプールを作成するときに使用したのと同じ例を検討します。 以下の例では、map関数を使用して、values配列のすべての値に square()*関数を適用しています。
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

出力

上記のPythonスクリプトは、次の出力を生成します

4
9
16
25

ProcessPoolExecutorとThreadPoolExecutorを使用する場合

executorクラス(ThreadPoolExecutorとProcessPoolExecutor)の両方について学習したので、どのエクゼキューターをいつ使用するかを知る必要があります。 CPUバウンドワークロードの場合はProcessPoolExecutorを選択し、I/Oバウンドワークロードの場合はThreadPoolExecutorを選択する必要があります。

*ProcessPoolExecutor* を使用する場合、マルチプロセッシングを使用するため、GILを心配する必要はありません。 さらに、 *ThreadPoolExecution* と比較すると、実行時間は短くなります。 これを理解するには、次のPythonスクリプト例を検討してください。

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

出力

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

出力

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

上記の両方のプログラムの出力から、 ProcessPoolExecutorThreadPoolExecutor を使用しているときの実行時間の違いを確認できます。

Pythonの同時実行性-マルチプロセッシング

この章では、マルチプロセッシングとマルチスレッドの比較に焦点を当てます。

マルチプロセッシング

これは、単一のコンピューターシステム内で2つ以上のCPUユニットを使用することです。 コンピューターシステムで利用可能なCPUコアを最大限に活用して、ハードウェアから最大限の可能性を引き出すのが最善の方法です。

マルチスレッド

CPUが複数のスレッドを同時に実行することでオペレーティングシステムの使用を管理する能力です。 マルチスレッドの主なアイデアは、プロセスを複数のスレッドに分割することで並列処理を実現することです。

次の表は、それらの間の重要な違いのいくつかを示しています-

Multiprocessing Multiprogramming
Multiprocessing refers to processing of multiple processes at same time by multiple CPUs. Multiprogramming keeps several programs in main memory at the same time and execute them concurrently utilizing single CPU.
It utilizes multiple CPUs. It utilizes single CPU.
It permits parallel processing. Context switching takes place.
Less time taken to process the jobs. More Time taken to process the jobs.
It facilitates much efficient utilization of devices of the computer system. Less efficient than multiprocessing.
Usually more expensive. Such systems are less expensive.

グローバルインタープリターロック(GIL)の影響を排除

並行アプリケーションを使用している間、Pythonには* GIL(Global Interpreter Lock)*と呼ばれる制限があります。 GILでは、CPUの複数のコアを使用することはできません。したがって、Pythonには真のスレッドは存在しないと言えます。 GILは相互排他ロックであり、物事をスレッドセーフにします。 言い換えれば、GILは、複数のスレッドがPythonコードを並行して実行するのを防ぐと言えます。 ロックは一度に1つのスレッドのみが保持でき、スレッドを実行する場合は、最初にロックを取得する必要があります。

マルチプロセッシングを使用すると、GILによる制限を効果的に回避できます-

  • マルチプロセッシングを使用することにより、複数のプロセスの機能を利用しているため、GILの複数のインスタンスを利用しています。
  • このため、プログラム内で一度に1つのスレッドのバイトコードを実行するという制限はありません。

Pythonでのプロセスの開始

次の3つの方法は、マルチプロセッシングモジュール内でPythonでプロセスを開始するために使用することができます-

  • Fork
  • スポーン
  • フォークサーバー

Forkでプロセスを作成する

Forkコマンドは、UNIXにある標準コマンドです。 子プロセスと呼ばれる新しいプロセスを作成するために使用されます。 この子プロセスは、親プロセスと呼ばれるプロセスと同時に実行されます。 これらの子プロセスも親プロセスと同一であり、親が使用できるすべてのリソースを継承します。 次のシステムコールは、フォークでプロセスを作成するときに使用されます-

  • * fork()*-一般的にカーネルに実装されているシステムコールです。 process.p>のコピーを作成するために使用されます
  • * getpid()*-このシステムコールは呼び出しプロセスのプロセスID(PID)を返します。

次のPythonスクリプトの例は、新しい子プロセスを作成し、子プロセスと親プロセスのPIDを取得する方法を理解するのに役立ちます-

import os

def child():
   n = os.fork()

   if n > 0:
      print("PID of Parent process is : ", os.getpid())

   else:
      print("PID of Child process is : ", os.getpid())
child()

出力

PID of Parent process is : 25989
PID of Child process is : 25990

Spawnを使用してプロセスを作成する

スポーンとは、何か新しいことを始めることを意味します。 したがって、プロセスの生成とは、親プロセスによる新しいプロセスの作成を意味します。 親プロセスは非同期に実行を継続するか、子プロセスが実行を終了するまで待機します。 プロセスを生成するには、次の手順に従います-

  • マルチプロセッシングモジュールのインポート。
  • オブジェクトプロセスの作成。
  • * start()*メソッドを呼び出してプロセスアクティビティを開始します。
  • プロセスが作業を終了するまで待機し、* join()*メソッドを呼び出して終了します。

次のPythonスクリプトの例は、3つのプロセスの生成に役立ちます

import multiprocessing

def spawn_process(i):
   print ('This is process: %s' %i)
   return

if __name__ == '__main__':
   Process_jobs = []
   for i in range(3):
   p = multiprocessing.Process(target = spawn_process, args = (i,))
      Process_jobs.append(p)
   p.start()
   p.join()

出力

This is process: 0
This is process: 1
This is process: 2

Forkserverでプロセスを作成する

Forkserverメカニズムは、Unixパイプを介したファイル記述子の受け渡しをサポートする選択されたUNIXプラットフォームでのみ使用可能です。 Forkserverメカニズムの動作を理解するには、次の点を考慮してください-

  • サーバーは、新しいプロセスを開始するためにForkserverメカニズムを使用してインスタンス化されます。
  • サーバーはコマンドを受信し、新しいプロセスを作成するためのすべての要求を処理します。
  • 新しいプロセスを作成するために、PythonプログラムはForkserverにリクエストを送信し、プロセスを作成します。 *最後に、この新しく作成されたプロセスをプログラムで使用できます。

Pythonのデーモンプロセス

Python* multiprocessing *モジュールを使用すると、デーモンオプションを使用してデーモンプロセスを実行できます。 デーモンプロセスまたはバックグラウンドで実行されているプロセスは、デーモンスレッドと同様の概念に従います。 バックグラウンドでプロセスを実行するには、デーモンフラグをtrueに設定する必要があります。 デーモンプロセスは、メインプロセスが実行されている限り実行を継続し、実行の終了後またはメインプログラムが強制終了されたときに終了します。

ここでは、デーモンスレッドで使用されているのと同じ例を使用しています。 唯一の違いは、モジュールを multithreading から multiprocessing に変更し、デーモンフラグをtrueに設定することです。 ただし、以下に示すように出力に変更があります-

import multiprocessing
import time

def nondaemonProcess():
   print("starting my Process")
   time.sleep(8)
   print("ending my Process")
def daemonProcess():
   while True:
   print("Hello")
   time.sleep(2)
if __name__ == '__main__':
   nondaemonProcess = multiprocessing.Process(target = nondaemonProcess)
   daemonProcess = multiprocessing.Process(target = daemonProcess)
   daemonProcess.daemon = True
   nondaemonProcess.daemon = False
   daemonProcess.start()
   nondaemonProcess.start()

出力

starting my Process
ending my Process

デーモンなしモードのプロセスには出力があるため、出力はデーモンスレッドによって生成されたものとは異なります。 したがって、実行中のプロセスの永続性を回避するために、メインプログラムの終了後にデーモンプロセスが自動的に終了します。

Pythonでのプロセスの終了

  • terminate()*メソッドを使用すると、プロセスを即座に強制終了または終了できます。 このメソッドを使用して、関数の助けを借りて作成された子プロセスを、実行を完了する直前に終了します。

import multiprocessing
import time
def Child_process():
   print ('Starting function')
   time.sleep(5)
   print ('Finished function')
P = multiprocessing.Process(target = Child_process)
P.start()
print("My Process has terminated, terminating main thread")
print("Terminating Child Process")
P.terminate()
print("Child Process successfully terminated")

出力

My Process has terminated, terminating main thread
Terminating Child Process
Child Process successfully terminated

出力は、Child_process()関数を使用して作成された子プロセスの実行前にプログラムが終了することを示しています。 これは、子プロセスが正常に終了したことを意味します。

Pythonで現在のプロセスを特定する

オペレーティングシステムのすべてのプロセスには、PIDと呼ばれるプロセスIDがあります。 Pythonでは、次のコマンドの助けを借りて現在のプロセスのPIDを見つけることができます-

import multiprocessing
print(multiprocessing.current_process().pid)

Pythonスクリプトの次の例は、メインプロセスのPIDと子プロセスのPIDを見つけるのに役立ちます-

import multiprocessing
import time
def Child_process():
   print("PID of Child Process is: {}".format(multiprocessing.current_process().pid))
print("PID of Main process is: {}".format(multiprocessing.current_process().pid))
P = multiprocessing.Process(target=Child_process)
P.start()
P.join()

出力

PID of Main process is: 9401
PID of Child Process is: 9402

サブクラスでプロセスを使用する

*threading.Thread* クラスをサブクラス化することでスレッドを作成できます。 さらに、 *multiprocessing.Process* クラスをサブクラス化してプロセスを作成することもできます。 サブクラスでプロセスを使用するには、次の点を考慮する必要があります-
  • Process クラスの新しいサブクラスを定義する必要があります。
  • * init(self [、args])*クラスをオーバーライドする必要があります。
  • run を実装するには、* run(self [、args])*メソッドのをオーバーライドする必要があります
  • start()メソッドを呼び出してプロセスを開始する必要があります。

import multiprocessing
class MyProcess(multiprocessing.Process):
   def run(self):
   print ('called run method in process: %s' %self.name)
   return
if __name__ == '__main__':
   jobs = []
   for i in range(5):
   P = MyProcess()
   jobs.append(P)
   P.start()
   P.join()

出力

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5

Pythonマルチプロセッシングモジュール–プールクラス

Pythonアプリケーションで単純な並列*処理*タスクについて話す場合、マルチプロセッシングモジュールはPoolクラスを提供します。 Pool クラスの次のメソッドを使用して、メインプログラム内の子プロセスの数を増やすことができます。

apply()メソッド

このメソッドは、。ThreadPoolExecutor。。submit()メソッドに似ています。結果の準備ができるまでブロックします。

apply_async()メソッド

タスクの並列実行が必要な場合、apply_async()メソッドを使用してタスクをプールに送信する必要があります。 これは、すべての子プロセスが実行されるまでメインスレッドをロックしない非同期操作です。

map()メソッド

  • apply()メソッドと同様に、結果の準備ができるまでブロックします。 これは、反復可能なデータをいくつかのチャンクに分割し、個別のタスクとしてプロセスプールに送信する組み込みの map()*関数と同等です。

map_async()メソッド

  • apply_async() apply()メソッドに対するものであるため、これは map()*メソッドのバリアントです。 結果オブジェクトを返します。 結果の準備が整うと、呼び出し可能オブジェクトが適用されます。 呼び出し可能オブジェクトはすぐに完了する必要があります。そうしないと、結果を処理するスレッドがブロックされます。

次の例は、並列実行を実行するプロセスプールの実装に役立ちます。 multiprocessing.Pool メソッドを介して* square()関数を適用することにより、数の2乗の簡単な計算が実行されました。 入力は0〜4の整数のリストであるため、 pool.map()を使用して5を送信しました。 結果は *p_outputs に保存され、出力されます。

def square(n):
   result = n*n
   return result
if __name__ == '__main__':
   inputs = list(range(5))
   p = multiprocessing.Pool(processes = 4)
   p_outputs = pool.map(function_square, inputs)
   p.close()
   p.join()
   print ('Pool :', p_outputs)

出力

Pool : [0, 1, 4, 9, 16]

プロセスの相互通信

プロセス相互通信とは、プロセス間でデータを交換することです。 並列アプリケーションを開発するには、プロセス間でデータを交換する必要があります。 次の図は、複数のサブプロセス間の同期のためのさまざまな通信メカニズムを示しています-

相互通信

さまざまな通信メカニズム

このセクションでは、さまざまな通信メカニズムについて学習します。 メカニズムは以下のとおりです-

キュー

キューはマルチプロセスプログラムで使用できます。 multiprocessing モジュールのQueueクラスは Queue.Queue クラスに似ています。 したがって、同じAPIを使用できます。 Multiprocessing .Queueは、プロセス間の通信のスレッドおよびプロセスセーフFIFO(先入れ先出し)メカニズムを提供します。

以下は、マルチプロセッシングのQueueクラスの概念を理解するために、マルチプロセッシングに関するPython公式ドキュメントから取られた簡単な例です。

from multiprocessing import Process, Queue
import queue
import random
def f(q):
   q.put([42, None, 'hello'])
def main():
   q = Queue()
   p = Process(target = f, args = (q,))
   p.start()
   print (q.get())
if __name__ == '__main__':
   main()

出力

[42, None, 'hello']

パイプ

これは、マルチプロセスプログラムのプロセス間で通信するために使用されるデータ構造です。 Pipe()関数は、デフォルトではduplex(two way)であるパイプで接続された接続オブジェクトのペアを返します。 それは次の方法で動作します-

  • パイプの両端を表す接続オブジェクトのペアを返します。
  • すべてのオブジェクトには、プロセス間で通信するための2つのメソッド(* send()および recv()*)があります。

以下は、マルチプロセッシングの* Pipe()*関数の概念を理解するために、マルチプロセッシングに関するPython公式ドキュメントから取られた簡単な例です。

from multiprocessing import Process, Pipe

def f(conn):
   conn.send([42, None, 'hello'])
   conn.close()

if __name__ == '__main__':
   parent_conn, child_conn = Pipe()
   p = Process(target = f, args = (child_conn,))
   p.start()
   print (parent_conn.recv())
   p.join()

出力

[42, None, 'hello']

部長

Managerは、すべてのユーザー間で共有情報を調整する方法を提供するマルチプロセッシングモジュールのクラスです。 マネージャオブジェクトは、共有オブジェクトを管理し、他のプロセスがそれらを操作できるようにするサーバープロセスを制御します。 つまり、マネージャーは、異なるプロセス間で共有できるデータを作成する方法を提供します。 以下は、マネージャオブジェクトのさまざまなプロパティです-

  • managerの主なプロパティは、共有オブジェクトを管理するサーバープロセスを制御することです。 *別の重要なプロパティは、プロセスが変更したときにすべての共有オブジェクトを更新することです。

サーバープロセスでリストレコードを作成し、そのリストに新しいレコードを追加するためにマネージャーオブジェクトを使用する例を次に示します。

import multiprocessing

def print_records(records):
   for record in records:
      print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))

def insert_record(record, records):
   records.append(record)
      print("A New record is added\n")

if __name__ == '__main__':
   with multiprocessing.Manager() as manager:

      records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi',9)])
      new_record = ('English', 3)

      p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
      p2 = multiprocessing.Process(target = print_records, args = (records,))
      p1.start()
      p1.join()
      p2.start()
      p2.join()

出力

A New record is added

Name: Computers
Score: 1

Name: Histoty
Score: 5

Name: Hindi
Score: 9

Name: English
Score: 3

Managerの名前空間の概念

Manager Classには名前空間の概念があります。これは、複数のプロセスで複数の属性を共有するための簡単な方法です。 名前空間には、呼び出すことができるパブリックメソッドはありませんが、書き込み可能な属性があります。

次のPythonスクリプトの例は、メインプロセスと子プロセスでデータを共有するために名前空間を利用するのに役立ちます-

import multiprocessing

def Mng_NaSp(using_ns):

   using_ns.x +=5
   using_ns.y* = 10

if __name__ == '__main__':
   manager = multiprocessing.Manager()
   using_ns = manager.Namespace()
   using_ns.x = 1
   using_ns.y = 1

   print ('before', using_ns)
   p = multiprocessing.Process(target = Mng_NaSp, args = (using_ns,))
   p.start()
   p.join()
   print ('after', using_ns)

出力

before Namespace(x = 1, y = 1)
after Namespace(x = 6, y = 10)

Ctypes配列と値

マルチプロセッシングモジュールは、共有メモリマップにデータを保存するためのArrayおよびValueオブジェクトを提供します。 Array は共有メモリから割り当てられたctypes配列であり、 Value は共有メモリから割り当てられたctypesオブジェクトです。

あるために、マルチプロセスからプロセス、値、配列をインポートします。

次のPythonスクリプトは、プロセス間でデータを共有するためにCtypes配列と値を利用するためのpython docsからの例です。

def f(n, a):
   n.value = 3.1415927
   for i in range(len(a)):
   a[i] = -a[i]

if __name__ == '__main__':
   num = Value('d', 0.0)
   arr = Array('i', range(10))

   p = Process(target = f, args = (num, arr))
   p.start()
   p.join()
   print (num.value)
   print (arr[:])

出力

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

順次プロセスの通信(CSP​​)

CSPは、システムと並行モデルを特徴とする他のシステムとの相互作用を示すために使用されます。 CSPは、メッセージの受け渡しを介して並行プログラムまたはプログラムを記述するためのフレームワークであるため、並行性を記述するのに効果的です。

Pythonライブラリ– PyCSP

CSPにあるコアプリミティブを実装するために、PythonにはPyCSPというライブラリがあります。 非常に簡単に理解できるように、実装を非常に短く読みやすい状態に保ちます。 以下は、PyCSPの基本的なプロセスネットワークです-

PyCSP

上記のPyCSPプロセスネットワークには、Process1とProcess 2の2つのプロセスがあります。 これらのプロセスは、2つのチャネル(チャネル1とチャネル2)を介してメッセージを渡すことにより通信します。

PyCSPのインストール

次のコマンドの助けを借りて、PythonライブラリPyCSPをインストールできます-

pip install PyCSP

次のPythonスクリプトは、2つのプロセスを並行して実行する簡単な例です。 それはPyCSP python libabaryの助けを借りて行われます-

from pycsp.parallel import *
import time
@process
def P1():
   time.sleep(1)
   print('P1 exiting')
@process
def P2():
   time.sleep(1)
   print('P2 exiting')
def main():
   Parallel(P1(), P2())
   print('Terminating')
if __name__ == '__main__':
   main()

上記のスクリプトでは、 P1P2 という2つの関数が作成され、それらをプロセスに変換するために @ process で装飾されています。

出力

P2 exiting
P1 exiting
Terminating

Concurrency-in-python-eventdriven-programming

リアクティブプログラミング

リアクティブプログラミングは、データフローと変更の伝播を扱うプログラミングパラダイムです。 これは、1つのコンポーネントからデータフローが発行されると、リアクティブプログラミングライブラリによって他のコンポーネントに変更が伝播されることを意味します。 変更の伝播は、最終受信者に到達するまで続きます。 イベント駆動型プログラミングと事後対応型プログラミングの違いは、イベント駆動型プログラミングはイベントを中心に展開し、反応型プログラミングはデータを中心に展開することです。

リアクティブプログラミング用のReactiveXまたはRX

ReactiveXまたはRaective Extensionは、リアクティブプログラミングの最も有名な実装です。 ReactiveXの動作は、次の2つのクラスに依存します-

観測可能なクラス

このクラスはデータストリームまたはイベントのソースであり、着信データをパックして、データがあるスレッドから別のスレッドに渡されるようにします。 オブザーバーがサブスクライブするまで、データは提供されません。

オブザーバークラス

このクラスは、 observable によって発行されたデータストリームを消費します。 observableを持つ複数のオブザーバーが存在する可能性があり、各オブザーバーは発行される各データ項目を受け取ります。 オブザーバーはオブザーバブルにサブスクライブすることにより、3種類のイベントを受け取ることができます-

  • on_next()event -データストリームに要素があることを意味します。
  • on_completed()event -それは放出の終わりを意味し、これ以上のアイテムは来ていません。
  • on_error()event -また、エミッションの終了を意味しますが、 observable によってエラーがスローされた場合。

RxPY –リアクティブプログラミング用のPythonモジュール

RxPYは、リアクティブプログラミングに使用できるPythonモジュールです。 モジュールがインストールされていることを確認する必要があります。 次のコマンドを使用して、RxPYモジュールをインストールできます-

pip install RxPY

以下はPythonスクリプトで、 RxPY モジュールとそのクラス Observable および Observe for リアクティブプログラミングを使用しています。 基本的に2つのクラスがあります-

  • * get_strings()*-オブザーバーから文字列を取得するため。
  • * PrintObserver()*-オブザーバーから文字列を印刷するため。 オブザーバークラスの3つのイベントすべてを使用します。 また、subscribe()クラスを使用します。
from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

出力

Received Ram
Received Mohan
Received Shyam
Finished

リアクティブプログラミングのためのPyFunctionalライブラリ

  • PyFunctional *は、リアクティブプログラミングに使用できるもう1つのPythonライブラリです。 Pythonプログラミング言語を使用して機能的なプログラムを作成できます。 連鎖機能演算子を使用してデータパイプラインを作成できるため、便利です。

RxPYとPyFunctionalの違い

両方のライブラリはリアクティブプログラミングに使用され、同様の方法でストリームを処理しますが、両者の主な違いはデータの処理に依存します。 RxPY はシステム内のデータとイベントを処理し、 PyFunctional は関数型プログラミングパラダイムを使用したデータの変換に焦点を当てています。

PyFunctionalモジュールのインストール

このモジュールを使用する前にインストールする必要があります。 それは次のようにpipコマンドの助けを借りてインストールすることができます-

pip install pyfunctional

次の例では、PyFunctional モジュールとその *seq クラスを使用します。これらのクラスは、反復および操作できるストリームオブジェクトとして機能します。 このプログラムでは、すべての値を2倍にするlamda関数を使用してシーケンスをマッピングし、xが4より大きい値をフィルター処理し、最後に残りのすべての値の合計にシーケンスを縮小します。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

出力

Result: 6