16.6. マルチプロセッシング—プロセスベースの「スレッド化」インターフェース—Pythonドキュメント
16.6。 マルチプロセッシング —プロセスベースの「スレッド化」インターフェース
バージョン2.6の新機能。
16.6.1。 序章
マルチプロセッシングは、スレッドモジュールと同様のAPIを使用したスポーンプロセスをサポートするパッケージです。 マルチプロセッシングパッケージは、ローカルとリモートの両方の同時実行性を提供し、スレッドの代わりにサブプロセスを使用することで、グローバルインタープリターロックを効果的に回避します。 このため、マルチプロセッシングモジュールを使用すると、プログラマーは特定のマシンで複数のプロセッサーを完全に活用できます。 UnixとWindowsの両方で動作します。
マルチプロセッシングモジュールでは、スレッドモジュールにアナログがないAPIも導入されています。 この代表的な例は、Pool
オブジェクトです。これは、関数の実行を複数の入力値に並列化し、入力データをプロセスに分散する便利な手段を提供します(データ並列処理)。 次の例は、子プロセスがそのモジュールを正常にインポートできるように、モジュールでそのような関数を定義する一般的な方法を示しています。 Pool
を使用したデータ並列処理のこの基本的な例、
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
標準出力に出力します
[1, 4, 9]
16.6.1.1。 NS プロセスクラス
マルチプロセッシングでは、 Process オブジェクトを作成し、その start()メソッドを呼び出すことによってプロセスが生成されます。 Process は、 threading.Thread のAPIに従います。 マルチプロセスプログラムの簡単な例は次のとおりです。
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
関連する個々のプロセスIDを表示するために、以下に拡張例を示します。
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
(Windowsで)if __name__ == '__main__'
パーツが必要な理由の説明については、プログラミングガイドラインを参照してください。
16.6.1.2。 プロセス間でのオブジェクトの交換
マルチプロセッシングは、プロセス間の2種類の通信チャネルをサポートします。
キュー
Queue クラスは、 Queue.Queue のニアクローンです。 例えば:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()
キューはスレッドおよびプロセスセーフです。
パイプ
Pipe()関数は、デフォルトでデュプレックス(双方向)であるパイプで接続された接続オブジェクトのペアを返します。 例えば:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints "[42, None, 'hello']" p.join()
Pipe()によって返される2つの接続オブジェクトは、パイプの両端を表します。 各接続オブジェクトには、 send()メソッドと recv()メソッドがあります(とりわけ)。 2つのプロセス(またはスレッド)がパイプの同じ端から同時に読み取りまたは書き込みを行おうとすると、パイプ内のデータが破損する可能性があることに注意してください。 もちろん、パイプの異なる端を同時に使用するプロセスによる破損のリスクはありません。
16.6.1.3。 プロセス間の同期
マルチプロセッシングには、スレッドのすべての同期プリミティブに相当するものが含まれています。 たとえば、ロックを使用して、一度に1つのプロセスのみが標準出力に出力されるようにすることができます。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
異なるプロセスからのロック出力を使用しないと、すべてが混乱する傾向があります。
16.6.1.5。 労働者のプールを使用する
Pool
クラスは、ワーカープロセスのプールを表します。 いくつかの異なる方法でタスクをワーカープロセスにオフロードできるようにするメソッドがあります。
例えば:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
プールのメソッドは、プールを作成したプロセスによってのみ使用される必要があることに注意してください。
ノート
このパッケージ内の機能には、__main__
モジュールが子によってインポート可能である必要があります。 これはプログラミングガイドラインでカバーされていますが、ここで指摘する価値があります。 これは、Pool
の例など、一部の例が対話型インタープリターで機能しないことを意味します。 例えば:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(これを試してみると、実際にはセミランダムにインターリーブされた3つの完全なトレースバックが出力され、マスタープロセスを何らかの方法で停止する必要がある場合があります。)
16.6.2。 リファレンス
マルチプロセッシングパッケージは、ほとんどの場合、スレッドモジュールのAPIを複製します。
16.6.2.1。 プロセスおよび例外
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
プロセスオブジェクトは、別のプロセスで実行されるアクティビティを表します。 Process クラスには、 threading.Thread のすべてのメソッドと同等のものがあります。
コンストラクターは常にキーワード引数を使用して呼び出す必要があります。 group は常に
None
である必要があります。 threading.Thread との互換性のためにのみ存在します。 target は、 run()メソッドによって呼び出される呼び出し可能なオブジェクトです。 デフォルトはNone
で、何も呼び出されないことを意味します。 name はプロセス名です。 デフォルトでは、一意の名前は「Process-N 1 :N 2 :…:N k 」の形式で構成されます。ここで、N 1 [ X114X]、N 2 、…、N k は、プロセスの世代によって長さが決定される整数のシーケンスです。 args は、ターゲット呼び出しの引数タプルです。 kwargs は、ターゲット呼び出しのキーワード引数の辞書です。 デフォルトでは、 target に引数は渡されません。サブクラスがコンストラクターをオーバーライドする場合、プロセスに他のことを行う前に、サブクラスが基本クラスコンストラクター(
Process.__init__()
)を呼び出すことを確認する必要があります。- run()
プロセスのアクティビティを表すメソッド。
サブクラスでこのメソッドをオーバーライドできます。 標準の run()メソッドは、 args および kwargs [から取得したシーケンシャル引数とキーワード引数を使用して、オブジェクトのコンストラクターに渡された呼び出し可能オブジェクトをターゲット引数として呼び出します(存在する場合)。 X211X]引数、それぞれ。
- start()
プロセスのアクティビティを開始します。
これは、プロセスオブジェクトごとに最大1回呼び出す必要があります。 オブジェクトの run()メソッドが別のプロセスで呼び出されるように調整します。
- join([timeout])
join()メソッドが呼び出されたプロセスが終了するまで、またはオプションのタイムアウトが発生するまで、呼び出しスレッドをブロックします。
timeout が
None
の場合、タイムアウトはありません。プロセスは何度でも参加できます。
デッドロックが発生するため、プロセスはそれ自体に参加できません。 プロセスが開始される前にプロセスに参加しようとするとエラーになります。
- name
プロセスの名前。
名前は、識別目的でのみ使用される文字列です。 セマンティクスはありません。 複数のプロセスに同じ名前を付けることができます。 初期名はコンストラクターによって設定されます。
- is_alive()
プロセスが生きているかどうかを返します。
大まかに言って、プロセスオブジェクトは、 start()メソッドが戻った瞬間から子プロセスが終了するまで存続します。
- daemon
プロセスのデーモンフラグ、ブール値。 これは、 start()を呼び出す前に設定する必要があります。
初期値は作成プロセスから継承されます。
プロセスが終了すると、そのデーモンの子プロセスをすべて終了しようとします。
デーモンプロセスは子プロセスを作成できないことに注意してください。 そうしないと、デーモンプロセスは、親プロセスが終了したときに終了した場合、子プロセスを孤立させたままにします。 さらに、これらは not Unixデーモンまたはサービスであり、非デーモンプロセスが終了した場合に終了する(および参加しない)通常のプロセスです。
threading.Thread APIに加えて、 Process オブジェクトは次の属性とメソッドもサポートします。
- pid
プロセスIDを返します。 プロセスが生成される前は、これは
None
になります。
- exitcode
子供の終了コード。 プロセスがまだ終了していない場合、これは
None
になります。 負の値 -N は、子がシグナル N によって終了したことを示します。
- authkey
プロセスの認証キー(バイト文字列)。
マルチプロセッシングが初期化されると、メインプロセスには os.urandom()を使用してランダムな文字列が割り当てられます。
Process オブジェクトが作成されると、その親プロセスの認証キーを継承しますが、 authkey を別のバイト文字列に設定することで変更できます。
認証キーを参照してください。
- terminate()
プロセスを終了します。 Unixでは、これは
SIGTERM
信号を使用して行われます。 WindowsではTerminateProcess()
が使用されます。 なお、exitハンドラやfinally句などは実行されません。プロセスの子孫プロセスは終了せず終了しないことに注意してください-それらは単に孤立します。
警告
関連するプロセスがパイプまたはキューを使用しているときにこのメソッドを使用すると、パイプまたはキューが破損する可能性があり、他のプロセスで使用できなくなる可能性があります。 同様に、プロセスがロックまたはセマフォなどを取得した場合。 その後、それを終了すると、他のプロセスがデッドロックする可能性があります。
start()、 join()、 is_alive()、 terminate()、 exitcode に注意してください。メソッドは、プロセスオブジェクトを作成したプロセスによってのみ呼び出される必要があります。
Process のいくつかのメソッドの使用例:
>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print p, p.is_alive() <Process(Process-1, initial)> False >>> p.start() >>> print p, p.is_alive() <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print p, p.is_alive() <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.BufferTooShort
指定されたバッファオブジェクトが小さすぎてメッセージを読み取れない場合に、 Connection.recv_bytes_into()によって発生する例外。
e
が BufferTooShort のインスタンスである場合、e.args[0]
はメッセージをバイト文字列として提供します。
16.6.2.2。 パイプとキュー
複数のプロセスを使用する場合、通常、プロセス間の通信にメッセージパッシングを使用し、ロックなどの同期プリミティブを使用する必要がなくなります。
メッセージを渡すために、 Pipe()(2つのプロセス間の接続用)またはキュー(複数のプロデューサーとコンシューマーを許可する)を使用できます。
Queue 、 multiprocessing.queues.SimpleQueue 、および JoinableQueue タイプは、 Queue.Queue をモデルにしたマルチプロデューサーおよびマルチコンシューマーFIFOキューです。 ]標準ライブラリのクラス。 Queue にはPython2.5の Queue.Queue クラスに導入された task_done()メソッドと join()メソッドがないという点で異なります。
JoinableQueue を使用する場合は、キューから削除されたタスクごとに JoinableQueue.task_done()を呼び出す必要があります。そうでない場合は、未完了のタスクの数をカウントするために使用されるセマフォを呼び出す必要があります。最終的にオーバーフローし、例外が発生する可能性があります。
マネージャオブジェクトを使用して共有キューを作成することもできることに注意してください。マネージャを参照してください。
ノート
マルチプロセッシングは、通常の Queue.Empty および Queue.Full 例外を使用してタイムアウトを通知します。 マルチプロセッシング名前空間では使用できないため、キューからインポートする必要があります。
ノート
オブジェクトがキューに入れられると、オブジェクトはピクルされ、バックグラウンドスレッドは後でピクルされたデータを基になるパイプにフラッシュします。 これには少し驚くべき結果がありますが、実際的な問題は発生しないはずです。本当に気になる場合は、代わりに manager で作成されたキューを使用できます。
- オブジェクトを空のキューに置いた後、キューの empty()メソッドが False を返し、 get_nowait()が[ X197X] Queue.Empty 。
- 複数のプロセスがオブジェクトをキューに入れている場合、オブジェクトがもう一方の端で順不同で受信される可能性があります。 ただし、同じプロセスによってキューに入れられたオブジェクトは、常に相互に期待される順序になります。
警告
プロセスが Queue を使用しようとしているときに、 Process.terminate()または os.kill()を使用してプロセスが強制終了された場合、キュー内のデータ破損する可能性があります。 これにより、他のプロセスが後でキューを使用しようとしたときに例外が発生する可能性があります。
警告
上記のように、子プロセスがアイテムをキューに入れている場合( JoinableQueue.cancel_join_thread を使用していない場合)、そのプロセスは、バッファリングされたすべてのアイテムがパイプにフラッシュされるまで終了しません。
つまり、そのプロセスに参加しようとすると、キューに入れられたすべてのアイテムが消費されたことが確実でない限り、デッドロックが発生する可能性があります。 同様に、子プロセスが非デーモンである場合、親プロセスは、すべての非デーモンの子に参加しようとすると、終了時にハングする可能性があります。
マネージャーを使用して作成されたキューにはこの問題がないことに注意してください。 プログラミングガイドラインを参照してください。
プロセス間通信でのキューの使用例については、例を参照してください。
- multiprocessing.Pipe([duplex])
パイプの端を表す Connection オブジェクトのペア
(conn1, conn2)
を返します。デュプレックスが
True
(デフォルト)の場合、パイプは双方向です。 デュプレックスがFalse
の場合、パイプは単方向です。conn1
はメッセージの受信にのみ使用でき、conn2
はメッセージの送信にのみ使用できます。
- class multiprocessing.Queue([maxsize])
パイプといくつかのロック/セマフォを使用して実装されたプロセス共有キューを返します。 プロセスが最初にアイテムをキューに入れると、フィーダースレッドが開始され、オブジェクトがバッファーからパイプに転送されます。
標準ライブラリの Queue モジュールからの通常の Queue.Empty および Queue.Full 例外は、タイムアウトを通知するために発生します。
Queue は、 task_done()と join()を除く、 Queue.Queue のすべてのメソッドを実装します。
- qsize()
キューのおおよそのサイズを返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、この数は信頼できません。
これにより、
sem_getvalue()
が実装されていないMacOSXなどのUnixプラットフォームでNotImplementedError
が発生する可能性があることに注意してください。
- empty()
キューが空の場合は
True
を返し、それ以外の場合はFalse
を返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、これは信頼できません。
- full()
キューがいっぱいの場合は
True
を返し、それ以外の場合はFalse
を返します。 マルチスレッド/マルチプロセッシングのセマンティクスのため、これは信頼できません。
- put(obj[, block[, timeout]])
objをキューに入れます。 オプションの引数 block が
True
(デフォルト)で timeout がNone
(デフォルト)の場合、必要に応じて空きスロットがなくなるまでブロックします。利用可能。 timeout が正の数の場合、最大で timeout 秒をブロックし、その時間内に使用可能な空きスロットがない場合は Queue.Full 例外を発生させます。 それ以外の場合(ブロックはFalse
)、空きスロットがすぐに利用できる場合はアイテムをキューに入れ、それ以外の場合は Queue.Full 例外(タイムアウト)を発生させますその場合、は無視されます)。
- put_nowait(obj)
put(obj, False)
と同等です。
- get([block[, timeout]])
キューからアイテムを削除して返します。 オプションの引数 block が
True
(デフォルト)で timeout がNone
(デフォルト)の場合、アイテムが使用可能になるまで必要に応じてブロックします。 timeout が正の数の場合、最大で timeout 秒をブロックし、その時間内に使用可能なアイテムがない場合は Queue.Empty 例外を発生させます。 それ以外の場合(ブロックはFalse
)、アイテムがすぐに利用できる場合はアイテムを返します。それ以外の場合は、 Queue.Empty 例外を発生させます(その場合、 timeout は無視されます)。
- get_nowait()
get(False)
と同等です。
Queue には、 Queue.Queue にはない追加のメソッドがいくつかあります。 これらのメソッドは通常、ほとんどのコードでは不要です。
- close()
現在のプロセスによってこのキューにこれ以上データが配置されないことを示します。 バックグラウンドスレッドは、バッファリングされたすべてのデータをパイプにフラッシュすると終了します。 これは、キューがガベージコレクションされるときに自動的に呼び出されます。
- join_thread()
バックグラウンドスレッドに参加します。 これは、 close()が呼び出された後にのみ使用できます。 バックグラウンドスレッドが終了するまでブロックし、バッファ内のすべてのデータがパイプにフラッシュされたことを確認します。
デフォルトでは、プロセスがキューの作成者でない場合、終了時にキューのバックグラウンドスレッドへの参加を試みます。 プロセスは cancel_join_thread()を呼び出して、 join_thread()が何もしないようにすることができます。
- cancel_join_thread()
join_thread()がブロックされないようにします。 特に、これにより、プロセスの終了時にバックグラウンドスレッドが自動的に結合されるのを防ぎます。 join_thread()を参照してください。
このメソッドのより適切な名前は
allow_exit_without_flush()
かもしれません。 キューに入れられたデータが失われる可能性があり、ほぼ確実に使用する必要はありません。 エンキューされたデータを基になるパイプにフラッシュするのを待たずに現在のプロセスをすぐに終了する必要があり、失われたデータを気にしない場合にのみ、実際に存在します。
ノート
このクラスの機能には、ホストオペレーティングシステム上で機能する共有セマフォの実装が必要です。 これがないと、このクラスの機能が無効になり、キューをインスタンス化しようとすると、
ImportError
になります。 詳細については、:issue: `3770` を参照してください。 以下にリストされている特殊なキュータイプのいずれにも同じことが当てはまります。
- class multiprocessing.queues.SimpleQueue
これは単純化されたキュータイプであり、ロックされたパイプに非常に近いものです。
- empty()
キューが空の場合は
True
を返し、それ以外の場合はFalse
を返します。
- get()
キューからアイテムを削除して返します。
- put(item)
item をキューに入れます。
- class multiprocessing.JoinableQueue([maxsize])
JoinableQueue 、 Queue サブクラスは、 task_done()および join()メソッドが追加されたキューです。
- task_done()
以前にキューに入れられたタスクが完了したことを示します。 キューコンシューマスレッドによって使用されます。 タスクのフェッチに使用される get()ごとに、 task_done()への後続の呼び出しは、タスクの処理が完了したことをキューに通知します。
join()が現在ブロックしている場合、すべてのアイテムが処理されると再開されます(つまり、 putされたすべてのアイテムに対して task_done()呼び出しが受信されました()をキューに入れます)。
キューに配置されたアイテムよりも多く呼び出された場合、
ValueError
を発生させます。
- join()
キュー内のすべてのアイテムが取得および処理されるまでブロックします。
アイテムがキューに追加されるたびに、未完了のタスクの数が増えます。 コンシューマースレッドが task_done()を呼び出して、アイテムが取得され、そのアイテムに対するすべての作業が完了したことを示すたびに、カウントが減少します。 未完了のタスクの数がゼロになると、 join()のブロックが解除されます。
16.6.2.3。 その他
- multiprocessing.active_children()
現在のプロセスのすべての生きている子のリストを返します。
これを呼び出すと、すでに終了しているプロセスに「参加」するという副作用があります。
- multiprocessing.cpu_count()
- システム内のCPUの数を返します。
NotImplementedError
を上げる可能性があります。
- multiprocessing.current_process()
現在のプロセスに対応する Process オブジェクトを返します。
- multiprocessing.freeze_support()
マルチプロセッシングを使用するプログラムがフリーズしてWindows実行可能ファイルを生成する場合のサポートを追加します。 ( py2exe 、 PyInstaller 、 cx_Freeze でテスト済みです。)
この関数は、メインモジュールの
if __name__ == '__main__'
行の直後に呼び出す必要があります。 例えば:from multiprocessing import Process, freeze_support def f(): print 'hello world!' if __name__ == '__main__': freeze_support() Process(target=f).start()
freeze_support()
行が省略されている場合、フリーズされた実行可能ファイルを実行しようとすると、RuntimeError
が発生します。freeze_support()
を呼び出しても、Windows以外のオペレーティングシステムで呼び出されても効果はありません。 さらに、モジュールがWindows上のPythonインタープリターによって正常に実行されている場合(プログラムがフリーズされていない場合)、freeze_support()
は効果がありません。
- multiprocessing.set_executable()
子プロセスを開始するときに使用するPythonインタープリターのパスを設定します。 (デフォルトでは、 sys.executable が使用されます)。 埋め込み者はおそらく次のようなことをする必要があります
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
子プロセスを作成する前に。 (Windowsのみ)
16.6.2.4。 接続オブジェクト
接続オブジェクトを使用すると、選択可能なオブジェクトまたは文字列を送受信できます。 それらは、メッセージ指向の接続されたソケットと考えることができます。
接続オブジェクトは通常、パイプを使用して作成されます。リスナーとクライアントも参照してください。
- class Connection
- send(obj)
recv()を使用して読み取る必要があるオブジェクトを接続のもう一方の端に送信します。
オブジェクトは選択可能である必要があります。 非常に大きなピクルス(OSによって異なりますが、約32 MB +)は、
ValueError
例外を発生させる可能性があります。
- recv()
send()を使用して、接続のもう一方の端から送信されたオブジェクトを返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合は、
EOFError
を発生させます。
- fileno()
接続で使用されるファイル記述子またはハンドルを返します。
- close()
接続を閉じます。
これは、接続がガベージコレクションされるときに自動的に呼び出されます。
- poll([timeout])
読み取ることができるデータがあるかどうかを返します。
timeout が指定されていない場合、すぐに戻ります。 timeout が数値の場合、これはブロックする最大時間を秒単位で指定します。 timeout が
None
の場合、無限タイムアウトが使用されます。
- send_bytes(buffer[, offset[, size]])
バッファインターフェイスをサポートするオブジェクトからバイトデータを完全なメッセージとして送信します。
offset が指定されている場合、データは buffer のその位置から読み取られます。 size が指定されている場合、その数のバイトがバッファーから読み取られます。 非常に大きなバッファ(OSによって異なりますが、約32 MB +)は、
ValueError
例外を発生させる可能性があります。
- recv_bytes([maxlength])
接続のもう一方の端から送信されたバイトデータの完全なメッセージを文字列として返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合は、
EOFError
を発生させます。maxlength が指定されていて、メッセージが maxlength より長い場合、
IOError
が発生し、接続が読み取れなくなります。
- recv_bytes_into(buffer[, offset])
接続のもう一方の端から送信されたバイトデータの完全なメッセージをバッファに読み込み、メッセージのバイト数を返します。 受け取るものができるまでブロックします。 受信するものがなく、もう一方の端が閉じている場合は、
EOFError
を発生させます。buffer は、書き込み可能なバッファインターフェイスを満たすオブジェクトである必要があります。 offset が指定されている場合、メッセージはその位置からバッファーに書き込まれます。 オフセットは、バッファの長さ(バイト単位)未満の負でない整数である必要があります。
バッファが短すぎる場合、
BufferTooShort
例外が発生し、完全なメッセージはe.args[0]
として利用できます。ここで、e
は例外インスタンスです。
例えば:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()メソッドは、受信したデータの選択を自動的に解除します。これは、メッセージを送信したプロセスを信頼できない限り、セキュリティリスクになる可能性があります。
したがって、接続オブジェクトがPipe()
を使用して生成された場合を除き、何らかの認証を実行した後は、 recv()および send()メソッドのみを使用する必要があります。 認証キーを参照してください。
警告
プロセスがパイプの読み取りまたは書き込みを試みているときにプロセスが強制終了されると、メッセージの境界がどこにあるかを確認できなくなる可能性があるため、パイプ内のデータが破損する可能性があります。
16.6.2.5。 同期プリミティブ
一般に、同期プリミティブは、マルチスレッドプログラムの場合ほどマルチプロセスプログラムでは必要ありません。 スレッドモジュールのドキュメントを参照してください。
マネージャーオブジェクトを使用して同期プリミティブを作成することもできることに注意してください。 Managers を参照してください。
- class multiprocessing.BoundedSemaphore([value])
制限付きセマフォオブジェクト: threading.BoundedSemaphore の類似物。
その類似物との唯一の違いが存在します。その
acquire
メソッドの最初の引数は block という名前で、と一致するように、オプションの2番目の引数 timeout をサポートします。 Lock.acquire()。ノート
Mac OS Xでは、
sem_getvalue()
がそのプラットフォームに実装されていないため、これはセマフォと区別できません。
- class multiprocessing.Condition([lock])
条件変数: threading.Condition のクローン。
lock が指定されている場合は、 multiprocessing の Lock または RLock オブジェクトである必要があります。
- class multiprocessing.Event
threading.Event のクローン。 このメソッドは、終了時に内部セマフォの状態を返すため、タイムアウトが指定されて操作がタイムアウトした場合を除いて、常に
True
を返します。バージョン2.7で変更:以前は、メソッドは常に
None
を返していました。
- class multiprocessing.Lock
非再帰的ロックオブジェクト: threading.Lock の類似物。 プロセスまたはスレッドがロックを取得すると、それ以降、プロセスまたはスレッドからロックを取得しようとすると、ロックが解放されるまでブロックされます。 どのプロセスまたはスレッドでもそれを解放できます。 スレッドに適用される threading.Lock の概念と動作は、特に明記されていない限り、プロセスまたはスレッドのいずれかに適用される multiprocessing.Lock に複製されます。
Lock は実際には、デフォルトのコンテキストで初期化された
multiprocessing.synchronize.Lock
のインスタンスを返すファクトリ関数であることに注意してください。Lock は、 context manager プロトコルをサポートしているため、 with ステートメントで使用できます。
- acquire(block=True, timeout=None)
ロック、ブロッキング、または非ブロッキングを取得します。
block 引数を
True
(デフォルト)に設定すると、メソッド呼び出しはロックがロック解除状態になるまでブロックし、次にロックに設定してTrue
を返します。 。 この最初の引数の名前は、 threading.Lock.acquire()の名前とは異なることに注意してください。block 引数を
False
に設定すると、メソッド呼び出しはブロックされません。 ロックが現在ロック状態にある場合は、False
を返します。 それ以外の場合は、ロックをロック状態に設定し、True
を返します。timeout の正の浮動小数点値で呼び出された場合、ロックを取得できない限り、 timeout で指定された最大秒数の間ブロックします。 timeout の値が負の呼び出しは、 timeout がゼロの場合と同等です。 timeout の値が
None
(デフォルト)の呼び出しでは、タイムアウト期間が無限に設定されます。 timeout 引数は、 block 引数がFalse
に設定されているため無視される場合、実際的な影響はありません。 ロックが取得されている場合はTrue
を返し、タイムアウト期間が経過している場合はFalse
を返します。 timeout 引数は、このメソッドのアナログである threading.Lock.acquire()には存在しないことに注意してください。
- release()
ロックを解除します。 これは、最初にロックを取得したプロセスまたはスレッドだけでなく、任意のプロセスまたはスレッドから呼び出すことができます。
動作は threading.Lock.release()と同じですが、ロック解除されたロックで呼び出されると、
ValueError
が発生する点が異なります。
- class multiprocessing.RLock
再帰的ロックオブジェクト: threading.RLock の類似物。 再帰ロックは、それを取得したプロセスまたはスレッドによって解放される必要があります。 プロセスまたはスレッドが再帰ロックを取得すると、同じプロセスまたはスレッドがブロックせずに再度取得できます。 そのプロセスまたはスレッドは、取得されるたびに1回解放する必要があります。
RLock は実際には、デフォルトのコンテキストで初期化された
multiprocessing.synchronize.RLock
のインスタンスを返すファクトリ関数であることに注意してください。RLock は、コンテキストマネージャープロトコルをサポートしているため、 with ステートメントで使用できます。
- acquire(block=True, timeout=None)
ロック、ブロッキング、または非ブロッキングを取得します。
block 引数を
True
に設定して呼び出された場合、ロックが現在のプロセスによってすでに所有されていない限り、ロックがロック解除状態(プロセスまたはスレッドによって所有されていない)になるまでブロックします。スレッド。 次に、現在のプロセスまたはスレッドがロックの所有権を取得し(まだ所有権がない場合)、ロック内の再帰レベルが1ずつ増加し、True
の戻り値になります。 threading.RLock.acquire()の実装と比較して、この最初の引数の動作には、引数自体の名前から始まるいくつかの違いがあることに注意してください。block 引数を
False
に設定して呼び出す場合は、ブロックしないでください。 ロックがすでに別のプロセスまたはスレッドによって取得されている(したがって所有されている)場合、現在のプロセスまたはスレッドは所有権を取得せず、ロック内の再帰レベルは変更されないため、戻り値は [X233Xになります。 ]。 ロックがロック解除状態の場合、現在のプロセスまたはスレッドが所有権を取得し、再帰レベルがインクリメントされ、True
の戻り値になります。timeout 引数の使用法と動作は、 Lock.acquire()の場合と同じです。 timeout 引数は、このメソッドのアナログである threading.RLock.acquire()には存在しないことに注意してください。
- release()
ロックを解除し、再帰レベルを下げます。 デクリメント後に再帰レベルがゼロの場合は、ロックをロック解除にリセットし(プロセスまたはスレッドが所有していない)、ロックがロック解除されるのを待って他のプロセスまたはスレッドがブロックされている場合は、そのうちの1つだけを続行します。 デクリメント後も再帰レベルがゼロ以外の場合、ロックはロックされたままで、呼び出し元のプロセスまたはスレッドによって所有されます。
このメソッドは、呼び出し元のプロセスまたはスレッドがロックを所有している場合にのみ呼び出してください。
AssertionError
は、このメソッドが所有者以外のプロセスまたはスレッドによって呼び出された場合、またはロックがロック解除(所有されていない)状態にある場合に発生します。 この状況で発生する例外のタイプは、 threading.RLock.release()で実装されている動作とは異なることに注意してください。
- class multiprocessing.Semaphore([value])
セマフォオブジェクト: threading.Semaphore の類似物。
その類似物との唯一の違いが存在します。その
acquire
メソッドの最初の引数は block という名前で、と一致するように、オプションの2番目の引数 timeout をサポートします。 Lock.acquire()。
ノート
BoundedSemaphore 、 Lock 、 RLock 、 Semaphore のacquire()
メソッドには、 スレッド。 署名はacquire(block=True, timeout=None)
で、キーワードパラメータを使用できます。 block がTrue
で、 timeout がNone
でない場合、タイムアウトを秒単位で指定します。 ブロックがFalse
の場合、タイムアウトは無視されます。
Mac OS Xでは、sem_timedwait
はサポートされていないため、タイムアウトを指定してacquire()
を呼び出すと、スリープループを使用してその関数の動作がエミュレートされます。
ノート
Ctrl-C によって生成されたSIGINTシグナルが、BoundedSemaphore.acquire()
、 Lock.acquire()、 RLockの呼び出しによってメインスレッドがブロックされている間に到着した場合。 Acquisition()、Semaphore.acquire()
、Condition.acquire()
、またはCondition.wait()
の場合、呼び出しはすぐに中断され、KeyboardInterrupt
が発生します。
これは、同等のブロッキング呼び出しの進行中にSIGINTが無視される threading の動作とは異なります。
ノート
このパッケージの機能の一部には、ホストオペレーティングシステム上で機能する共有セマフォの実装が必要です。 これがないと、multiprocessing.synchronize
モジュールが無効になり、インポートしようとするとImportError
になります。 詳細については、:issue: `3770` を参照してください。
16.6.2.7。 マネージャー
マネージャーは、異なるプロセス間で共有できるデータを作成する方法を提供します。 マネージャーオブジェクトは、共有オブジェクトを管理するサーバープロセスを制御します。 他のプロセスは、プロキシを使用して共有オブジェクトにアクセスできます。
- multiprocessing.Manager()
- プロセス間でオブジェクトを共有するために使用できる、開始された SyncManager オブジェクトを返します。 返されるマネージャーオブジェクトは、生成された子プロセスに対応し、共有オブジェクトを作成して対応するプロキシを返すメソッドを持っています。
Managerプロセスは、ガベージコレクションが行われるか、親プロセスが終了するとすぐにシャットダウンされます。 マネージャークラスは、 multiprocessing.managers モジュールで定義されています。
- class multiprocessing.managers.BaseManager([address[, authkey]])
BaseManagerオブジェクトを作成します。
作成したら、 start()または
get_server().serve_forever()
を呼び出して、マネージャーオブジェクトが開始されたマネージャープロセスを参照していることを確認する必要があります。address は、マネージャープロセスが新しい接続をリッスンするアドレスです。 アドレスが
None
の場合、任意のアドレスが選択されます。authkey は、サーバープロセスへの着信接続の有効性を確認するために使用される認証キーです。 authkey が
None
の場合、current_process().authkey
です。 それ以外の場合は、 authkey が使用され、文字列である必要があります。- start([initializer[, initargs]])
サブプロセスを開始して、マネージャーを開始します。 初期化子が
None
でない場合、サブプロセスは開始時にinitializer(*initargs)
を呼び出します。
- get_server()
Managerの制御下にある実際のサーバーを表す
Server
オブジェクトを返します。Server
オブジェクトは、serve_forever()
メソッドをサポートします。>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey='abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
には、さらにアドレス属性があります。
- connect()
ローカルマネージャオブジェクトをリモートマネージャプロセスに接続します。
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc') >>> m.connect()
- shutdown()
マネージャーが使用するプロセスを停止します。 これは、 start()を使用してサーバープロセスを開始した場合にのみ使用できます。
これは複数回呼び出すことができます。
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
タイプを登録するために使用できる、またはマネージャークラスに呼び出し可能なクラスメソッド。
typeid は、特定のタイプの共有オブジェクトを識別するために使用される「タイプ識別子」です。 これは文字列である必要があります。
callable は、このタイプ識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。
from_address()
クラスメソッドを使用してマネージャーインスタンスを作成する場合、または create_method 引数がFalse
の場合、これはNone
のままにしておくことができます。proxytype は、 BaseProxy のサブクラスであり、この typeid を使用して共有オブジェクトのプロキシを作成するために使用されます。
None
の場合、プロキシクラスが自動的に作成されます。日付付きは、 BaseProxy._callmethod()を使用してこのtypeidのプロキシにアクセスを許可するメソッド名のシーケンスを指定するために使用されます。 ( Exposure が
None
の場合、proxytype._exposed_
が使用されます。)公開リストが指定されていない場合、共有オブジェクトのすべての「パブリックメソッド」アクセス可能になります。 (ここで「パブリックメソッド」とは、 __ call __()メソッドを持ち、名前が'_'
で始まらない属性を意味します。)method_to_typeid は、プロキシを返す必要がある公開されたメソッドの戻りタイプを指定するために使用されるマッピングです。 メソッド名をtypeid文字列にマップします。 ( method_to_typeid が
None
の場合、存在する場合は代わりにproxytype._method_to_typeid_
が使用されます。)メソッドの名前がこのマッピングのキーでない場合、またはマッピングがNone
次に、メソッドによって返されるオブジェクトは値によってコピーされます。create_method は、 typeid という名前でメソッドを作成するかどうかを決定します。これを使用して、サーバープロセスに新しい共有オブジェクトを作成し、そのプロキシを返すように指示できます。 デフォルトでは
True
です。
BaseManager インスタンスにも、読み取り専用プロパティが1つあります。
- address
マネージャーが使用するアドレス。
- class multiprocessing.managers.SyncManager
プロセスの同期に使用できる BaseManager のサブクラス。 このタイプのオブジェクトは、
multiprocessing.Manager()
によって返されます。また、共有リストと辞書の作成もサポートしています。
- BoundedSemaphore([value])
共有 threading.BoundedSemaphore オブジェクトを作成し、そのプロキシを返します。
- Condition([lock])
共有 threading.Condition オブジェクトを作成し、そのプロキシを返します。
lock が指定されている場合は、 threading.Lock または threading.RLock オブジェクトのプロキシである必要があります。
- Event()
共有 threading.Event オブジェクトを作成し、そのプロキシを返します。
- Lock()
共有 threading.Lock オブジェクトを作成し、そのプロキシを返します。
- Namespace()
共有名前空間オブジェクトを作成し、そのプロキシを返します。
- Queue([maxsize])
共有 Queue.Queue オブジェクトを作成し、そのプロキシを返します。
- RLock()
共有 threading.RLock オブジェクトを作成し、そのプロキシを返します。
- Semaphore([value])
共有 threading.Semaphore オブジェクトを作成し、そのプロキシを返します。
- Array(typecode, sequence)
配列を作成し、そのプロキシを返します。
- Value(typecode, value)
書き込み可能な
value
属性を持つオブジェクトを作成し、そのプロキシを返します。
- dict()
dict(mapping)
dict(sequence) 共有
dict
オブジェクトを作成し、そのプロキシを返します。
- list()
list(sequence) 共有
list
オブジェクトを作成し、そのプロキシを返します。
ノート
dictおよびlistプロキシの可変値またはアイテムへの変更は、プロキシがその値またはアイテムがいつ変更されたかを知る方法がないため、マネージャを介して伝播されません。 このようなアイテムを変更するには、変更したオブジェクトをコンテナプロキシに再割り当てします。
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # reassigning the dictionary, the proxy is notified of the change lproxy[0] = d
- class multiprocessing.managers.Namespace
SyncManager に登録できるタイプ。
名前空間オブジェクトにはパブリックメソッドはありませんが、書き込み可能な属性はあります。 その表現は、その属性の値を示しています。
ただし、名前空間オブジェクトにプロキシを使用する場合、
'_'
で始まる属性はプロキシの属性であり、指示対象の属性ではありません。>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print Global Namespace(x=10, y='hello')
16.6.2.7.1。 カスタマイズされたマネージャー
独自のマネージャーを作成するには、 BaseManager のサブクラスを作成し、 register() classmethodを使用して、新しいタイプまたは呼び出し可能オブジェクトをマネージャークラスに登録します。 例えば:
from multiprocessing.managers import BaseManager
class MathsClass(object):
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
manager = MyManager()
manager.start()
maths = manager.Maths()
print maths.add(4, 3) # prints 7
print maths.mul(7, 8) # prints 56
16.6.2.7.2。 リモートマネージャーの使用
1台のマシンでマネージャーサーバーを実行し、クライアントに他のマシンからそれを使用させることができます(関係するファイアウォールがそれを許可していると仮定します)。
次のコマンドを実行すると、リモートクライアントがアクセスできる単一の共有キュー用のサーバーが作成されます。
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
1つのクライアントは、次のようにサーバーにアクセスできます。
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
別のクライアントもそれを使用できます:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
ローカルプロセスは、クライアントで上記のコードを使用してキューにリモートアクセスすることにより、そのキューにアクセスすることもできます。
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
16.6.2.8。 プロキシオブジェクト
プロキシは、(おそらく)別のプロセスに存在する共有オブジェクトを参照するオブジェクトです。 共有オブジェクトは、プロキシの指示対象であると言われます。 複数のプロキシオブジェクトが同じ指示対象を持つ場合があります。
プロキシオブジェクトには、その指示対象の対応するメソッドを呼び出すメソッドがあります(ただし、指示対象のすべてのメソッドが必ずしもプロキシを介して利用できるわけではありません)。 プロキシは通常、その指示対象とほぼ同じ方法で使用できます。
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
str()をプロキシに適用すると、指示対象の表現が返されますが、 repr()を適用すると、プロキシの表現が返されることに注意してください。
プロキシオブジェクトの重要な機能は、プロセス間で受け渡すことができるように選択できることです。 ただし、プロキシが対応するマネージャーのプロセスに送信された場合、それを選択解除すると、指示対象自体が生成されることに注意してください。 これは、たとえば、1つの共有オブジェクトに2番目のオブジェクトを含めることができることを意味します。
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[../]] []
>>> b.append('hello')
>>> print a, b
[[../'hello']] ['hello']
ノート
マルチプロセッシングのプロキシタイプは、値による比較をサポートするために何もしません。 したがって、たとえば、次のようになります。
>>> manager.list([1,2,3]) == [1,2,3]
False
比較を行うときは、代わりに指示対象のコピーを使用する必要があります。
- class multiprocessing.managers.BaseProxy
プロキシオブジェクトは、 BaseProxy のサブクラスのインスタンスです。
- _callmethod(methodname[, args[, kwds]])
プロキシのリファレントのメソッドの結果を呼び出して返します。
proxy
が、指示対象がobj
であるプロキシである場合、式proxy._callmethod(methodname, args, kwds)
式を評価します
getattr(obj, methodname)(*args, **kwds)
マネージャーのプロセスで。
戻り値は、呼び出しの結果のコピーまたは新しい共有オブジェクトへのプロキシになります。 BaseManager.register()の method_to_typeid 引数のドキュメントを参照してください。
呼び出しによって例外が発生した場合は、 _callmethod()によって再発生します。 マネージャーのプロセスで他の例外が発生した場合、これは
RemoteError
例外に変換され、 _callmethod()によって発生します。特に、 methodname が exposed されていない場合、例外が発生することに注意してください。
_callmethod()の使用例:
>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]` [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]` Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()
指示対象のコピーを返します。
指示対象が選択できない場合、これにより例外が発生します。
- __repr__()
プロキシオブジェクトの表現を返します。
- __str__()
指示対象の表現を返します。
16.6.2.8.1。 掃除
プロキシオブジェクトはweakrefコールバックを使用するため、ガベージコレクションが行われると、リファレントを所有するマネージャーから自身の登録が解除されます。
共有オブジェクトを参照するプロキシがなくなると、共有オブジェクトはマネージャプロセスから削除されます。
16.6.2.9。 プロセスプール
Pool
クラスで送信されたタスクを実行するプロセスのプールを作成できます。
- class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
ジョブを送信できるワーカープロセスのプールを制御するプロセスプールオブジェクト。 タイムアウトとコールバックを伴う非同期結果をサポートし、並列マップの実装があります。
processes は、使用するワーカープロセスの数です。 プロセスが
None
の場合、cpu_count()
によって返される番号が使用されます。 イニシャライザーがNone
でない場合、各ワーカープロセスは起動時にinitializer(*initargs)
を呼び出します。プールオブジェクトのメソッドは、プールを作成したプロセスによってのみ呼び出される必要があることに注意してください。
バージョン2.7の新機能: maxtasksperchild は、ワーカープロセスが終了して新しいワーカープロセスに置き換えられる前に完了できるタスクの数であり、未使用のリソースを解放できるようにします。 デフォルトの maxtasksperchild は
None
です。これは、ワーカープロセスがプールと同じくらい存続することを意味します。ノート
Pool
内のワーカープロセスは、通常、プールのワークキューの全期間にわたって存続します。 他のシステム(Apache、mod_wsgiなど)でよく見られるパターンは、ワーカーが保持するリソースを解放するために、プール内のワーカーが終了、クリーンアップ、新しいプロセスの生成を行う前に、設定された量の作業のみを完了できるようにすることです。古いものを交換します。Pool
の maxtasksperchild 引数は、この機能をエンドユーザーに公開します。- apply(func[, args[, kwds]])
apply()組み込み関数と同等です。 結果の準備ができるまでブロックされるため、 apply_async()は作業を並行して実行するのに適しています。 さらに、 func は、プールのワーカーの1つでのみ実行されます。
- apply_async(func[, args[, kwds[, callback]]])
結果オブジェクトを返す apply()メソッドのバリアント。
callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 結果の準備が整うと、コールバックが適用されます(呼び出しが失敗した場合を除く)。 コールバックはすぐに完了する必要があります。そうしないと、結果を処理するスレッドがブロックされます。
- map(func, iterable[, chunksize])
map()組み込み関数と同等の並列機能です(ただし、 iterable 引数は1つしかサポートしていません)。 結果の準備ができるまでブロックします。
このメソッドは、イテラブルをいくつかのチャンクに分割し、個別のタスクとしてプロセスプールに送信します。 これらのチャンクの(おおよその)サイズは、 chunksize を正の整数に設定することで指定できます。
- map_async(func, iterable[, chunksize[, callback]])
結果オブジェクトを返す map()メソッドのバリアント。
callback が指定されている場合、それは単一の引数を受け入れる呼び出し可能である必要があります。 結果の準備が整うと、コールバックが適用されます(呼び出しが失敗した場合を除く)。 コールバックはすぐに完了する必要があります。そうしないと、結果を処理するスレッドがブロックされます。
- imap(func, iterable[, chunksize])
itertools.imap()と同等です。
chunksize 引数は、 map()メソッドで使用される引数と同じです。 chunksize に大きな値を使用する非常に長い反復可能オブジェクトの場合、デフォルト値の
1
を使用するよりもはるかに速くジョブを完了することができます。また、 chunksize が
1
の場合、 imap()メソッドによって返されるイテレーターのnext()
メソッドには、オプションの timeout があります。 ]パラメータ:タイムアウト秒以内に結果を返すことができない場合、next(timeout)
はmultiprocessing.TimeoutError
を発生させます。
- imap_unordered(func, iterable[, chunksize])
imap()と同じですが、返されるイテレータからの結果の順序は任意であると見なす必要があります。 (ワーカープロセスが1つしかない場合にのみ、順序が「正しい」ことが保証されます。)
- close()
これ以上タスクがプールに送信されないようにします。 すべてのタスクが完了すると、ワーカープロセスは終了します。
- terminate()
未処理の作業を完了せずに、ワーカープロセスをただちに停止します。 プールオブジェクトがガベージコレクションされると、 terminate()がすぐに呼び出されます。
- join()
ワーカープロセスが終了するのを待ちます。 join()を使用する前に、 close()または terminate()を呼び出す必要があります。
- class multiprocessing.pool.AsyncResult
Pool.apply_async()
およびPool.map_async()
によって返される結果のクラス。- get([timeout])
到着時に結果を返します。 timeout が
None
でなく、結果が timeout 秒以内に到着しない場合、multiprocessing.TimeoutError
が発生します。 リモート呼び出しで例外が発生した場合、その例外は get()によって再発生します。
- wait([timeout])
結果が利用可能になるまで、または timeout 秒が経過するまで待ちます。
- ready()
通話が完了したかどうかを返します。
- successful()
例外を発生させずに呼び出しが完了したかどうかを返します。 結果の準備ができていない場合、
AssertionError
を上げます。
次の例は、プールの使用法を示しています。
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
16.6.2.10。 リスナーとクライアント
通常、プロセス間でのメッセージパッシングは、キューを使用するか、 Pipe()によって返される Connection オブジェクトを使用して行われます。
ただし、 multiprocessing.connection モジュールを使用すると、柔軟性がさらに高まります。 基本的に、ソケットまたはWindows名前付きパイプを処理するための高レベルのメッセージ指向APIを提供し、 hmac モジュールを使用したダイジェスト認証もサポートします。
- multiprocessing.connection.deliver_challenge(connection, authkey)
ランダムに生成されたメッセージを接続のもう一方の端に送信し、応答を待ちます。
応答が authkey をキーとして使用してメッセージのダイジェストと一致する場合、ウェルカムメッセージが接続のもう一方の端に送信されます。 それ以外の場合は、 AuthenticationError が発生します。
- multiprocessing.connection.answer_challenge(connection, authkey)
メッセージを受信し、 authkey をキーとしてメッセージのダイジェストを計算してから、ダイジェストを送り返します。
ウェルカムメッセージが受信されない場合、 AuthenticationError が発生します。
- multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])
アドレスアドレスを使用しているリスナーへの接続をセットアップしようとし、接続を返します。
接続のタイプはファミリ引数によって決定されますが、通常はアドレスの形式から推測できるため、通常は省略できます。 (アドレス形式を参照)
authenticate が
True
の場合、または authkey が文字列の場合、ダイジェスト認証が使用されます。 authkey がNone
の場合、認証に使用されるキーは authkey またはcurrent_process().authkey)
のいずれかになります。 認証が失敗すると、 AuthenticationError が発生します。 認証キーを参照してください。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
接続を「リッスン」している、バインドされたソケットまたはWindowsの名前付きパイプのラッパー。
address は、リスナーオブジェクトのバインドされたソケットまたは名前付きパイプによって使用されるアドレスです。
ノート
'0.0.0.0'のアドレスが使用されている場合、そのアドレスはWindowsで接続可能なエンドポイントにはなりません。 接続可能なエンドポイントが必要な場合は、「127.0.0.1」を使用する必要があります。
family は、使用するソケット(または名前付きパイプ)のタイプです。 これは、文字列
'AF_INET'
(TCPソケットの場合)、'AF_UNIX'
(Unixドメインソケットの場合)、または'AF_PIPE'
(Windows名前付きパイプの場合)のいずれかになります。 これらのうち、最初のものだけが利用可能であることが保証されています。 ファミリがNone
の場合、ファミリはアドレスの形式から推測されます。 アドレスもNone
の場合、デフォルトが選択されます。 このデフォルトは、利用可能な最速であると想定されるファミリです。 アドレス形式を参照してください。 ファミリが'AF_UNIX'
で、アドレスがNone
の場合、ソケットは tempfile.mkstemp()を使用して作成されたプライベート一時ディレクトリに作成されることに注意してください。 。リスナーオブジェクトがソケットを使用する場合、バインドされると、 backlog (デフォルトでは1)がソケットの listen()メソッドに渡されます。
authenticate が
True
(デフォルトではFalse
)であるか、 authkey がNone
でない場合、ダイジェスト認証が使用されます。authkey が文字列の場合、認証キーとして使用されます。 それ以外の場合は、
None
である必要があります。authkey が
None
で、 authenticate がTrue
の場合、current_process().authkey
が認証キーとして使用されます。 authkey がNone
で、 authenticate がFalse
の場合、認証は行われません。 認証が失敗すると、 AuthenticationError が発生します。 認証キーを参照してください。- accept()
リスナーオブジェクトのバインドされたソケットまたは名前付きパイプで接続を受け入れ、 Connection オブジェクトを返します。 認証が試行されて失敗した場合、
AuthenticationError
が発生します。
- close()
リスナーオブジェクトのバインドされたソケットまたは名前付きパイプを閉じます。 これは、リスナーがガベージコレクションされるときに自動的に呼び出されます。 ただし、明示的に呼び出すことをお勧めします。
リスナーオブジェクトには、次の読み取り専用プロパティがあります。
- address
Listenerオブジェクトによって使用されているアドレス。
- last_accepted
最後に受け入れられた接続の送信元のアドレス。 これが利用できない場合は、
None
です。
このモジュールは、次の例外を定義します。
- exception multiprocessing.connection.ProcessError
- すべてのマルチプロセッシング例外の基本クラス。
- exception multiprocessing.connection.BufferTooShort
- 指定されたバッファオブジェクトが小さすぎてメッセージを読み取れない場合に、 Connection.recv_bytes_into()によって発生する例外。
- exception multiprocessing.connection.AuthenticationError
- 認証エラーが発生したときに発生します。
- exception multiprocessing.connection.TimeoutError
- タイムアウトの期限が切れたときにタイムアウトのあるメソッドによって発生します。
例
次のサーバーコードは、'secret password'
を認証キーとして使用するリスナーを作成します。 次に、接続を待機し、クライアントにデータを送信します。
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
次のコードはサーバーに接続し、サーバーからデータを受信します。
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
16.6.2.10.1。 アドレス形式
'AF_INET'
アドレスは、(hostname, port)
の形式のタプルです。ここで、 hostname は文字列、 port は整数です。'AF_UNIX'
アドレスは、ファイルシステム上のファイル名を表す文字列です。- *;
'AF_PIPE'
アドレスは次の形式の文字列ですr'\.\pipe{PipeName}'
。 Client()を使用して ServerName という名前のリモートコンピューター上の名前付きパイプに接続するには、代わりにr'\ServerName\pipe{PipeName}'
の形式のアドレスを使用する必要があります。
2つの円記号で始まる文字列は、デフォルトでは'AF_UNIX'
アドレスではなく'AF_PIPE'
アドレスであると見なされることに注意してください。
16.6.2.11。 認証キー
Connection.recv()を使用すると、受信したデータは自動的に選択解除されます。 残念ながら、信頼できないソースからデータを選択解除することはセキュリティリスクです。 したがって、 Listener および Client()は、 hmac モジュールを使用してダイジェスト認証を提供します。
認証キーは、パスワードと見なすことができる文字列です。接続が確立されると、両端は、相手が認証キーを知っていることの証明を要求します。 (両端が同じキーを使用していることを示すには、接続を介してキーを送信する必要はありませんではありません。)
認証が要求されたが認証キーが指定されていない場合、current_process().authkey
の戻り値が使用されます( Process を参照)。 この値は、現在のプロセスが作成する Process オブジェクトによって自動的に継承されます。 これは、(デフォルトで)マルチプロセスプログラムのすべてのプロセスが、それらの間の接続を設定するときに使用できる単一の認証キーを共有することを意味します。
os.urandom()を使用して、適切な認証キーを生成することもできます。
16.6.2.12。 ロギング
ロギングのサポートが利用可能です。 ただし、 logging パッケージはプロセス共有ロックを使用しないため、(ハンドラーの種類によっては)異なるプロセスからのメッセージが混同される可能性があることに注意してください。
- multiprocessing.get_logger()
マルチプロセッシングで使用されるロガーを返します。 必要に応じて、新しいものが作成されます。
最初に作成されたとき、ロガーにはレベル
logging.NOTSET
があり、デフォルトのハンドラーはありません。 このロガーに送信されたメッセージは、デフォルトではルートロガーに伝播されません。Windowsでは、子プロセスは親プロセスのロガーのレベルのみを継承することに注意してください。ロガーの他のカスタマイズは継承されません。
- multiprocessing.log_to_stderr()
- この関数は get_logger()の呼び出しを実行しますが、get_loggerによって作成されたロガーを返すことに加えて、フォーマット
'[%(levelname)s/%(processName)s] %(message)s'
を使用して sys.stderr に出力を送信するハンドラーを追加します。 ]。
以下は、ロギングがオンになっているセッションの例です。
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
これらの2つのロギング機能に加えて、マルチプロセッシングは2つの追加のロギングレベル属性も公開します。 これらはSUBWARNING
とSUBDEBUG
です。 次の表は、これらが通常のレベル階層のどこに適合するかを示しています。
レベル | 数値 |
---|---|
SUBWARNING
|
25 |
SUBDEBUG
|
5 |
ロギングレベルの完全な表については、 logging モジュールを参照してください。
これらの追加のログレベルは、主にマルチプロセッシングモジュール内の特定のデバッグメッセージに使用されます。 以下は、SUBDEBUG
が有効になっていることを除いて、上記と同じ例です。
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0
16.6.2.13。 NS multiprocessing.dummy モジュール
multiprocessing.dummy は、 multiprocessing のAPIを複製しますが、 threading モジュールのラッパーにすぎません。
16.6.3。 プログラミングガイドライン
マルチプロセッシングを使用する際に遵守する必要のある特定のガイドラインとイディオムがあります。
16.6.3.1。 すべてのプラットフォーム
共有状態を避ける
可能な限り、プロセス間で大量のデータをシフトしないようにする必要があります。
threading モジュールの低レベルの同期プリミティブを使用するよりも、プロセス間の通信にキューまたはパイプを使用することをお勧めします。
ピッカビリティ
プロキシのメソッドへの引数が選択可能であることを確認してください。
プロキシのスレッドセーフ
ロックで保護しない限り、複数のスレッドからのプロキシオブジェクトを使用しないでください。
(同じプロキシを使用するさまざまなプロセスで問題が発生することはありません。)
ゾンビプロセスへの参加
Unixでは、プロセスが終了したが参加していない場合、それはゾンビになります。 新しいプロセスが開始される(または active_children()が呼び出される)たびに、まだ参加されていない完了したすべてのプロセスが参加するため、非常に多くなることはありません。 また、終了したプロセスの Process.is_alive を呼び出すとプロセスに参加します。 それでも、開始するすべてのプロセスに明示的に参加することをお勧めします。
ピクルス/アンピクルよりも継承する方が良い
Windowsでは、子プロセスがそれらを使用できるように、マルチプロセッシングの多くのタイプを選択可能にする必要があります。 ただし、通常、パイプまたはキューを使用して共有オブジェクトを他のプロセスに送信することは避けてください。 代わりに、他の場所で作成された共有リソースへのアクセスを必要とするプロセスが祖先プロセスから継承できるようにプログラムを配置する必要があります。
プロセスの終了を回避する
Process.terminate メソッドを使用してプロセスを停止すると、プロセスで現在使用されている共有リソース(ロック、セマフォ、パイプ、キューなど)が壊れたり、他のプロセスで使用できなくなったりする可能性があります。
したがって、共有リソースをまったく使用しないプロセスでのみ Process.terminate の使用を検討するのがおそらく最善です。
キューを使用するプロセスへの参加
アイテムをキューに入れたプロセスは、バッファリングされたすべてのアイテムが「フィーダー」スレッドによって基になるパイプに供給されるまで、終了するまで待機することに注意してください。 (子プロセスは、この動作を回避するために、キューの cancel_join_thread()メソッドを呼び出すことができます。)
つまり、キューを使用するときは常に、プロセスに参加する前に、キューに配置されたすべてのアイテムが最終的に削除されることを確認する必要があります。 そうしないと、アイテムをキューに入れたプロセスが終了するかどうかを確認できません。 非デーモンプロセスは自動的に参加することにも注意してください。
デッドロックが発生する例は次のとおりです。
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()
ここでの修正は、最後の2行を交換することです(または単に
p.join()
行を削除します)。
子プロセスにリソースを明示的に渡す
Unixでは、子プロセスは、グローバルリソースを使用して親プロセスで作成された共有リソースを利用できます。 ただし、子プロセスのコンストラクターに引数としてオブジェクトを渡すことをお勧めします。
コードを(潜在的に)Windowsと互換性を持たせることとは別に、これにより、子プロセスがまだ生きている限り、オブジェクトが親プロセスでガベージコレクションされないことが保証されます。 これは、オブジェクトが親プロセスでガベージコレクションされたときに一部のリソースが解放された場合に重要になる可能性があります。
だから例えば
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()
次のように書き直す必要があります
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin を「ファイルのようなオブジェクト」に置き換えることに注意してください
マルチプロセッシングは元々無条件に呼び出されました:
os.close(sys.stdin.fileno())
multiprocessing.Process._bootstrap()
メソッドの場合—これにより、プロセス内のプロセスで問題が発生しました。 これは次のように変更されました。sys.stdin.close() sys.stdin = open(os.devnull)
これは、プロセスが互いに衝突して不正なファイル記述子エラーが発生するという基本的な問題を解決しますが、 sys.stdin()を出力バッファリングを備えた「ファイルのようなオブジェクト」に置き換えるアプリケーションに潜在的な危険をもたらします。 この危険性は、複数のプロセスがこのファイルのようなオブジェクトに対して close()を呼び出すと、同じデータがオブジェクトに複数回フラッシュされ、破損する可能性があることです。
ファイルのようなオブジェクトを作成して独自のキャッシュを実装する場合は、キャッシュに追加するたびにpidを保存し、pidが変更されたときにキャッシュを破棄することで、フォークセーフにすることができます。 例えば:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
詳細については、:issue: `5155` 、:issue:` 5313` 、および:issue: `5331` を参照してください。
16.6.3.2。 ウィンドウズ
Windowsには os.fork()がないため、いくつかの追加の制限があります。
より多くのピック可能性
Process.__init__()
へのすべての引数が選択可能であることを確認してください。 これは、特に、バインドされたメソッドまたはバインドされていないメソッドをWindowsのtarget
引数として直接使用できないことを意味します。関数を定義し、代わりにそれを使用するだけです。また、 Process をサブクラス化する場合は、 Process.start メソッドが呼び出されたときにインスタンスが選択可能であることを確認してください。
グローバル変数
子プロセスで実行されたコードがグローバル変数にアクセスしようとすると、表示される値(存在する場合)は、 Process.start [の時点での親プロセスの値と同じではない可能性があることに注意してください。 X205X]が呼び出されました。
ただし、モジュールレベルの定数であるグローバル変数は問題を引き起こしません。
メインモジュールの安全なインポート
メインモジュールが、意図しない副作用(新しいプロセスの開始など)を引き起こすことなく、新しいPythonインタープリターによって安全にインポートできることを確認してください。
たとえば、Windowsで実行している次のモジュールは、
RuntimeError
で失敗します。from multiprocessing import Process def foo(): print 'hello' p = Process(target=foo) p.start()
代わりに、次のように
if __name__ == '__main__':
を使用して、プログラムの「エントリポイント」を保護する必要があります。from multiprocessing import Process, freeze_support def foo(): print 'hello' if __name__ == '__main__': freeze_support() p = Process(target=foo) p.start()
(プログラムがフリーズする代わりに正常に実行される場合は、
freeze_support()
行を省略できます。)これにより、新しく生成されたPythonインタープリターがモジュールを安全にインポートし、モジュールの
foo()
関数を実行できるようになります。プールまたはマネージャーがメインモジュールに作成されている場合も、同様の制限が適用されます。
16.6.4。 例
カスタマイズされたマネージャーとプロキシを作成して使用する方法のデモンストレーション:
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()
Pool
の使用:
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()
ロック、条件、キューなどの同期タイプ:
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.exitcode == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError('there should be no positive refcounts left')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit(2)
test(namespace)
キューを使用してタスクをワーカープロセスのコレクションにフィードし、結果を収集する方法を示す例:
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
ワーカープロセスのプールが、単一のリスニングソケットを共有しながら、それぞれSimpleHTTPServer.HttpServer
インスタンスを実行する方法の例。
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()
マルチプロセッシングとスレッディングを比較するいくつかの簡単なベンチマーク:
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()