Concurrency-in-python-threads-intercommunication

提供:Dev Guides
移動先:案内検索

スレッド相互通信

現実には、人々のチームが共通のタスクに取り組んでいる場合、タスクを適切に終了するためにコミュニケーションが必要です。 同じ類推はスレッドにも適用できます。 プログラミングでは、プロセッサの理想的な時間を短縮するために、複数のスレッドを作成し、すべてのスレッドに異なるサブタスクを割り当てます。 したがって、通信機能が必要であり、相互に対話して、同期された方法でジョブを終了する必要があります。

スレッド相互通信に関連する次の重要な点を考慮してください-

  • パフォーマンス向上なし-スレッドとプロセス間の適切な通信を達成できない場合、同時実行性と並列処理によるパフォーマンス向上は役に立ちません。
  • タスクを適切に実行-スレッド間の適切な相互通信メカニズムがないと、割り当てられたタスクを適切に完了できません。
  • プロセス間通信よりも効率的-プロセス内のすべてのスレッドが同じアドレス空間を共有し、共有メモリを使用する必要がないため、スレッド間通信はプロセス間通信よりも効率的で使いやすいです。

スレッドセーフな通信のためのPythonデータ構造

マルチスレッドコードでは、あるスレッドから別のスレッドに情報を渡すという問題が発生します。 標準の通信プリミティブはこの問題を解決しません。 したがって、スレッド間でオブジェクトを共有して通信をスレッドセーフにするために、独自の複合オブジェクトを実装する必要があります。 以下は、いくつかのデータ構造であり、それらにいくつかの変更を加えた後にスレッドセーフな通信を提供します-

Sets

スレッドセーフな方法でsetデータ構造を使用するには、setクラスを拡張して独自のロックメカニズムを実装する必要があります。

ここにクラスを拡張するPythonの例があります-

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

上記の例では、 extend_class という名前のクラスオブジェクトが定義されており、Python set class からさらに継承されています。 ロックオブジェクトは、このクラスのコンストラクター内で作成されます。 現在、2つの関数-* add() delete()があります。 これらの関数は定義されており、スレッドセーフです。 どちらも *super クラスの機能に依存していますが、1つの重要な例外があります。

デコレータ

これは、スレッドセーフ通信のもう1つの重要な方法であり、デコレータを使用します。

デコレータ&mminus;の使用方法を示すPythonの例を考えてみましょう。

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

上記の例では、lock_decoratorという名前のデコレータメソッドが定義されており、Pythonメソッドクラスからさらに継承されています。 次に、このクラスのコンストラクター内にロックオブジェクトが作成されます。 現在、add()とdelete()の2つの関数があります。 これらの関数は定義されており、スレッドセーフです。 どちらもスーパークラスの機能に依存していますが、1つの重要な例外があります。

リスト

リストのデータ構造は、スレッドセーフで、迅速で、一時的なメモリ内ストレージの簡単な構造です。 Cpythonでは、GILはそれらへの同時アクセスから保護します。 リストがスレッドセーフであることを知るようになりましたが、リストにあるデータについてはどうでしょう。 実際、リストのデータは保護されていません。 たとえば、* L.append(x)は、別のスレッドが同じことをしようとしている場合に期待される結果を返すことを保証しません。 これは、 append()*はアトミック操作でスレッドセーフですが、他のスレッドがリストのデータを並行して変更しようとしているため、出力に競合状態の副作用が見られるためです。

この種の問題を解決し、データを安全に変更するには、適切なロックメカニズムを実装する必要があります。これにより、複数のスレッドが競合状態になる可能性がさらになくなります。 適切なロックメカニズムを実装するために、前の例で行ったようにクラスを拡張できます。

リスト上の他のいくつかのアトミック操作は次のとおりです-

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

ここに-

  • L、L1、L2はすべてリストです
  • D、D1、D2は辞書です
  • x、yはオブジェクト
  • i、jはintです

キュー

リストのデータが保護されていない場合、結果に直面する必要があります。 競合状態の間違ったデータ項目を取得または削除する場合があります。 そのため、キューデータ構造を使用することをお勧めします。 キューの実際の例は、車線が最初に出て最初に出る単一車線の片道です。 チケットウィンドウとバス停でのキューのより現実的な例が見られます。

キュー

キューはデフォルトでスレッドセーフなデータ構造であり、複雑なロックメカニズムの実装について心配する必要はありません。 Pythonは、アプリケーションでさまざまなタイプのキューを使用するモジュールを提供します。

キューの種類

このセクションでは、さまざまなタイプのキューについて獲得します。 Pythonは、 <queue> モジュールから使用するキューの3つのオプションを提供します-

  • 通常のキュー(FIFO、先入れ先出し)
  • LIFO、後入れ先出し
  • 優先度

