マルチプロセッシング —プロセスベースの並列処理
ソースコード: :source: `Lib / multiprocessing /`
序章
マルチプロセッシングは、スレッドモジュールと同様のAPIを使用したスポーンプロセスをサポートするパッケージです。 マルチプロセッシングパッケージは、ローカルとリモートの両方の同時実行性を提供し、スレッドの代わりにサブプロセスを使用することで、グローバルインタープリターロックを効果的に回避します。 このため、マルチプロセッシングモジュールを使用すると、プログラマーは特定のマシンで複数のプロセッサーを完全に活用できます。 UnixとWindowsの両方で動作します。
マルチプロセッシングモジュールでは、スレッドモジュールにアナログがないAPIも導入されています。 この代表的な例は、 Pool オブジェクトです。これは、関数の実行を複数の入力値に並列化し、入力データをプロセスに分散する便利な手段を提供します(データ並列処理)。 次の例は、子プロセスがそのモジュールを正常にインポートできるように、モジュールでそのような関数を定義する一般的な方法を示しています。 Pool を使用したデータ並列処理のこの基本的な例は、
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
標準出力に出力します
[1, 4, 9]
プロセスクラス
マルチプロセッシングでは、 Process オブジェクトを作成し、その start()メソッドを呼び出すことによってプロセスが生成されます。 Process は、 threading.Thread のAPIに従います。 マルチプロセスプログラムの簡単な例は次のとおりです。
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
関連する個々のプロセスIDを表示するために、以下に拡張例を示します。
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
if __name__ == '__main__'
の部分が必要な理由の説明については、プログラミングガイドラインを参照してください。
コンテキストと開始メソッド
プラットフォームに応じて、マルチプロセッシングはプロセスを開始する3つの方法をサポートします。 これらの開始メソッドは
- スポーン
親プロセスは、新しいPythonインタープリタープロセスを開始します。 子プロセスは、プロセスオブジェクトの run()メソッドを実行するために必要なリソースのみを継承します。 特に、親プロセスからの不要なファイル記述子とハンドルは継承されません。 この方法を使用してプロセスを開始すると、 fork または forkserver を使用する場合に比べてかなり遅くなります。
UnixとWindowsで利用できます。 WindowsとmacOSのデフォルト。
- フォーク
親プロセスは os.fork()を使用してPythonインタープリターをフォークします。 子プロセスは、開始時に、親プロセスと実質的に同じです。 親のすべてのリソースは、子プロセスによって継承されます。 マルチスレッドプロセスを安全にフォークすることには問題があることに注意してください。
Unixでのみ使用できます。 Unixのデフォルト。
- フォークサーバー
プログラムが起動し、 forkserver startメソッドを選択すると、サーバープロセスが開始されます。 それ以降、新しいプロセスが必要になるたびに、親プロセスはサーバーに接続し、新しいプロセスをフォークするように要求します。 フォークサーバープロセスはシングルスレッドであるため、 os.fork()を使用しても安全です。 不要なリソースは継承されません。
Unixパイプを介したファイル記述子の受け渡しをサポートするUnixプラットフォームで使用できます。
バージョン3.8で変更: macOSでは、 spawn startメソッドがデフォルトになりました。 fork startメソッドは、サブプロセスのクラッシュにつながる可能性があるため、安全ではないと見なす必要があります。 :issue: `33725` を参照してください。
バージョン3.4で変更: spawn がすべてのUNIXプラットフォームに追加され、 forkserver が一部のUNIXプラットフォームに追加されました。 子プロセスは、Windowsで親の継承可能なハンドルをすべて継承しなくなりました。
Unixでは、 spawn または forkserver startメソッドを使用すると、リンクされていない名前付きシステムリソース(名前付きセマフォやなど)を追跡するリソーストラッカープロセスも開始されます。プログラムのプロセスによって作成されたSharedMemory オブジェクト)。 すべてのプロセスが終了すると、リソーストラッカーは残りの追跡対象オブジェクトのリンクを解除します。 通常は存在しないはずですが、シグナルによってプロセスが強制終了された場合は、「リークされた」リソースが存在する可能性があります。 (リークされたセマフォも共有メモリセグメントも、次の再起動まで自動的にリンク解除されません。 システムは限られた数の名前付きセマフォのみを許可し、共有メモリセグメントはメインメモリの一部のスペースを占有するため、これは両方のオブジェクトにとって問題があります。)
開始メソッドを選択するには、メインモジュールのif __name__ == '__main__'
句で set_start_method()を使用します。 例えば:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()は、プログラム内で複数回使用しないでください。
または、 get_context()を使用してコンテキストオブジェクトを取得することもできます。 コンテキストオブジェクトはマルチプロセッシングモジュールと同じAPIを持ち、同じプログラムで複数の開始メソッドを使用できるようにします。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
あるコンテキストに関連するオブジェクトは、別のコンテキストのプロセスと互換性がない場合があることに注意してください。 特に、 fork コンテキストを使用して作成されたロックは、 spawn または forkserver startメソッドを使用して開始されたプロセスに渡すことはできません。
特定のstartメソッドを使用したいライブラリは、ライブラリユーザーの選択に干渉しないように、おそらく get_context()を使用する必要があります。
警告
'spawn'
および'forkserver'
の開始メソッドは、現在、「凍結された」実行可能ファイル(つまり、 PyInstaller や cx_Freeze などのパッケージによって生成されたバイナリ)では使用できません。 Unix。 'fork'
のstartメソッドは機能します。
プロセス間でのオブジェクトの交換
マルチプロセッシングは、プロセス間の2種類の通信チャネルをサポートします。
キュー
Queue クラスは、 queue.Queue のニアクローンです。 例えば:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
キューはスレッドおよびプロセスセーフです。
パイプ
Pipe()関数は、デフォルトでデュプレックス(双方向)であるパイプで接続された接続オブジェクトのペアを返します。 例えば:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Pipe()によって返される2つの接続オブジェクトは、パイプの両端を表します。 各接続オブジェクトには、
send()
メソッドとrecv()
メソッドがあります(とりわけ)。 2つのプロセス(またはスレッド)がパイプの同じ端から同時に読み取りまたは書き込みを行おうとすると、パイプ内のデータが破損する可能性があることに注意してください。 もちろん、パイプの異なる端を同時に使用するプロセスによる破損のリスクはありません。
プロセス間の同期
マルチプロセッシングには、スレッドのすべての同期プリミティブに相当するものが含まれています。 たとえば、ロックを使用して、一度に1つのプロセスのみが標準出力に出力されるようにすることができます。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
異なるプロセスからのロック出力を使用しないと、すべてが混乱する傾向があります。
労働者のプールを使用する
Pool クラスは、ワーカープロセスのプールを表します。 いくつかの異なる方法でタスクをワーカープロセスにオフロードできるようにするメソッドがあります。
例えば:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
プールのメソッドは、プールを作成したプロセスによってのみ使用される必要があることに注意してください。
ノート
このパッケージ内の機能には、__main__
モジュールが子によってインポート可能である必要があります。 これはプログラミングガイドラインでカバーされていますが、ここで指摘する価値があります。 これは、 multiprocessing.pool.Pool の例など、一部の例が対話型インタープリターで機能しないことを意味します。 例えば:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(これを試してみると、実際にはセミランダムにインターリーブされた3つの完全なトレースバックが出力されるため、親プロセスを何らかの方法で停止する必要がある場合があります。)
参照
マルチプロセッシングパッケージは、ほとんどの場合、スレッドモジュールのAPIを複製します。
プロセスと例外
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
プロセスオブジェクトは、別のプロセスで実行されるアクティビティを表します。 Process クラスには、 threading.Thread のすべてのメソッドと同等のものがあります。
コンストラクターは常にキーワード引数を使用して呼び出す必要があります。 group は常に
None
である必要があります。 threading.Thread との互換性のためにのみ存在します。 target は、 run()メソッドによって呼び出される呼び出し可能なオブジェクトです。 デフォルトはNone
で、何も呼び出されないことを意味します。 name はプロセス名です(詳細については、 name を参照してください)。 args は、ターゲット呼び出しの引数タプルです。 kwargs は、ターゲット呼び出しのキーワード引数の辞書です。 指定されている場合、キーワードのみのデーモン引数は、プロセスデーモンフラグをTrue
またはFalse
に設定します。None
(デフォルト)の場合、このフラグは作成プロセスから継承されます。デフォルトでは、 target に引数は渡されません。
サブクラスがコンストラクターをオーバーライドする場合、プロセスに他のことを行う前に、サブクラスが基本クラスコンストラクター(
Process.__init__()
)を呼び出すことを確認する必要があります。バージョン3.3で変更: デーモン引数が追加されました。
- run()
プロセスのアクティビティを表すメソッド。
サブクラスでこのメソッドをオーバーライドできます。 標準の run()メソッドは、 args および kwargs [から取得したシーケンシャル引数とキーワード引数を使用して、オブジェクトのコンストラクターに渡された呼び出し可能オブジェクトをターゲット引数として呼び出します(存在する場合)。 X211X]引数、それぞれ。
- start()
プロセスのアクティビティを開始します。
これは、プロセスオブジェクトごとに最大1回呼び出す必要があります。 オブジェクトの run()メソッドが別のプロセスで呼び出されるように調整します。
- join([timeout])
オプションの引数 timeout が
None
(デフォルト)の場合、メソッドは join()メソッドが呼び出されたプロセスが終了するまでブロックします。 timeout が正の数の場合、最大で timeout 秒をブロックします。 プロセスが終了した場合、またはメソッドがタイムアウトした場合、メソッドはNone
を返すことに注意してください。 プロセスの exitcode をチェックして、プロセスが終了したかどうかを確認します。プロセスは何度でも参加できます。
デッドロックが発生するため、プロセスはそれ自体に参加できません。 プロセスが開始される前にプロセスに参加しようとするとエラーになります。
- name
プロセスの名前。 名前は、識別目的でのみ使用される文字列です。 セマンティクスはありません。 複数のプロセスに同じ名前を付けることができます。
初期名はコンストラクターによって設定されます。 コンストラクターに明示的な名前が指定されていない場合、「Process-N 1 :N 2 :…:N k 」という形式の名前が作成されます。ここで、各N k はその親のN番目の子です。
- is_alive()
プロセスが生きているかどうかを返します。
大まかに言って、プロセスオブジェクトは、 start()メソッドが戻った瞬間から子プロセスが終了するまで存続します。
- daemon
プロセスのデーモンフラグ、ブール値。 これは、 start()を呼び出す前に設定する必要があります。
初期値は作成プロセスから継承されます。
プロセスが終了すると、そのデーモンの子プロセスをすべて終了しようとします。
デーモンプロセスは子プロセスを作成できないことに注意してください。 そうしないと、デーモンプロセスは、親プロセスが終了したときに終了した場合、子プロセスを孤立させたままにします。 さらに、これらは not Unixデーモンまたはサービスであり、非デーモンプロセスが終了した場合に終了する(および参加しない)通常のプロセスです。
threading.Thread APIに加えて、 Process オブジェクトは次の属性とメソッドもサポートします。
- pid
プロセスIDを返します。 プロセスが生成される前は、これは
None
になります。
- exitcode
子供の終了コード。 プロセスがまだ終了していない場合、これは
None
になります。 負の値 -N は、子がシグナル N によって終了したことを示します。
- authkey
プロセスの認証キー(バイト文字列)。
マルチプロセッシングが初期化されると、メインプロセスには os.urandom()を使用してランダムな文字列が割り当てられます。
Process オブジェクトが作成されると、その親プロセスの認証キーを継承しますが、 authkey を別のバイト文字列に設定することで変更できます。
認証キーを参照してください。
- sentinel
プロセスが終了すると「準備完了」になるシステムオブジェクトの数値ハンドル。
multiprocessing.connection.wait()を使用して一度に複数のイベントを待機する場合は、この値を使用できます。 それ以外の場合は、 join()を呼び出す方が簡単です。
Windowsでは、これは
WaitForSingleObject
およびWaitForMultipleObjects
ファミリーのAPI呼び出しで使用できるOSハンドルです。 Unixでは、これは select モジュールのプリミティブで使用できるファイル記述子です。バージョン3.3の新機能。
- terminate()
プロセスを終了します。 Unixでは、これは
SIGTERM
信号を使用して行われます。 WindowsではTerminateProcess()
が使用されます。 なお、exitハンドラやfinally句などは実行されません。プロセスの子孫プロセスは終了せず終了しないことに注意してください-それらは単に孤立します。
警告
関連するプロセスがパイプまたはキューを使用しているときにこのメソッドを使用すると、パイプまたはキューが破損する可能性があり、他のプロセスで使用できなくなる可能性があります。 同様に、プロセスがロックまたはセマフォなどを取得した場合。 その後、それを終了すると、他のプロセスがデッドロックする可能性があります。
- kill()
terminate()と同じですが、Unixでは
SIGKILL
信号を使用します。バージョン3.7の新機能。
- close()
Process オブジェクトを閉じて、それに関連付けられているすべてのリソースを解放します。 ValueError は、基になるプロセスがまだ実行中の場合に発生します。 close()が正常に戻ると、 Process オブジェクトの他のほとんどのメソッドと属性で ValueError が発生します。
バージョン3.7の新機能。
start()、 join()、 is_alive()、 terminate()、 exitcode に注意してください。メソッドは、プロセスオブジェクトを作成したプロセスによってのみ呼び出される必要があります。
Process のいくつかのメソッドの使用例:
>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError
- すべてのマルチプロセッシング例外の基本クラス。
- exception multiprocessing.BufferTooShort
指定されたバッファオブジェクトがメッセージを読み取るには小さすぎる場合に
Connection.recv_bytes_into()
によって発生する例外。e
が BufferTooShort のインスタンスである場合、e.args[0]
はメッセージをバイト文字列として提供します。
- exception multiprocessing.AuthenticationError
- 認証エラーが発生したときに発生します。
- exception multiprocessing.TimeoutError
- タイムアウトの期限が切れたときにタイムアウトのあるメソッドによって発生します。
パイプとキュー
複数のプロセスを使用する場合、通常、プロセス間の通信にメッセージパッシングを使用し、ロックなどの同期プリミティブを使用する必要がなくなります。
メッセージを渡すために、 Pipe()(2つのプロセス間の接続用)またはキュー(複数のプロデューサーとコンシューマーを許可する)を使用できます。
Queue 、 SimpleQueue 、および JoinableQueue タイプは、 queue.Queueをモデルにしたマルチプロデューサー、マルチコンシューマー FIFO キューです。標準ライブラリのクラス。 Queue にはPython2.5の queue.Queue クラスに導入された task_done()メソッドと join()メソッドがないという点で異なります。
JoinableQueue を使用する場合は、キューから削除されたタスクごとに JoinableQueue.task_done()を呼び出す必要があります。そうでない場合は、未完了のタスクの数をカウントするために使用されるセマフォを呼び出す必要があります。最終的にオーバーフローし、例外が発生する可能性があります。
マネージャオブジェクトを使用して共有キューを作成することもできることに注意してください。マネージャを参照してください。
ノート
マルチプロセッシングは、通常の queue.Empty および queue.Full 例外を使用してタイムアウトを通知します。 マルチプロセッシング名前空間では使用できないため、キューからインポートする必要があります。
ノート
オブジェクトがキューに入れられると、オブジェクトはピクルされ、バックグラウンドスレッドは後でピクルされたデータを基になるパイプにフラッシュします。 これには少し驚くべき結果がありますが、実際的な問題は発生しないはずです。本当に気になる場合は、代わりに manager で作成されたキューを使用できます。
- オブジェクトを空のキューに置いた後、キューの empty()メソッドが False を返し、 get_nowait()が[ X197X] queue.Empty 。
- 複数のプロセスがオブジェクトをキューに入れている場合、オブジェクトがもう一方の端で順不同で受信される可能性があります。 ただし、同じプロセスによってキューに入れられたオブジェクトは、常に相互に期待される順序になります。
警告
プロセスが Queue を使用しようとしているときに、 Process.terminate()または os.kill()を使用してプロセスが強制終了された場合、キュー内のデータ破損する可能性があります。 これにより、他のプロセスが後でキューを使用しようとしたときに例外が発生する可能性があります。
警告
上記のように、子プロセスがアイテムをキューに入れている場合( JoinableQueue.cancel_join_thread を使用していない場合)、そのプロセスは、バッファリングされたすべてのアイテムがパイプにフラッシュされるまで終了しません。
つまり、そのプロセスに参加しようとすると、キューに入れられたすべてのアイテムが消費されたことが確実でない限り、デッドロックが発生する可能性があります。 同様に、子プロセスが非デーモンである場合、親プロセスは、すべての非デーモンの子に参加しようとすると、終了時にハングする可能性があります。
マネージャーを使用して作成されたキューにはこの問題がないことに注意してください。 プログラミングガイドラインを参照してください。
プロセス間通信でのキューの使用例については、例を参照してください。
- multiprocessing.Pipe([duplex])
パイプの端を表す Connection オブジェクトのペア
(conn1, conn2)
を返します。デュプレックスが
True
(デフォルト)の場合、パイプは双方向です。 デュプレックスがFalse
の場合、パイプは単方向です。conn1
はメッセージの受信にのみ使用でき、conn2
はメッセージの送信にのみ使用できます。
- class multiprocessing.Queue([maxsize])
パイプといくつかのロック/セマフォを使用して実装されたプロセス共有キューを返します。 プロセスが最初にアイテムをキューに入れると、フィーダースレッドが開始され、オブジェクトがバッファーからパイプに転送されます。
標準ライブラリの queue モジュールからの通常の queue.Empty および queue.Full 例外は、タイムアウトを通知するために発生します。
Queue は、 task_done()と join()を除く、 queue.Queue のすべてのメソッドを実装します。
- qsize()
キューのおおよそのサイズを返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、この数は信頼できません。
これにより、
sem_getvalue()
が実装されていないMacOSXなどのUnixプラットフォームで NotImplementedError が発生する可能性があることに注意してください。
- empty()
キューが空の場合は
True
を返し、それ以外の場合はFalse
を返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、これは信頼できません。
- full()
キューがいっぱいの場合は
True
を返し、それ以外の場合はFalse
を返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、これは信頼できません。
- put(obj[, block[, timeout]])
objをキューに入れます。 オプションの引数 block が
True
(デフォルト)で timeout がNone
(デフォルト)の場合、必要に応じて空きスロットがなくなるまでブロックします。利用可能。 timeout が正の数の場合、最大で timeout 秒をブロックし、その時間内に使用可能な空きスロットがない場合は queue.Full 例外を発生させます。 それ以外の場合(ブロックはFalse
)、空きスロットがすぐに利用できる場合はアイテムをキューに入れ、それ以外の場合は queue.Full 例外(タイムアウト)を発生させますその場合、は無視されます)。バージョン3.8で変更:キューが閉じている場合、 AssertionError ではなく ValueError が発生します。
- put_nowait(obj)
put(obj, False)
と同等です。
- get([block[, timeout]])
キューからアイテムを削除して返します。 オプションの引数 block が
True
(デフォルト)で timeout がNone
(デフォルト)の場合、アイテムが使用可能になるまで必要に応じてブロックします。 timeout が正の数の場合、最大で timeout 秒をブロックし、その時間内に使用可能なアイテムがない場合は queue.Empty 例外を発生させます。 それ以外の場合(ブロックはFalse
)、アイテムがすぐに利用できる場合はアイテムを返し、それ以外の場合は queue.Empty 例外を発生させます(その場合、 timeout は無視されます)。バージョン3.8で変更:キューが閉じている場合、 OSError ではなく ValueError が発生します。
- get_nowait()
get(False)
と同等です。
multiprocessing.Queue には、 queue.Queue にはない追加のメソッドがいくつかあります。 これらのメソッドは通常、ほとんどのコードでは不要です。
- close()
現在のプロセスによってこのキューにこれ以上データが配置されないことを示します。 バックグラウンドスレッドは、バッファリングされたすべてのデータをパイプにフラッシュすると終了します。 これは、キューがガベージコレクションされるときに自動的に呼び出されます。
- join_thread()
バックグラウンドスレッドに参加します。 これは、 close()が呼び出された後にのみ使用できます。 バックグラウンドスレッドが終了するまでブロックし、バッファ内のすべてのデータがパイプにフラッシュされたことを確認します。
デフォルトでは、プロセスがキューの作成者でない場合、終了時にキューのバックグラウンドスレッドへの参加を試みます。 プロセスは cancel_join_thread()を呼び出して、 join_thread()が何もしないようにすることができます。
- cancel_join_thread()
join_thread()がブロックされないようにします。 特に、これにより、プロセスの終了時にバックグラウンドスレッドが自動的に結合されるのを防ぎます。 join_thread()を参照してください。
このメソッドのより適切な名前は
allow_exit_without_flush()
かもしれません。 キューに入れられたデータが失われる可能性があり、ほぼ確実に使用する必要はありません。 エンキューされたデータを基になるパイプにフラッシュするのを待たずに現在のプロセスをすぐに終了する必要があり、失われたデータを気にしない場合にのみ、実際に存在します。
ノート
このクラスの機能には、ホストオペレーティングシステム上で機能する共有セマフォの実装が必要です。 これがないと、このクラスの機能が無効になり、キューをインスタンス化しようとすると、 ImportError が発生します。 詳細については、:issue: `3770` を参照してください。 以下にリストされている特殊なキュータイプのいずれにも同じことが当てはまります。
- class multiprocessing.SimpleQueue
これは単純化されたキュータイプであり、ロックされたパイプに非常に近いものです。
- empty()
キューが空の場合は
True
を返し、それ以外の場合はFalse
を返します。
- get()
キューからアイテムを削除して返します。
- put(item)
item をキューに入れます。
- class multiprocessing.JoinableQueue([maxsize])
JoinableQueue 、 Queue サブクラスは、 task_done()および join()メソッドが追加されたキューです。
- task_done()
以前にキューに入れられたタスクが完了したことを示します。 キューコンシューマーによって使用されます。 タスクのフェッチに使用される get()ごとに、 task_done()への後続の呼び出しは、タスクの処理が完了したことをキューに通知します。
join()が現在ブロックしている場合、すべてのアイテムが処理されると再開されます(つまり、 putされたすべてのアイテムに対して task_done()呼び出しが受信されました()をキューに入れます)。
キューに配置されたアイテムよりも多く呼び出された場合、 ValueError を発生させます。
- join()
キュー内のすべてのアイテムが取得および処理されるまでブロックします。
アイテムがキューに追加されるたびに、未完了のタスクの数が増えます。 消費者が task_done()を呼び出して、アイテムが取得され、そのアイテムに対するすべての作業が完了したことを示すたびに、カウントが減少します。 未完了のタスクの数がゼロになると、 join()のブロックが解除されます。
その他
- multiprocessing.active_children()
現在のプロセスのすべての生きている子のリストを返します。
これを呼び出すと、すでに終了しているプロセスに「参加」するという副作用があります。
- multiprocessing.cpu_count()
システム内のCPUの数を返します。
この数は、現在のプロセスが使用できるCPUの数と同じではありません。 使用可能なCPUの数は
len(os.sched_getaffinity(0))
で取得できます。NotImplementedError が発生する可能性があります。
も参照してください
- multiprocessing.current_process()
現在のプロセスに対応する Process オブジェクトを返します。
- multiprocessing.parent_process()
current_process()の親プロセスに対応する Process オブジェクトを返します。 メインプロセスの場合、
parent_process
はNone
になります。バージョン3.8の新機能。
- multiprocessing.freeze_support()
マルチプロセッシングを使用するプログラムがフリーズしてWindows実行可能ファイルを生成する場合のサポートを追加します。 ( py2exe 、 PyInstaller 、 cx_Freeze でテスト済みです。)
この関数は、メインモジュールの
if __name__ == '__main__'
行の直後に呼び出す必要があります。 例えば:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
freeze_support()
行が省略されている場合、フリーズされた実行可能ファイルを実行しようとすると、 RuntimeError が発生します。freeze_support()
を呼び出しても、Windows以外のオペレーティングシステムで呼び出されても効果はありません。 さらに、モジュールがWindows上のPythonインタープリターによって正常に実行されている場合(プログラムがフリーズされていない場合)、freeze_support()
は効果がありません。
- multiprocessing.get_all_start_methods()
サポートされているstartメソッドのリストを返します。最初のメソッドがデフォルトです。 可能な開始方法は、
'fork'
、'spawn'
、および'forkserver'
です。 Windowsでは、'spawn'
のみが使用可能です。 Unixでは、'fork'
と'spawn'
が常にサポートされており、'fork'
がデフォルトです。バージョン3.4の新機能。
- multiprocessing.get_context(method=None)
マルチプロセッシングモジュールと同じ属性を持つコンテキストオブジェクトを返します。
method が
None
の場合、デフォルトのコンテキストが返されます。 それ以外の場合、メソッドは'fork'
、'spawn'
、'forkserver'
である必要があります。 ValueError は、指定された開始メソッドが使用できない場合に発生します。バージョン3.4の新機能。
- multiprocessing.get_start_method(allow_none=False)
プロセスの開始に使用されるstartメソッドの名前を返します。
startメソッドが修正されておらず、 allow_none がfalseの場合、startメソッドはデフォルトに固定され、名前が返されます。 startメソッドが修正されておらず、 allow_none がtrueの場合、
None
が返されます。戻り値は、
'fork'
、'spawn'
、'forkserver'
、またはNone
です。'fork'
はUnixのデフォルトですが、'spawn'
はWindowsとmacOSのデフォルトです。
バージョン3.8で変更: macOSでは、 spawn startメソッドがデフォルトになりました。 fork startメソッドは、サブプロセスのクラッシュにつながる可能性があるため、安全ではないと見なす必要があります。 :issue: `33725` を参照してください。
バージョン3.4の新機能。
- multiprocessing.set_executable()
子プロセスを開始するときに使用するPythonインタープリターのパスを設定します。 (デフォルトでは、 sys.executable が使用されます)。 埋め込み者はおそらく次のようなことをする必要があります
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
子プロセスを作成する前に。
バージョン3.4で変更:
'spawn'
startメソッドが使用されている場合、Unixでサポートされるようになりました。
- multiprocessing.set_start_method(method)
子プロセスを開始するために使用する方法を設定します。 メソッドは、
'fork'
、'spawn'
、または'forkserver'
にすることができます。これは最大で1回呼び出す必要があり、メインモジュールの
if __name__ == '__main__'
句内で保護する必要があることに注意してください。バージョン3.4の新機能。
接続オブジェクト
接続オブジェクトを使用すると、選択可能なオブジェクトまたは文字列を送受信できます。 それらは、メッセージ指向の接続されたソケットと考えることができます。
接続オブジェクトは通常、パイプを使用して作成されます。リスナーとクライアントも参照してください。
- class multiprocessing.connection.Connection
- send(obj)
recv()を使用して読み取る必要があるオブジェクトを接続のもう一方の端に送信します。
オブジェクトは選択可能である必要があります。 非常に大きなピクルス(OSによって異なりますが、約32 MiB +)は、 ValueError 例外を発生させる可能性があります。
- recv()
send()を使用して、接続のもう一方の端から送信されたオブジェクトを返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合、 EOFError を発生させます。
- fileno()
接続で使用されるファイル記述子またはハンドルを返します。
- close()
接続を閉じます。
これは、接続がガベージコレクションされるときに自動的に呼び出されます。
- poll([timeout])
読み取ることができるデータがあるかどうかを返します。
timeout が指定されていない場合、すぐに戻ります。 timeout が数値の場合、これはブロックする最大時間を秒単位で指定します。 timeout が
None
の場合、無限タイムアウトが使用されます。multiprocessing.connection.wait()を使用すると、複数の接続オブジェクトを一度にポーリングできることに注意してください。
- send_bytes(buffer[, offset[, size]])
バイトのようなオブジェクトからバイトデータを完全なメッセージとして送信します。
offset が指定されている場合、データは buffer のその位置から読み取られます。 size が指定されている場合、その数のバイトがバッファーから読み取られます。 非常に大きなバッファ(OSによって異なりますが、約32 MiB +)は、 ValueError 例外を発生させる可能性があります。
- recv_bytes([maxlength])
接続のもう一方の端から送信されたバイトデータの完全なメッセージを文字列として返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合、 EOFError を発生させます。
maxlength が指定されていて、メッセージが maxlength より長い場合、 OSError が発生し、接続が読み取れなくなります。
- recv_bytes_into(buffer[, offset])
接続のもう一方の端から送信されたバイトデータの完全なメッセージをバッファに読み込み、メッセージのバイト数を返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合、 EOFError を発生させます。
buffer は、書き込み可能なバイトのようなオブジェクトである必要があります。 offset が指定されている場合、メッセージはその位置からバッファーに書き込まれます。 オフセットは、バッファの長さ(バイト単位)未満の負でない整数である必要があります。
バッファが短すぎる場合、
BufferTooShort
例外が発生し、完全なメッセージはe.args[0]
として利用できます。ここで、e
は例外インスタンスです。
バージョン3.3での変更:接続オブジェクト自体は、 Connection.send()および Connection.recv()を使用してプロセス間で転送できるようになりました。
バージョン3.3の新機能:接続オブジェクトがコンテキスト管理プロトコルをサポートするようになりました– コンテキストマネージャータイプを参照してください。 __ enter __()は接続オブジェクトを返し、 __ exit __()は close()を呼び出します。
例えば:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()メソッドは、受信したデータの選択を自動的に解除します。これは、メッセージを送信したプロセスを信頼できない限り、セキュリティリスクになる可能性があります。
したがって、接続オブジェクトがPipe()
を使用して生成された場合を除き、何らかの認証を実行した後は、 recv()および send()メソッドのみを使用する必要があります。 認証キーを参照してください。
警告
プロセスがパイプの読み取りまたは書き込みを試みているときにプロセスが強制終了されると、メッセージの境界がどこにあるかを確認できなくなる可能性があるため、パイプ内のデータが破損する可能性があります。
同期プリミティブ
一般に、同期プリミティブは、マルチスレッドプログラムの場合ほどマルチプロセスプログラムでは必要ありません。 スレッドモジュールのドキュメントを参照してください。
マネージャーオブジェクトを使用して同期プリミティブを作成することもできることに注意してください。 Managers を参照してください。
- class multiprocessing.Barrier(parties[, action[, timeout]])
バリアオブジェクト: threading.Barrier のクローン。
バージョン3.3の新機能。
- class multiprocessing.BoundedSemaphore([value])
制限付きセマフォオブジェクト: threading.BoundedSemaphore の類似物。
その類似物との唯一の違いが存在します。その
acquire
メソッドの最初の引数は、 Lock.acquire()と一致するように、 block という名前です。ノート
Mac OS Xでは、
sem_getvalue()
がそのプラットフォームに実装されていないため、これはセマフォと区別できません。
- class multiprocessing.Condition([lock])
条件変数: threading.Condition のエイリアス。
lock が指定されている場合は、マルチプロセッシングの Lock または RLock オブジェクトである必要があります。
バージョン3.3で変更: wait_for()メソッドが追加されました。
- class multiprocessing.Event
- threading.Event のクローン。
- class multiprocessing.Lock
非再帰的ロックオブジェクト: threading.Lock の類似物。 プロセスまたはスレッドがロックを取得すると、それ以降、プロセスまたはスレッドからロックを取得しようとすると、ロックが解放されるまでブロックされます。 どのプロセスまたはスレッドでもそれを解放できます。 スレッドに適用される threading.Lock の概念と動作は、特に明記されていない限り、プロセスまたはスレッドのいずれかに適用される multiprocessing.Lock に複製されます。
Lock は実際には、デフォルトのコンテキストで初期化された
multiprocessing.synchronize.Lock
のインスタンスを返すファクトリ関数であることに注意してください。Lock は、 context manager プロトコルをサポートしているため、 with ステートメントで使用できます。
- acquire(block=True, timeout=None)
ロック、ブロッキング、または非ブロッキングを取得します。
block 引数を
True
(デフォルト)に設定すると、メソッド呼び出しはロックがロック解除状態になるまでブロックし、次にロックに設定してTrue
を返します。 。 この最初の引数の名前は、 threading.Lock.acquire()の名前とは異なることに注意してください。block 引数を
False
に設定すると、メソッド呼び出しはブロックされません。 ロックが現在ロック状態にある場合は、False
を返します。 それ以外の場合は、ロックをロック状態に設定し、True
を返します。timeout の正の浮動小数点値で呼び出された場合、ロックを取得できない限り、 timeout で指定された最大秒数の間ブロックします。 timeout の値が負の呼び出しは、 timeout がゼロの場合と同等です。 timeout の値が
None
(デフォルト)の呼び出しでは、タイムアウト期間が無限に設定されます。 timeout の負の値またはNone
値の処理は、 threading.Lock.acquire()で実装されている動作とは異なることに注意してください。 timeout 引数は、 block 引数がFalse
に設定されているため無視される場合、実際的な影響はありません。 ロックが取得されている場合はTrue
を返し、タイムアウト期間が経過している場合はFalse
を返します。
- release()
ロックを解除します。 これは、最初にロックを取得したプロセスまたはスレッドだけでなく、任意のプロセスまたはスレッドから呼び出すことができます。
動作は threading.Lock.release()と同じですが、ロック解除されたロックで呼び出されると、 ValueError が発生する点が異なります。
- class multiprocessing.RLock
再帰的ロックオブジェクト: threading.RLock の類似物。 再帰ロックは、それを取得したプロセスまたはスレッドによって解放される必要があります。 プロセスまたはスレッドが再帰ロックを取得すると、同じプロセスまたはスレッドがブロックせずに再度取得できます。 そのプロセスまたはスレッドは、取得されるたびに1回解放する必要があります。
RLock は実際には、デフォルトのコンテキストで初期化された
multiprocessing.synchronize.RLock
のインスタンスを返すファクトリ関数であることに注意してください。RLock は、コンテキストマネージャープロトコルをサポートしているため、 with ステートメントで使用できます。
- acquire(block=True, timeout=None)
ロック、ブロッキング、または非ブロッキングを取得します。
block 引数を
True
に設定して呼び出された場合、ロックが現在のプロセスによってすでに所有されていない限り、ロックがロック解除状態(プロセスまたはスレッドによって所有されていない)になるまでブロックします。スレッド。 次に、現在のプロセスまたはスレッドがロックの所有権を取得し(まだ所有権がない場合)、ロック内の再帰レベルが1ずつ増加し、True
の戻り値になります。 threading.RLock.acquire()の実装と比較して、この最初の引数の動作には、引数自体の名前から始まるいくつかの違いがあることに注意してください。block 引数を
False
に設定して呼び出す場合は、ブロックしないでください。 ロックがすでに別のプロセスまたはスレッドによって取得されている(したがって所有されている)場合、現在のプロセスまたはスレッドは所有権を取得せず、ロック内の再帰レベルは変更されないため、戻り値は [X233Xになります。 ]。 ロックがロック解除状態の場合、現在のプロセスまたはスレッドが所有権を取得し、再帰レベルがインクリメントされ、True
の戻り値になります。timeout 引数の使用法と動作は、 Lock.acquire()の場合と同じです。 timeout のこれらの動作の一部は、 threading.RLock.acquire()で実装されている動作とは異なることに注意してください。
- release()
ロックを解除し、再帰レベルを下げます。 デクリメント後に再帰レベルがゼロの場合は、ロックをロック解除にリセットし(プロセスまたはスレッドが所有していない)、ロックがロック解除されるのを待って他のプロセスまたはスレッドがブロックされている場合は、そのうちの1つだけを続行します。 デクリメント後も再帰レベルがゼロ以外の場合、ロックはロックされたままで、呼び出し元のプロセスまたはスレッドによって所有されます。
このメソッドは、呼び出し元のプロセスまたはスレッドがロックを所有している場合にのみ呼び出してください。 AssertionError は、このメソッドが所有者以外のプロセスまたはスレッドによって呼び出された場合、またはロックがロック解除(所有されていない)状態にある場合に発生します。 この状況で発生する例外のタイプは、 threading.RLock.release()で実装されている動作とは異なることに注意してください。
- class multiprocessing.Semaphore([value])
セマフォオブジェクト: threading.Semaphore の類似物。
その類似物との唯一の違いが存在します。その
acquire
メソッドの最初の引数は、 Lock.acquire()と一致するように、 block という名前です。
ノート
Mac OS Xでは、sem_timedwait
はサポートされていないため、タイムアウトを指定してacquire()
を呼び出すと、スリープループを使用してその関数の動作がエミュレートされます。
ノート
Ctrl-C によって生成されたSIGINTシグナルが、BoundedSemaphore.acquire()
、 Lock.acquire()、 RLockの呼び出しによってメインスレッドがブロックされている間に到着した場合。 Acquisition()、Semaphore.acquire()
、Condition.acquire()
、またはCondition.wait()
の場合、呼び出しはすぐに中断され、 KeyboardInterrupt が発生します。
これは、同等のブロッキング呼び出しの進行中にSIGINTが無視される threading の動作とは異なります。
ノート
このパッケージの機能の一部には、ホストオペレーティングシステム上で機能する共有セマフォの実装が必要です。 これがないと、multiprocessing.synchronize
モジュールが無効になり、インポートしようとすると ImportError になります。 詳細については、:issue: `3770` を参照してください。
マネージャー
マネージャーは、異なるマシンで実行されているプロセス間でネットワークを介して共有するなど、異なるプロセス間で共有できるデータを作成する方法を提供します。 マネージャーオブジェクトは、共有オブジェクトを管理するサーバープロセスを制御します。 他のプロセスは、プロキシを使用して共有オブジェクトにアクセスできます。
- multiprocessing.Manager()
- プロセス間でオブジェクトを共有するために使用できる、開始された SyncManager オブジェクトを返します。 返されるマネージャーオブジェクトは、生成された子プロセスに対応し、共有オブジェクトを作成して対応するプロキシを返すメソッドを持っています。
Managerプロセスは、ガベージコレクションが行われるか、親プロセスが終了するとすぐにシャットダウンされます。 マネージャークラスは、 multiprocessing.managers モジュールで定義されています。
- class multiprocessing.managers.BaseManager([address[, authkey]])
BaseManagerオブジェクトを作成します。
作成したら、 start()または
get_server().serve_forever()
を呼び出して、マネージャーオブジェクトが開始されたマネージャープロセスを参照していることを確認する必要があります。address は、マネージャープロセスが新しい接続をリッスンするアドレスです。 アドレスが
None
の場合、任意のアドレスが選択されます。authkey は、サーバープロセスへの着信接続の有効性を確認するために使用される認証キーです。 authkey が
None
の場合、current_process().authkey
が使用されます。 それ以外の場合は、 authkey が使用され、バイト文字列である必要があります。- start([initializer[, initargs]])
サブプロセスを開始して、マネージャーを開始します。 初期化子が
None
でない場合、サブプロセスは開始時にinitializer(*initargs)
を呼び出します。
- get_server()
Managerの制御下にある実際のサーバーを表す
Server
オブジェクトを返します。Server
オブジェクトは、serve_forever()
メソッドをサポートします。>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
には、さらにアドレス属性があります。
- connect()
ローカルマネージャオブジェクトをリモートマネージャプロセスに接続します。
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()
マネージャーが使用するプロセスを停止します。 これは、 start()を使用してサーバープロセスを開始した場合にのみ使用できます。
これは複数回呼び出すことができます。
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
タイプを登録するために使用できる、またはマネージャークラスに呼び出し可能なクラスメソッド。
typeid は、特定のタイプの共有オブジェクトを識別するために使用される「タイプ識別子」です。 これは文字列である必要があります。
callable は、このタイプ識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。 マネージャーインスタンスが connect()メソッドを使用してサーバーに接続される場合、または create_method 引数が
False
の場合、これはNone
。proxytype は、 BaseProxy のサブクラスであり、この typeid を使用して共有オブジェクトのプロキシを作成するために使用されます。
None
の場合、プロキシクラスが自動的に作成されます。日付付きは、 BaseProxy._callmethod()を使用してこのtypeidのプロキシにアクセスを許可するメソッド名のシーケンスを指定するために使用されます。 ( Exposure が
None
の場合、proxytype._exposed_
が使用されます。)公開リストが指定されていない場合、共有オブジェクトのすべての「パブリックメソッド」アクセス可能になります。 (ここで「パブリックメソッド」とは、 __ call __()メソッドを持ち、名前が'_'
で始まらない属性を意味します。)method_to_typeid は、プロキシを返す必要がある公開されたメソッドの戻りタイプを指定するために使用されるマッピングです。 メソッド名をtypeid文字列にマップします。 ( method_to_typeid が
None
の場合、存在する場合は代わりにproxytype._method_to_typeid_
が使用されます。)メソッドの名前がこのマッピングのキーでない場合、またはマッピングがNone
次に、メソッドによって返されるオブジェクトは値によってコピーされます。create_method は、 typeid という名前でメソッドを作成するかどうかを決定します。これを使用して、サーバープロセスに新しい共有オブジェクトを作成し、そのプロキシを返すように指示できます。 デフォルトでは
True
です。
BaseManager インスタンスにも、読み取り専用プロパティが1つあります。
- address
マネージャーが使用するアドレス。
バージョン3.3で変更:マネージャーオブジェクトはコンテキスト管理プロトコルをサポートします– コンテキストマネージャータイプを参照してください。 __ enter __()は、サーバープロセスを開始し(まだ開始されていない場合)、マネージャーオブジェクトを返します。 __ exit __()は shutdown()を呼び出します。
以前のバージョンでは、 __ enter __()は、マネージャーのサーバープロセスがまだ開始されていない場合、開始しませんでした。
- class multiprocessing.managers.SyncManager
プロセスの同期に使用できる BaseManager のサブクラス。 このタイプのオブジェクトは、
multiprocessing.Manager()
によって返されます。そのメソッドは、プロセス間で同期される一般的に使用される多数のデータ型のプロキシオブジェクトを作成して返します。 これには、特に共有リストと辞書が含まれます。
- Barrier(parties[, action[, timeout]])
共有 threading.Barrier オブジェクトを作成し、そのプロキシを返します。
バージョン3.3の新機能。
- BoundedSemaphore([value])
共有 threading.BoundedSemaphore オブジェクトを作成し、そのプロキシを返します。
- Condition([lock])
共有 threading.Condition オブジェクトを作成し、そのプロキシを返します。
lock が指定されている場合は、 threading.Lock または threading.RLock オブジェクトのプロキシである必要があります。
バージョン3.3で変更: wait_for()メソッドが追加されました。
- Event()
共有 threading.Event オブジェクトを作成し、そのプロキシを返します。
- Lock()
共有 threading.Lock オブジェクトを作成し、そのプロキシを返します。
- Namespace()
共有名前空間オブジェクトを作成し、そのプロキシを返します。
- Queue([maxsize])
共有 queue.Queue オブジェクトを作成し、そのプロキシを返します。
- RLock()
共有 threading.RLock オブジェクトを作成し、そのプロキシを返します。
- Semaphore([value])
共有 threading.Semaphore オブジェクトを作成し、そのプロキシを返します。
- Array(typecode, sequence)
配列を作成し、そのプロキシを返します。
- Value(typecode, value)
書き込み可能な
value
属性を持つオブジェクトを作成し、そのプロキシを返します。
- dict()
dict(mapping)
dict(sequence) 共有 dict オブジェクトを作成し、そのプロキシを返します。
- list()
list(sequence) 共有 list オブジェクトを作成し、そのプロキシを返します。
バージョン3.6で変更:共有オブジェクトはネストできます。 たとえば、共有リストなどの共有コンテナオブジェクトには、 SyncManager によってすべて管理および同期される他の共有オブジェクトを含めることができます。
- class multiprocessing.managers.Namespace
SyncManager に登録できるタイプ。
名前空間オブジェクトにはパブリックメソッドはありませんが、書き込み可能な属性はあります。 その表現は、その属性の値を示しています。
ただし、名前空間オブジェクトにプロキシを使用する場合、
'_'
で始まる属性はプロキシの属性であり、指示対象の属性ではありません。>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
カスタマイズされたマネージャー
独自のマネージャーを作成するには、 BaseManager のサブクラスを作成し、 register() classmethodを使用して、新しいタイプまたは呼び出し可能オブジェクトをマネージャークラスに登録します。 例えば:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
リモートマネージャーの使用
1台のマシンでマネージャーサーバーを実行し、クライアントに他のマシンからそれを使用させることができます(関係するファイアウォールがそれを許可していると仮定します)。
次のコマンドを実行すると、リモートクライアントがアクセスできる単一の共有キュー用のサーバーが作成されます。
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
1つのクライアントは、次のようにサーバーにアクセスできます。
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
別のクライアントもそれを使用できます:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
ローカルプロセスは、クライアントで上記のコードを使用してキューにリモートアクセスすることにより、そのキューにアクセスすることもできます。
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
プロキシオブジェクト
プロキシは、(おそらく)別のプロセスに存在する共有オブジェクトを参照するオブジェクトです。 共有オブジェクトは、プロキシの指示対象であると言われます。 複数のプロキシオブジェクトが同じ指示対象を持つ場合があります。
プロキシオブジェクトには、その指示対象の対応するメソッドを呼び出すメソッドがあります(ただし、指示対象のすべてのメソッドが必ずしもプロキシを介して利用できるわけではありません)。 このようにして、プロキシは、その指示対象と同じように使用できます。
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
str()をプロキシに適用すると、指示対象の表現が返されますが、 repr()を適用すると、プロキシの表現が返されることに注意してください。
プロキシオブジェクトの重要な機能は、プロセス間で受け渡すことができるように選択できることです。 そのため、指示対象にはプロキシオブジェクトを含めることができます。 これにより、これらの管理対象リスト、dict、およびその他のプロキシオブジェクトのネストが可能になります。
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
同様に、dictプロキシとlistプロキシは相互にネストできます。
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
標準(非プロキシ) list または dict オブジェクトが指示対象に含まれている場合、プロキシにはいつかを知る方法がないため、これらの可変値への変更はマネージャーを介して伝播されません。に含まれる値が変更されます。 ただし、コンテナプロキシ(プロキシオブジェクトで__setitem__
をトリガーする)に値を格納すると、マネージャを介して伝播されるため、そのようなアイテムを効果的に変更するには、変更した値をコンテナプロキシに再割り当てできます。 :
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
このアプローチは、ほとんどのユースケースでネストされたプロキシオブジェクトを使用するよりもおそらく便利ではありませんが、同期に対する制御のレベルも示しています。
ノート
マルチプロセッシングのプロキシタイプは、値による比較をサポートするために何もしません。 したがって、たとえば、次のようになります。
>>> manager.list([1,2,3]) == [1,2,3]
False
比較を行うときは、代わりに指示対象のコピーを使用する必要があります。
- class multiprocessing.managers.BaseProxy
プロキシオブジェクトは、 BaseProxy のサブクラスのインスタンスです。
- _callmethod(methodname[, args[, kwds]])
プロキシの指示対象のメソッドの結果を呼び出して返します。
proxy
が、指示対象がobj
であるプロキシである場合、式proxy._callmethod(methodname, args, kwds)
式を評価します
getattr(obj, methodname)(*args, **kwds)
マネージャーのプロセスで。
戻り値は、呼び出しの結果のコピーまたは新しい共有オブジェクトへのプロキシになります。 BaseManager.register()の method_to_typeid 引数のドキュメントを参照してください。
呼び出しによって例外が発生した場合は、 _callmethod()によって再発生します。 マネージャーのプロセスで他の例外が発生した場合、これは
RemoteError
例外に変換され、 _callmethod()によって発生します。特に、 methodname が exposed されていない場合、例外が発生することに注意してください。
_callmethod()の使用例:
>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()
指示対象のコピーを返します。
指示対象が選択できない場合、これにより例外が発生します。
- __repr__()
プロキシオブジェクトの表現を返します。
- __str__()
指示対象の表現を返します。
掃除
プロキシオブジェクトはweakrefコールバックを使用するため、ガベージコレクションが行われると、リファレントを所有するマネージャーから自身の登録が解除されます。
共有オブジェクトを参照するプロキシがなくなると、共有オブジェクトはマネージャプロセスから削除されます。
プロセスプール
Pool クラスで送信されたタスクを実行するプロセスのプールを作成できます。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
ジョブを送信できるワーカープロセスのプールを制御するプロセスプールオブジェクト。 タイムアウトとコールバックを伴う非同期結果をサポートし、並列マップの実装があります。
processes は、使用するワーカープロセスの数です。 プロセスが
None
の場合、 os.cpu_count()によって返される番号が使用されます。イニシャライザーが
None
でない場合、各ワーカープロセスは起動時にinitializer(*initargs)
を呼び出します。maxtasksperchild は、ワーカープロセスが終了して新しいワーカープロセスに置き換えられる前に完了できるタスクの数であり、未使用のリソースを解放できるようにします。 デフォルトの maxtasksperchild は
None
です。これは、ワーカープロセスがプールと同じくらい存続することを意味します。context を使用して、ワーカープロセスの開始に使用されるコンテキストを指定できます。 通常、プールは、コンテキストオブジェクトの関数
multiprocessing.Pool()
または Pool()メソッドを使用して作成されます。 どちらの場合も、 context は適切に設定されています。プールオブジェクトのメソッドは、プールを作成したプロセスによってのみ呼び出される必要があることに注意してください。
警告
multiprocessing.pool オブジェクトには、プールをコンテキストマネージャーとして使用するか、 close()および terminateを呼び出すことにより、(他のリソースと同様に)適切に管理する必要がある内部リソースがあります。 ()手動。 これを怠ると、プロセスがファイナライズに掛かる可能性があります。
CPythonはプールのファイナライザーが呼び出されることを保証しないため、ガベージコレーターに依存してプールを破棄するのは正しくないことに注意してください(詳細については、 object .__ del __()を参照してください)。情報)。
バージョン3.2の新機能: maxtasksperchild
バージョン3.4の新機能: 環境
ノート
Pool 内のワーカープロセスは、通常、プールのワークキューの全期間にわたって存続します。 他のシステム(Apache、mod_wsgiなど)でよく見られるパターンは、ワーカーが保持するリソースを解放するために、プール内のワーカーが終了、クリーンアップ、新しいプロセスの生成を行う前に、設定された量の作業のみを完了できるようにすることです。古いものを交換します。 Pool の maxtasksperchild 引数は、この機能をエンドユーザーに公開します。
- apply(func[, args[, kwds]])
引数 args とキーワード引数 kwds を指定して func を呼び出します。 結果の準備ができるまでブロックします。 このブロックを考えると、 apply_async()は、作業を並行して実行するのに適しています。 さらに、 func は、プールのワーカーの1つでのみ実行されます。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
AsyncResult オブジェクトを返す apply()メソッドのバリアント。
callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 結果の準備が整うと、コールバックが適用されます。つまり、呼び出しが失敗した場合を除きます。失敗した場合は、代わりに error_callback が適用されます。
error_callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 ターゲット関数が失敗した場合、 error_callback が例外インスタンスで呼び出されます。
結果を処理するスレッドがブロックされるため、コールバックはすぐに完了する必要があります。
- map(func, iterable[, chunksize])
map()組み込み関数と同等の並列機能(ただし、 iterable 引数は1つしかサポートしませんが、複数のiterableについては starmap()を参照してください)。 結果の準備ができるまでブロックします。
このメソッドは、イテラブルをいくつかのチャンクに分割し、個別のタスクとしてプロセスプールに送信します。 これらのチャンクの(おおよその)サイズは、 chunksize を正の整数に設定することで指定できます。
非常に長い反復可能オブジェクトでは、メモリ使用量が高くなる可能性があることに注意してください。 効率を上げるために、 imap()または imap_unordered()を明示的な chunksize オプションとともに使用することを検討してください。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
AsyncResult オブジェクトを返す map()メソッドのバリアント。
callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 結果の準備が整うと、コールバックが適用されます。つまり、呼び出しが失敗した場合を除きます。失敗した場合は、代わりに error_callback が適用されます。
error_callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 ターゲット関数が失敗した場合、 error_callback が例外インスタンスで呼び出されます。
結果を処理するスレッドがブロックされるため、コールバックはすぐに完了する必要があります。
- imap(func, iterable[, chunksize])
map()の遅延バージョン。
chunksize 引数は、 map()メソッドで使用される引数と同じです。 chunksize に大きな値を使用する非常に長い反復可能オブジェクトの場合、デフォルト値の
1
を使用するよりもはるかに速くジョブを完了することができます。また、 chunksize が
1
の場合、 imap()メソッドによって返されるイテレーターのnext()
メソッドには、オプションの timeout があります。 ]パラメータ: timeout 秒以内に結果を返すことができない場合、next(timeout)
は multiprocessing.TimeoutError を発生させます。
- imap_unordered(func, iterable[, chunksize])
imap()と同じですが、返されるイテレータからの結果の順序は任意であると見なす必要があります。 (ワーカープロセスが1つしかない場合にのみ、順序が「正しい」ことが保証されます。)
- starmap(func, iterable[, chunksize])
map()と同様ですが、 iterable の要素は、引数としてアンパックされるiterableであることが期待されている点が異なります。
したがって、
[(1,2), (3, 4)]
の iterable は、[func(1,2), func(3,4)]
になります。バージョン3.3の新機能。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
starmap()と map_async()の組み合わせで、反復可能オブジェクトの反復可能を反復処理し、反復可能オブジェクトを解凍して func を呼び出します。 結果オブジェクトを返します。
バージョン3.3の新機能。
- close()
これ以上タスクがプールに送信されないようにします。 すべてのタスクが完了すると、ワーカープロセスは終了します。
- terminate()
未処理の作業を完了せずに、ワーカープロセスをただちに停止します。 プールオブジェクトがガベージコレクションされると、 terminate()がすぐに呼び出されます。
- join()
ワーカープロセスが終了するのを待ちます。 join()を使用する前に、 close()または terminate()を呼び出す必要があります。
バージョン3.3の新機能:プールオブジェクトがコンテキスト管理プロトコルをサポートするようになりました– コンテキストマネージャータイプを参照してください。 __ enter __()はプールオブジェクトを返し、 __ exit __()は terminate()を呼び出します。
- class multiprocessing.pool.AsyncResult
Pool.apply_async()および Pool.map_async()によって返される結果のクラス。
- get([timeout])
到着時に結果を返します。 timeout が
None
でなく、結果が timeout 秒以内に到着しない場合、 multiprocessing.TimeoutError が発生します。 リモート呼び出しで例外が発生した場合、その例外は get()によって再発生します。
- wait([timeout])
結果が利用可能になるまで、または timeout 秒が経過するまで待ちます。
- ready()
通話が完了したかどうかを返します。
- successful()
例外を発生させずに呼び出しが完了したかどうかを返します。 結果の準備ができていない場合、 ValueError が発生します。
バージョン3.7で変更:結果の準備ができていない場合、 AssertionError の代わりに ValueError が発生します。
次の例は、プールの使用法を示しています。
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
リスナーとクライアント
通常、プロセス間でのメッセージパッシングは、キューを使用するか、 Pipe()によって返される Connection オブジェクトを使用して行われます。
ただし、 multiprocessing.connection モジュールを使用すると、柔軟性がさらに高まります。 基本的に、ソケットまたはWindowsの名前付きパイプを処理するための高レベルのメッセージ指向APIを提供します。 また、 hmac モジュールを使用したダイジェスト認証、および複数の接続を同時にポーリングすることもサポートしています。
- multiprocessing.connection.deliver_challenge(connection, authkey)
ランダムに生成されたメッセージを接続のもう一方の端に送信し、応答を待ちます。
応答が authkey をキーとして使用してメッセージのダイジェストと一致する場合、ウェルカムメッセージが接続のもう一方の端に送信されます。 それ以外の場合は、 AuthenticationError が発生します。
- multiprocessing.connection.answer_challenge(connection, authkey)
メッセージを受信し、 authkey をキーとしてメッセージのダイジェストを計算してから、ダイジェストを送り返します。
ウェルカムメッセージが受信されない場合、 AuthenticationError が発生します。
- multiprocessing.connection.Client(address[, family[, authkey]])
アドレスアドレスを使用しているリスナーへの接続をセットアップしようとし、接続を返します。
接続のタイプはファミリ引数によって決定されますが、通常はアドレスの形式から推測できるため、通常は省略できます。 (アドレス形式を参照)
authkey が指定され、Noneではない場合、それはバイト文字列である必要があり、HMACベースの認証チャレンジの秘密鍵として使用されます。 authkey がNoneの場合、認証は行われません。 AuthenticationError は、認証が失敗した場合に発生します。 認証キーを参照してください。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
接続を「リッスン」している、バインドされたソケットまたはWindowsの名前付きパイプのラッパー。
address は、リスナーオブジェクトのバインドされたソケットまたは名前付きパイプによって使用されるアドレスです。
ノート
'0.0.0.0'のアドレスが使用されている場合、そのアドレスはWindowsで接続可能なエンドポイントにはなりません。 接続可能なエンドポイントが必要な場合は、「127.0.0.1」を使用する必要があります。
family は、使用するソケット(または名前付きパイプ)のタイプです。 これは、文字列
'AF_INET'
(TCPソケットの場合)、'AF_UNIX'
(Unixドメインソケットの場合)、または'AF_PIPE'
(Windows名前付きパイプの場合)のいずれかになります。 これらのうち、最初のものだけが利用可能であることが保証されています。 ファミリがNone
の場合、ファミリはアドレスの形式から推測されます。 アドレスもNone
の場合、デフォルトが選択されます。 このデフォルトは、利用可能な最速であると想定されるファミリです。 アドレス形式を参照してください。 ファミリが'AF_UNIX'
で、アドレスがNone
の場合、ソケットは tempfile.mkstemp()を使用して作成されたプライベート一時ディレクトリに作成されることに注意してください。 。リスナーオブジェクトがソケットを使用する場合、バインドされると、 backlog (デフォルトでは1)がソケットの listen()メソッドに渡されます。
authkey が指定され、Noneではない場合、それはバイト文字列である必要があり、HMACベースの認証チャレンジの秘密鍵として使用されます。 authkey がNoneの場合、認証は行われません。 AuthenticationError は、認証が失敗した場合に発生します。 認証キーを参照してください。
- accept()
リスナーオブジェクトのバインドされたソケットまたは名前付きパイプで接続を受け入れ、 Connection オブジェクトを返します。 認証が試行されて失敗した場合、 AuthenticationError が発生します。
- close()
リスナーオブジェクトのバインドされたソケットまたは名前付きパイプを閉じます。 これは、リスナーがガベージコレクションされるときに自動的に呼び出されます。 ただし、明示的に呼び出すことをお勧めします。
リスナーオブジェクトには、次の読み取り専用プロパティがあります。
- address
Listenerオブジェクトによって使用されているアドレス。
- last_accepted
最後に受け入れられた接続の送信元のアドレス。 これが利用できない場合は、
None
です。
バージョン3.3の新機能:リスナーオブジェクトがコンテキスト管理プロトコルをサポートするようになりました– コンテキストマネージャータイプを参照してください。 __ enter __()はリスナーオブジェクトを返し、 __ exit __()は close()を呼び出します。
- multiprocessing.connection.wait(object_list, timeout=None)
object_list のオブジェクトの準備ができるまで待ちます。 object_list 内の準備ができているオブジェクトのリストを返します。 timeout がfloatの場合、呼び出しは最大でその秒数の間ブロックされます。 timeout が
None
の場合、無制限の期間ブロックされます。 負のタイムアウトは、ゼロタイムアウトと同等です。UnixとWindowsの両方で、オブジェクトが object_list に表示される場合は、
読み取り可能な接続オブジェクト。
接続された読み取り可能な socket.socket オブジェクト。 また
接続またはソケットオブジェクトは、そこから読み取ることができるデータがある場合、またはもう一方の端が閉じられている場合に準備ができています。
Unix :
wait(object_list, timeout)
ほぼ同等のselect.select(object_list, [], [], timeout)
。 違いは、 select.select()がシグナルによって中断された場合、エラー番号EINTR
で OSError を発生させる可能性があるのに対し、 waitは()はしません。Windows : object_list の項目は、待機可能な整数ハンドル(Win32関数
WaitForMultipleObjects()
のドキュメントで使用されている定義による)であるか、次のいずれかである必要があります。ソケットハンドルまたはパイプハンドルを返すfileno()
メソッドを持つオブジェクトである。 (パイプハンドルとソケットハンドルは待機可能ハンドルではないことに注意してください。)バージョン3.3の新機能。
例
次のサーバーコードは、'secret password'
を認証キーとして使用するリスナーを作成します。 次に、接続を待機し、クライアントにデータを送信します。
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
次のコードはサーバーに接続し、サーバーからデータを受信します。
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
次のコードは、 wait()を使用して、複数のプロセスからのメッセージを一度に待機します。
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
アドレス形式
'AF_INET'
アドレスは、(hostname, port)
の形式のタプルです。ここで、 hostname は文字列、 port は整数です。'AF_UNIX'
アドレスは、ファイルシステム上のファイル名を表す文字列です。'AF_PIPE'
アドレスは、r'\.\pipe{PipeName}'
の形式の文字列です。 Client()を使用して ServerName という名前のリモートコンピューター上の名前付きパイプに接続するには、代わりにr'\ServerName\pipe{PipeName}'
の形式のアドレスを使用する必要があります。
2つの円記号で始まる文字列は、デフォルトでは'AF_UNIX'
アドレスではなく'AF_PIPE'
アドレスであると見なされることに注意してください。
認証キー
Connection.recv を使用すると、受信したデータは自動的に選択解除されます。 残念ながら、信頼できないソースからデータを選択解除することはセキュリティリスクです。 したがって、 Listener および Client()は、 hmac モジュールを使用してダイジェスト認証を提供します。
認証キーは、パスワードと見なすことができるバイト文字列です。接続が確立されると、両端は、相手が認証キーを知っていることの証明を要求します。 (両端が同じキーを使用していることを示すには、接続を介してキーを送信する必要はありませんではありません。)
認証が要求されたが認証キーが指定されていない場合、current_process().authkey
の戻り値が使用されます( Process を参照)。 この値は、現在のプロセスが作成する Process オブジェクトによって自動的に継承されます。 これは、(デフォルトで)マルチプロセスプログラムのすべてのプロセスが、それらの間の接続を設定するときに使用できる単一の認証キーを共有することを意味します。
os.urandom()を使用して、適切な認証キーを生成することもできます。
ロギング
ロギングのサポートが利用可能です。 ただし、 logging パッケージはプロセス共有ロックを使用しないため、(ハンドラーの種類によっては)異なるプロセスからのメッセージが混同される可能性があることに注意してください。
- multiprocessing.get_logger()
マルチプロセッシングで使用されるロガーを返します。 必要に応じて、新しいものが作成されます。
最初に作成されたとき、ロガーにはレベル
logging.NOTSET
があり、デフォルトのハンドラーはありません。 このロガーに送信されたメッセージは、デフォルトではルートロガーに伝播されません。Windowsでは、子プロセスは親プロセスのロガーのレベルのみを継承することに注意してください。ロガーの他のカスタマイズは継承されません。
- multiprocessing.log_to_stderr()
- この関数は get_logger()の呼び出しを実行しますが、get_loggerによって作成されたロガーを返すことに加えて、フォーマット
'[%(levelname)s/%(processName)s] %(message)s'
を使用して sys.stderr に出力を送信するハンドラーを追加します。 ]。
以下は、ロギングがオンになっているセッションの例です。
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
ロギングレベルの完全な表については、 logging モジュールを参照してください。
multiprocessing.dummy モジュール
multiprocessing.dummy は、 multiprocessing のAPIを複製しますが、 threading モジュールのラッパーにすぎません。
特に、 multiprocessing.dummy によって提供されるPool
関数は、すべてをサポートする Pool のサブクラスである ThreadPool のインスタンスを返します。同じメソッド呼び出しですが、ワーカープロセスではなくワーカースレッドのプールを使用します。
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])
ジョブを送信できるワーカースレッドのプールを制御するスレッドプールオブジェクト。 ThreadPool インスタンスは Pool インスタンスと完全にインターフェイス互換性があり、プールをコンテキストマネージャーとして使用するか、 close()[を呼び出すことにより、それらのリソースも適切に管理する必要があります。 X213X]および terminate()を手動で。
methods は、使用するワーカースレッドの数です。 プロセスが
None
の場合、 os.cpu_count()によって返される番号が使用されます。イニシャライザーが
None
でない場合、各ワーカープロセスは起動時にinitializer(*initargs)
を呼び出します。Pool とは異なり、 maxtasksperchild および context は提供できません。
ノート
ThreadPool は、 Pool と同じインターフェースを共有します。これは、プロセスのプールを中心に設計されており、 concurrent.futures モジュールの導入よりも前のものです。 そのため、スレッドに基づくプールには意味のないいくつかの操作を継承し、非同期ジョブのステータスを表す独自のタイプ AsyncResult を持ちます。これは、他のライブラリでは理解されません。 。
ユーザーは通常、 current.futures.ThreadPoolExecutor を使用することをお勧めします。これは、最初からスレッドを中心に設計されたシンプルなインターフェイスを備え、互換性のある current.futures.Future インスタンスを返します。 asyncio を含む他の多くのライブラリと一緒に。
プログラミングガイドライン
マルチプロセッシングを使用する際に遵守する必要のある特定のガイドラインとイディオムがあります。
すべての開始メソッド
以下は、すべてのstartメソッドに適用されます。
共有状態を避ける
可能な限り、プロセス間で大量のデータをシフトしないようにする必要があります。
下位レベルの同期プリミティブを使用するよりも、プロセス間の通信にキューまたはパイプを使用することに固執するのがおそらく最善です。
ピッカビリティ
プロキシのメソッドへの引数が選択可能であることを確認してください。
プロキシのスレッドセーフ
ロックで保護しない限り、複数のスレッドからのプロキシオブジェクトを使用しないでください。
(同じプロキシを使用するさまざまなプロセスで問題が発生することはありません。)
ゾンビプロセスへの参加
Unixでは、プロセスが終了したが参加していない場合、それはゾンビになります。 新しいプロセスが開始される(または active_children()が呼び出される)たびに、まだ参加されていない完了したすべてのプロセスが参加するため、非常に多くなることはありません。 また、終了したプロセスの Process.is_alive を呼び出すとプロセスに参加します。 それでも、開始するすべてのプロセスに明示的に参加することをお勧めします。
ピクルス/アンピクルよりも継承する方が良い
spawn または forkserver startメソッドを使用する場合、子プロセスがそれらを使用できるように、 multiprocessing からの多くのタイプを選択可能にする必要があります。 ただし、通常、パイプまたはキューを使用して共有オブジェクトを他のプロセスに送信することは避けてください。 代わりに、他の場所で作成された共有リソースへのアクセスを必要とするプロセスが祖先プロセスから継承できるようにプログラムを配置する必要があります。
プロセスの終了を回避する
Process.terminate メソッドを使用してプロセスを停止すると、プロセスで現在使用されている共有リソース(ロック、セマフォ、パイプ、キューなど)が壊れたり、他のプロセスで使用できなくなったりする可能性があります。
したがって、共有リソースをまったく使用しないプロセスでのみ Process.terminate を使用することを検討するのがおそらく最善です。
キューを使用するプロセスへの参加
アイテムをキューに入れたプロセスは、バッファリングされたすべてのアイテムが「フィーダー」スレッドによって基になるパイプに供給されるまで、終了するまで待機することに注意してください。 (子プロセスは、この動作を回避するために、キューの Queue.cancel_join_thread メソッドを呼び出すことができます。)
つまり、キューを使用するときは常に、プロセスに参加する前に、キューに配置されたすべてのアイテムが最終的に削除されることを確認する必要があります。 そうしないと、アイテムをキューに入れたプロセスが終了するかどうかを確認できません。 非デーモンプロセスは自動的に参加することにも注意してください。
デッドロックが発生する例は次のとおりです。
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()
ここでの修正は、最後の2行を交換することです(または単に
p.join()
行を削除します)。
子プロセスにリソースを明示的に渡す
fork startメソッドを使用するUnixでは、子プロセスは、グローバルリソースを使用して親プロセスで作成された共有リソースを利用できます。 ただし、子プロセスのコンストラクターに引数としてオブジェクトを渡すことをお勧めします。
コードを(潜在的に)Windowsや他のstartメソッドと互換性を持たせることは別として、これにより、子プロセスがまだ生きている限り、オブジェクトが親プロセスでガベージコレクションされないことが保証されます。 これは、オブジェクトが親プロセスでガベージコレクションされたときに一部のリソースが解放された場合に重要になる可能性があります。
だから例えば
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()
次のように書き直す必要があります
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin を「ファイルのようなオブジェクト」に置き換えることに注意してください
マルチプロセッシングは元々無条件に呼び出されました:
os.close(sys.stdin.fileno())
multiprocessing.Process._bootstrap()
メソッドの場合—これにより、プロセス内のプロセスで問題が発生しました。 これは次のように変更されました。sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)
これは、プロセスが互いに衝突して不正なファイル記述子エラーが発生するという基本的な問題を解決しますが、 sys.stdin()を出力バッファリングを備えた「ファイルのようなオブジェクト」に置き換えるアプリケーションに潜在的な危険をもたらします。 この危険性は、複数のプロセスがこのファイルのようなオブジェクトに対して close()を呼び出すと、同じデータがオブジェクトに複数回フラッシュされ、破損する可能性があることです。
ファイルのようなオブジェクトを作成して独自のキャッシュを実装する場合は、キャッシュに追加するたびにpidを保存し、pidが変更されたときにキャッシュを破棄することで、フォークセーフにすることができます。 例えば:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
詳細については、:issue: `5155` 、:issue:` 5313` 、および:issue: `5331` を参照してください。
spawn および forkserver 開始メソッド
fork startメソッドには適用されない追加の制限がいくつかあります。
より多くのピック可能性
Process.__init__()
へのすべての引数が選択可能であることを確認してください。 また、 Process をサブクラス化する場合は、 Process.start メソッドが呼び出されたときにインスタンスが選択可能であることを確認してください。
グローバル変数
子プロセスで実行されたコードがグローバル変数にアクセスしようとすると、表示される値(存在する場合)は、 Process.start [の時点での親プロセスの値と同じではない可能性があることに注意してください。 X205X]が呼び出されました。
ただし、モジュールレベルの定数であるグローバル変数は問題を引き起こしません。
メインモジュールの安全なインポート
メインモジュールが、意図しない副作用(新しいプロセスの開始など)を引き起こすことなく、新しいPythonインタープリターによって安全にインポートできることを確認してください。
たとえば、 spawn または forkserver startメソッドを使用して次のモジュールを実行すると、 RuntimeError で失敗します。
from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()
代わりに、次のように
if __name__ == '__main__':
を使用して、プログラムの「エントリポイント」を保護する必要があります。from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()
(プログラムがフリーズする代わりに正常に実行される場合は、
freeze_support()
行を省略できます。)これにより、新しく生成されたPythonインタープリターがモジュールを安全にインポートし、モジュールの
foo()
関数を実行できるようになります。プールまたはマネージャーがメインモジュールに作成されている場合も、同様の制限が適用されます。
例
カスタマイズされたマネージャーとプロキシを作成して使用する方法のデモンストレーション:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
プールの使用:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
キューを使用してタスクをワーカープロセスのコレクションにフィードし、結果を収集する方法を示す例:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()