トランスポートとプロトコル
序文
トランスポートとプロトコルは、loop.create_connection()
などの低レベルイベントループAPIによって使用されます。 コールバックベースのプログラミングスタイルを使用し、ネットワークまたはIPCプロトコルの高性能実装を可能にします(例: HTTP)。
基本的に、トランスポートとプロトコルはライブラリとフレームワークでのみ使用する必要があり、高レベルの非同期アプリケーションでは使用しないでください。
このドキュメントページでは、トランスポートとプロトコルの両方について説明しています。
序章
最高レベルでは、トランスポートはどのようにバイトが送信されるかに関係し、プロトコルはどのバイトを送信するか(そしてある程度はいつ)を決定します。
同じことを別の言い方で言うと、トランスポートはソケット(または同様のI / Oエンドポイント)の抽象化であり、プロトコルはトランスポートの観点からのアプリケーションの抽象化です。
さらに別の見方は、トランスポートインターフェイスとプロトコルインターフェイスが一緒になって、ネットワークI / Oとプロセス間I / Oを使用するための抽象的なインターフェイスを定義することです。
トランスポートオブジェクトとプロトコルオブジェクトの間には常に1:1の関係があります。プロトコルはトランスポートメソッドを呼び出してデータを送信し、トランスポートはプロトコルメソッドを呼び出して受信したデータを渡します。
コネクション型イベントループメソッド(loop.create_connection()
など)のほとんどは、通常、で表される、受け入れられた接続の Protocol オブジェクトを作成するために使用される protocol_factory 引数を受け入れます。 ] Transport オブジェクト。 このようなメソッドは通常、(transport, protocol)
のタプルを返します。
コンテンツ
このドキュメントページには、次のセクションが含まれています。
- Transports セクションには、非同期 BaseTransport 、 ReadTransport 、 WriteTransport 、 Transport 、 DatagramTransport が記載されています。 、および SubprocessTransport クラス。
- Protocols セクションには、asyncio BaseProtocol 、 Protocol 、 BufferedProtocol 、 DatagramProtocol 、および SubprocessProtocol が記載されています。 ] クラス。
- 例セクションでは、トランスポート、プロトコル、および低レベルのイベントループAPIを操作する方法を紹介します。
トランスポート
ソースコード: :source: `Lib / asyncio / transports.py`
トランスポートは、さまざまな種類の通信チャネルを抽象化するために asyncio によって提供されるクラスです。
トランスポートオブジェクトは、常に asyncioイベントループによってインスタンス化されます。
asyncioは、TCP、UDP、SSL、およびサブプロセスパイプのトランスポートを実装します。 トランスポートで利用できる方法は、トランスポートの種類によって異なります。
トランスポートクラスはスレッドセーフではありません。
トランスポート階層
- class asyncio.BaseTransport
- すべてのトランスポートの基本クラス。 すべての非同期トランスポートが共有するメソッドが含まれています。
- class asyncio.WriteTransport(BaseTransport)
書き込み専用接続のベーストランスポート。
WriteTransport クラスのインスタンスは、
loop.connect_write_pipe()
イベントループメソッドから返され、loop.subprocess_exec()
などのサブプロセス関連のメソッドでも使用されます。
- class asyncio.ReadTransport(BaseTransport)
読み取り専用接続のベーストランスポート。
ReadTransport クラスのインスタンスは、
loop.connect_read_pipe()
イベントループメソッドから返され、loop.subprocess_exec()
などのサブプロセス関連のメソッドでも使用されます。
- class asyncio.Transport(WriteTransport, ReadTransport)
TCP接続などの双方向トランスポートを表すインターフェイス。
ユーザーはトランスポートを直接インスタンス化しません。 ユーティリティ関数を呼び出し、プロトコルファクトリおよびトランスポートとプロトコルの作成に必要なその他の情報を渡します。
Transport クラスのインスタンスは、
loop.create_connection()
、loop.create_unix_connection()
、loop.create_server()
、loop.sendfile()
などのイベントループメソッドから返されるか、イベントループメソッドによって使用されます。 。
- class asyncio.DatagramTransport(BaseTransport)
データグラム(UDP)接続のトランスポート。
DatagramTransport クラスのインスタンスは、
loop.create_datagram_endpoint()
イベントループメソッドから返されます。
- class asyncio.SubprocessTransport(BaseTransport)
親とその子OSプロセス間の接続を表す抽象化。
SubprocessTransport クラスのインスタンスは、イベントループメソッド
loop.subprocess_shell()
およびloop.subprocess_exec()
から返されます。
ベーストランスポート
- BaseTransport.close()
トランスポートを閉じます。
トランスポートに送信データ用のバッファーがある場合、バッファーされたデータは非同期でフラッシュされます。 これ以上のデータは受信されません。 バッファリングされたすべてのデータがフラッシュされた後、プロトコルの protocol.connection_lost()メソッドが、 None を引数として呼び出されます。
- BaseTransport.is_closing()
- トランスポートが閉じているか閉じている場合は、
True
を返します。
- BaseTransport.get_extra_info(name, default=None)
トランスポートまたはそれが使用する基礎となるリソースに関する情報を返します。
name は、取得するトランスポート固有の情報を表す文字列です。
default は、情報が利用できない場合、またはトランスポートが特定のサードパーティイベントループ実装または現在のプラットフォームでのクエリをサポートしていない場合に返す値です。
たとえば、次のコードは、トランスポートの基になるソケットオブジェクトを取得しようとします。
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
一部のトランスポートで照会できる情報のカテゴリー:
ソケット:
'peername'
:ソケットが接続されているリモートアドレス、 socket.socket.getpeername()の結果(エラーの場合はNone
)'socket'
: socket.socket インスタンス'sockname'
:ソケット自体のアドレス、 socket.socket.getsockname()の結果
SSLソケット:
'compression'
:文字列として使用されている圧縮アルゴリズム。接続が圧縮されていない場合はNone
。 ssl.SSLSocket.compression()の結果'cipher'
:使用されている暗号の名前、その使用を定義するSSLプロトコルのバージョン、および使用されている秘密ビットの数を含む3つの値のタプル。 ssl.SSLSocket.cipher()の結果'peercert'
:ピア証明書; ssl.SSLSocket.getpeercert()の結果'sslcontext'
: ssl.SSLContext インスタンス'ssl_object'
: ssl.SSLObject または ssl.SSLSocket インスタンス
パイプ:
'pipe'
:パイプオブジェクト
サブプロセス:
'subprocess'
: subprocess.Popen インスタンス
- BaseTransport.set_protocol(protocol)
新しいプロトコルを設定します。
プロトコルの切り替えは、切り替えをサポートするために両方のプロトコルが文書化されている場合にのみ実行する必要があります。
- BaseTransport.get_protocol()
- 現在のプロトコルを返します。
読み取り専用トランスポート
- ReadTransport.is_reading()
トランスポートが新しいデータを受信している場合は、
True
を返します。バージョン3.7の新機能。
- ReadTransport.pause_reading()
トランスポートの受信側を一時停止します。 resume_reading()が呼び出されるまで、プロトコルの protocol.data_received()メソッドにデータは渡されません。
バージョン3.7で変更:メソッドはべき等です。 トランスポートがすでに一時停止または閉じられているときに呼び出すことができます。
- ReadTransport.resume_reading()
受信側を再開します。 プロトコルの protocol.data_received()メソッドは、読み取り可能なデータがある場合に再度呼び出されます。
バージョン3.7で変更:メソッドはべき等です。 トランスポートがすでに読み取りを行っているときに呼び出すことができます。
書き込み専用トランスポート
- WriteTransport.abort()
- 保留中の操作が完了するのを待たずに、トランスポートをすぐに閉じます。 バッファリングされたデータは失われます。 これ以上のデータは受信されません。 プロトコルの protocol.connection_lost()メソッドは、最終的に None を引数として呼び出されます。
- WriteTransport.can_write_eof()
- トランスポートが write_eof()をサポートしている場合は True を返し、サポートしていない場合は False を返します。
- WriteTransport.get_write_buffer_size()
- トランスポートによって使用される出力バッファの現在のサイズを返します。
- WriteTransport.get_write_buffer_limits()
書き込みフロー制御用の high および low ウォーターマークを取得します。 タプル
(low, high)
を返します。ここで、 low および high は正のバイト数です。set_write_buffer_limits()を使用して制限を設定します。
バージョン3.4.2の新機能。
- WriteTransport.set_write_buffer_limits(high=None, low=None)
書き込みフロー制御用に high および low ウォーターマークを設定します。
これらの2つの値(バイト数で測定)は、プロトコルの protocol.pause_writing()および protocol.resume_writing()メソッドが呼び出されるタイミングを制御します。 指定する場合、最低透かしは最高透かし以下である必要があります。 高も低も負の値にすることはできません。
pause_writing()は、バッファサイズが high 値以上になったときに呼び出されます。 書き込みが一時停止されている場合、バッファサイズが low 値以下になると、 resume_writing()が呼び出されます。
デフォルトは実装固有です。 最高水準点のみが指定されている場合、最低水準点はデフォルトで最高水準点以下の実装固有の値になります。 high をゼロに設定すると、 low もゼロになり、バッファーが空でなくなるたびに pause_writing()が呼び出されます。 low をゼロに設定すると、 resume_writing()は、バッファーが空になったときにのみ呼び出されます。 いずれかの制限にゼロを使用すると、I / Oと計算を同時に実行する機会が減るため、一般的に最適ではありません。
get_write_buffer_limits()を使用して制限を取得します。
- WriteTransport.write(data)
トランスポートに data バイトを書き込みます。
このメソッドはブロックしません。 データをバッファリングし、非同期で送信されるように調整します。
- WriteTransport.writelines(list_of_data)
- データバイトのリスト(または反復可能なもの)をトランスポートに書き込みます。 これは、iterableによって生成された各要素で write()を呼び出すことと機能的に同等ですが、より効率的に実装できます。
- WriteTransport.write_eof()
バッファリングされたすべてのデータをフラッシュした後、トランスポートの書き込み終了を閉じます。 データは引き続き受信される場合があります。
このメソッドは、トランスポート(例: SSL)はハーフクローズ接続をサポートしていません。
データグラムトランスポート
- DatagramTransport.sendto(data, addr=None)
data バイトを、 addr (トランスポートに依存するターゲットアドレス)で指定されたリモートピアに送信します。 addr が None の場合、データはトランスポートの作成時に指定されたターゲットアドレスに送信されます。
このメソッドはブロックしません。 データをバッファリングし、非同期で送信されるように調整します。
- DatagramTransport.abort()
- 保留中の操作が完了するのを待たずに、トランスポートをすぐに閉じます。 バッファリングされたデータは失われます。 これ以上のデータは受信されません。 プロトコルの protocol.connection_lost()メソッドは、最終的に None を引数として呼び出されます。
サブプロセストランスポート
- SubprocessTransport.get_pid()
- サブプロセスプロセスIDを整数として返します。
- SubprocessTransport.get_pipe_transport(fd)
- 整数ファイル記述子 fd に対応する通信パイプのトランスポートを返します。
- SubprocessTransport.get_returncode()
- サブプロセスの戻りコードを整数として返すか、返されていない場合は None を返します。これは、 subprocess.Popen.returncode 属性と同様です。
- SubprocessTransport.kill()
サブプロセスを強制終了します。
POSIXシステムでは、関数はSIGKILLをサブプロセスに送信します。 Windowsでは、このメソッドは terminate()のエイリアスです。
subprocess.Popen.kill()も参照してください。
- SubprocessTransport.send_signal(signal)
- subprocess.Popen.send_signal()のように、 signal 番号をサブプロセスに送信します。
- SubprocessTransport.terminate()
サブプロセスを停止します。
POSIXシステムでは、このメソッドはSIGTERMをサブプロセスに送信します。 Windowsでは、Windows API関数TerminateProcess()が呼び出されてサブプロセスが停止します。
subprocess.Popen.terminate()も参照してください。
- SubprocessTransport.close()
kill()メソッドを呼び出して、サブプロセスを強制終了します。
サブプロセスがまだ返されていない場合は、 stdin 、 stdout 、および stderr パイプのトランスポートを閉じます。
プロトコル
ソースコード: :source: `Lib / asyncio / protocols.py`
asyncioは、ネットワークプロトコルを実装するために使用する必要がある一連の抽象基本クラスを提供します。 これらのクラスは、トランスポートと一緒に使用することを目的としています。
抽象基本プロトコルクラスのサブクラスは、一部またはすべてのメソッドを実装する場合があります。 これらのメソッドはすべてコールバックです。たとえば、一部のデータが受信されたときに、特定のイベントでトランスポートによって呼び出されます。 基本プロトコルメソッドは、対応するトランスポートによって呼び出される必要があります。
基本プロトコル
- class asyncio.BaseProtocol
- すべてのプロトコルが共有するメソッドを持つ基本プロトコル。
- class asyncio.Protocol(BaseProtocol)
- ストリーミングプロトコル(TCP、Unixソケットなど)を実装するための基本クラス。
- class asyncio.BufferedProtocol(BaseProtocol)
- 受信バッファを手動で制御してストリーミングプロトコルを実装するための基本クラス。
- class asyncio.DatagramProtocol(BaseProtocol)
- データグラム(UDP)プロトコルを実装するための基本クラス。
- class asyncio.SubprocessProtocol(BaseProtocol)
- 子プロセス(単方向パイプ)と通信するプロトコルを実装するための基本クラス。
基本プロトコル
すべての非同期プロトコルは、ベースプロトコルコールバックを実装できます。
接続コールバック
接続コールバックは、接続が成功するたびに1回だけ、すべてのプロトコルで呼び出されます。 他のすべてのプロトコルコールバックは、これら2つのメソッド間でのみ呼び出すことができます。
- BaseProtocol.connection_made(transport)
接続が確立されたときに呼び出されます。
transport 引数は、接続を表すトランスポートです。 プロトコルは、そのトランスポートへの参照を保存する責任があります。
- BaseProtocol.connection_lost(exc)
接続が失われたとき、または閉じられたときに呼び出されます。
引数は、例外オブジェクトまたは None のいずれかです。 後者は、通常のEOFが受信されたか、接続が接続のこちら側で中止または閉じられたことを意味します。
フロー制御のコールバック
フロー制御コールバックは、プロトコルによって実行された書き込みを一時停止または再開するためにトランスポートによって呼び出されます。
詳細については、 set_write_buffer_limits()メソッドのドキュメントを参照してください。
- BaseProtocol.pause_writing()
- トランスポートのバッファが最高水準点を超えたときに呼び出されます。
- BaseProtocol.resume_writing()
- トランスポートのバッファが最低水準点を下回ったときに呼び出されます。
バッファサイズが最高水準点と等しい場合、 pause_writing()は呼び出されません。バッファサイズは厳密に超過する必要があります。
逆に、 resume_writing()は、バッファサイズが最低水準点以下の場合に呼び出されます。 これらの終了条件は、いずれかのマークがゼロのときに物事が期待どおりに進むようにするために重要です。
ストリーミングプロトコル
loop.create_server()
、loop.create_unix_server()
、loop.create_connection()
、loop.create_unix_connection()
、loop.connect_accepted_socket()
、loop.connect_read_pipe()
、loop.connect_write_pipe()
ストリーミングプロトコルを返すファクトリを受け入れます。
- Protocol.data_received(data)
一部のデータが受信されたときに呼び出されます。 data は、着信データを含む空でないバイトオブジェクトです。
データがバッファリングされるか、チャンク化されるか、再構築されるかは、トランスポートによって異なります。 一般に、特定のセマンティクスに依存するのではなく、構文解析を一般的かつ柔軟にする必要があります。 ただし、データは常に正しい順序で受信されます。
このメソッドは、接続が開いているときに任意の回数呼び出すことができます。
ただし、 protocol.eof_received()は最大で1回呼び出されます。 eof_received()が呼び出されると、
data_received()
は呼び出されなくなります。
- Protocol.eof_received()
もう一方の端がそれ以上データを送信しないことを通知したときに呼び出されます(たとえば、もう一方の端もasyncioを使用している場合は、 transport.write_eof()を呼び出します)。
このメソッドはfalse値(
None
を含む)を返す可能性があり、その場合、トランスポートはそれ自体を閉じます。 逆に、このメソッドが真の値を返す場合、使用されるプロトコルがトランスポートを閉じるかどうかを決定します。 デフォルトの実装はNone
を返すため、暗黙的に接続を閉じます。SSLを含む一部のトランスポートは、ハーフクローズ接続をサポートしていません。この場合、このメソッドからtrueを返すと、接続がクローズされます。
ステートマシン:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
バッファリングされたストリーミングプロトコル
バージョン3.7の新機能。
バッファリングされたプロトコルは、ストリーミングプロトコルをサポートする任意のイベントループメソッドで使用できます。
BufferedProtocol
の実装により、受信バッファーの明示的な手動割り当てと制御が可能になります。 イベントループは、プロトコルによって提供されるバッファーを使用して、不要なデータコピーを回避できます。 これにより、大量のデータを受信するプロトコルのパフォーマンスが大幅に向上する可能性があります。 高度なプロトコル実装により、バッファ割り当ての数を大幅に減らすことができます。
次のコールバックは、 BufferedProtocol インスタンスで呼び出されます。
- BufferedProtocol.get_buffer(sizehint)
新しい受信バッファを割り当てるために呼び出されます。
sizehint は、返されるバッファーの推奨最小サイズです。 sizehint が示唆するよりも小さいまたは大きいバッファを返すことは許容されます。 -1に設定すると、バッファサイズは任意になります。 サイズがゼロのバッファを返すとエラーになります。
get_buffer()
は、バッファプロトコルを実装するオブジェクトを返す必要があります。
- BufferedProtocol.buffer_updated(nbytes)
受信したデータでバッファが更新されたときに呼び出されます。
nbytes は、バッファーに書き込まれた合計バイト数です。
- BufferedProtocol.eof_received()
- protocol.eof_received()メソッドのドキュメントを参照してください。
get_buffer()は、接続中に任意の回数呼び出すことができます。 ただし、 protocol.eof_received()は最大で一度呼び出され、呼び出された場合、 get_buffer()および buffer_updated()はその後呼び出されません。
ステートマシン:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
データグラムプロトコル
データグラムプロトコルインスタンスは、loop.create_datagram_endpoint()
メソッドに渡されるプロトコルファクトリによって構築する必要があります。
- DatagramProtocol.datagram_received(data, addr)
- データグラムが受信されたときに呼び出されます。 data は、着信データを含むバイトオブジェクトです。 addr は、データを送信するピアのアドレスです。 正確な形式はトランスポートによって異なります。
- DatagramProtocol.error_received(exc)
前の送信または受信操作で OSError が発生したときに呼び出されます。 exc は OSError インスタンスです。
このメソッドは、トランスポート(例: UDP)は、データグラムを受信者に配信できなかったことを検出します。 ただし、多くの状況では、配信できないデータグラムはサイレントにドロップされます。
ノート
BSDシステム(macOS、FreeBSDなど)では、大量のパケットの書き込みによって引き起こされた送信障害を検出する信頼できる方法がないため、データグラムプロトコルのフロー制御はサポートされていません。
ソケットは常に「準備完了」と表示され、余分なパケットはドロップされます。 errno
が errno.ENOBUFS に設定された OSError が発生する場合と発生しない場合があります。 発生した場合は、 DatagramProtocol.error_received()に報告されますが、それ以外の場合は無視されます。
サブプロセスプロトコル
サブプロセスプロトコルインスタンスは、loop.subprocess_exec()
およびloop.subprocess_shell()
メソッドに渡されるプロトコルファクトリによって構築する必要があります。
- SubprocessProtocol.pipe_data_received(fd, data)
子プロセスがstdoutまたはstderrパイプにデータを書き込むときに呼び出されます。
fd は、パイプの整数ファイル記述子です。
data は、受信したデータを含む空でないバイトオブジェクトです。
- SubprocessProtocol.pipe_connection_lost(fd, exc)
子プロセスと通信しているパイプの1つが閉じられたときに呼び出されます。
fd は、閉じられた整数ファイル記述子です。
- SubprocessProtocol.process_exited()
- 子プロセスが終了したときに呼び出されます。
例
TCPエコーサーバー
loop.create_server()
メソッドを使用してTCPエコーサーバーを作成し、受信したデータを送り返し、接続を閉じます。
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
TCPエコークライアント
loop.create_connection()
メソッドを使用するTCPエコークライアントは、データを送信し、接続が閉じられるまで待機します。
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
UDPエコーサーバー
loop.create_datagram_endpoint()
メソッドを使用するUDPエコーサーバーは、受信したデータを送り返します。
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDPエコークライアント
loop.create_datagram_endpoint()
メソッドを使用するUDPエコークライアントは、データを送信し、応答を受信するとトランスポートを閉じます。
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
既存のソケットの接続
プロトコルでloop.create_connection()
メソッドを使用してソケットがデータを受信するまで待ちます。
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
も参照してください
読み取りイベントのファイル記述子を監視するの例では、低レベルの loop.add_reader()メソッドを使用してFDを登録します。
ストリームを使用してデータを待機するためのオープンソケットの登録の例では、コルーチンのopen_connection()
関数によって作成された高レベルのストリームを使用します。
loop.subprocess_exec()およびSubprocessProtocol
サブプロセスの出力を取得し、サブプロセスの終了を待機するために使用されるサブプロセスプロトコルの例。
サブプロセスは、loop.subprocess_exec()
メソッドによって作成されます。
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
高レベルAPIを使用して記述された同じ例も参照してください。