後続のセクションでさまざまなキューについて学習します。

通常のキュー(FIFO、先入れ先出し)

Pythonが提供する最も一般的に使用されるキュー実装です。 このキューイングメカニズムでは、誰でも最初に来る人が最初にサービスを取得します。 FIFOは通常のキューとも呼ばれます。 FIFOキューは次のように表すことができます-

FIFO

FIFOキューのPython実装

Pythonでは、FIFOキューはシングルスレッドとマルチスレッドで実装できます。

シングルスレッドのFIFOキュー

単一スレッドでFIFOキューを実装するために、 Queue クラスは基本的な先入れ先出しコンテナーを実装します。 要素は、* put()を使用してシーケンスの一方の「端」に追加され、 get()*を使用してもう一方の端から削除されます。

以下は、シングルスレッドでFIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

出力

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

出力は、上記のプログラムが単一のスレッドを使用して、エレメントが挿入されたのと同じ順序でキューから削除されることを示していることを示しています。

複数のスレッドを持つFIFOキュー

複数のスレッドでFIFOを実装するには、キューモジュールから拡張されたmyqueue()関数を定義する必要があります。 get()およびput()メソッドの動作は、シングルスレッドでFIFOキューを実装する際の上記と同じです。 次に、マルチスレッド化するには、スレッドを宣言してインスタンス化する必要があります。 これらのスレッドは、FIFO方式でキューを消費します。

以下は、複数のスレッドでFIFOキューを実装するためのPythonプログラムです

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO、先入れ先出しキュー

このキューは、FIFO(先入れ先出し)キューとはまったく逆のアナロジーを使用します。 このキューイングメカニズムでは、最後に来た人が最初にサービスを取得します。 これは、スタックデータ構造の実装に似ています。 LIFOキューは、人工知能のアルゴリズムのような深さ優先の検索を実装する際に役立ちます。

LIFOキューのPython実装

Pythonでは、LIFOキューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドのLIFOキュー

単一スレッドでLIFOキューを実装するため、 Queue クラスは、構造 Queue .LifoQueueを使用して、基本的な後入れ先出しコンテナーを実装します。 これで、* put()を呼び出すと、コンテナのヘッドに要素が追加され、 get()*を使用してもヘッドから削除されます。

以下は、シングルスレッドでLIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

出力は、上記のプログラムが単一のスレッドを使用して、要素が挿入された順序とは逆の順序でキューから削除されることを示していることを示しています。

複数のスレッドを持つLIFOキュー

実装は、複数のスレッドでFIFOキューの実装を行ったのと同様です。 唯一の違いは、構造 Queue.LifoQueue を使用して、基本的な後入れ先出しコンテナーを実装する Queue クラスを使用する必要があることです。

以下は、複数のスレッドでLIFOキューを実装するためのPythonプログラムです-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先キュー

FIFOおよびLIFOキューでは、アイテムの順序は挿入の順序に関連しています。 ただし、挿入の順序よりも優先順位が重要な場合が多くあります。 実世界の例を考えてみましょう。 空港のセキュリティがさまざまなカテゴリの人々をチェックしているとします。 VVIPの人々、航空会社のスタッフ、税関職員、カテゴリは、一般人のように到着に基づいてチェックされるのではなく、優先的にチェックされる場合があります。

優先度キューで考慮する必要があるもう1つの重要な側面は、タスクスケジューラの開発方法です。 一般的な設計の1つは、キュー内で優先度に基づいて最も多くのエージェントタスクを処理することです。 このデータ構造を使用して、優先度値に基づいてキューからアイテムを取得できます。

優先度キューのPython実装

Pythonでは、優先度キューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドの優先キュー

単一スレッドで優先度キューを実装するために、 Queue クラスは、構造 Queue .PriorityQueueを使用して優先度コンテナにタスクを実装します。 現在、* put()を呼び出すと、最低値が最高の優先度を持つ値が要素に追加されるため、 get()*を使用して最初に取得されます。

シングルスレッドで優先度キューを実装するための次のPythonプログラムを検討してください-

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

出力

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

上記の出力では、優先度に基づいてアイテムがキューに保存されていることがわかります。値が小さいほど優先度が高くなります。

マルチスレッドの優先キュー

実装は、複数のスレッドを持つFIFOおよびLIFOキューの実装に似ています。 唯一の違いは、構造 Queue.PriorityQueue を使用して優先度を初期化するために Queue クラスを使用する必要があることです。 別の違いは、キューの生成方法にあります。 以下の例では、2つの同一のデータセットで生成されます。

次のPythonプログラムは、複数のスレッドで優先度キューの実装に役立ちます-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue