18.5.4. トランスポートとプロトコル(コールバックベースのAPI)—Pythonドキュメント

提供:Dev Guides
< PythonPython/docs/3.6/library/asyncio-protocol
移動先:案内検索

18.5.4。 トランスポートとプロトコル(コールバックベースのAPI)

ソースコード: :source: `Lib / asyncio / transports.py`

ソースコード: :source: `Lib / asyncio / protocols.py`

18.5.4.1。 トランスポート

トランスポートは、さまざまな種類の通信チャネルを抽象化するために asyncio によって提供されるクラスです。 通常、トランスポートを自分でインスタンス化することはありません。 代わりに、 AbstractEventLoop メソッドを呼び出します。このメソッドは、トランスポートを作成し、基盤となる通信チャネルの開始を試み、成功したときにコールバックします。

通信チャネルが確立されると、トランスポートは常にプロトコルインスタンスとペアになります。 その後、プロトコルはさまざまな目的でトランスポートのメソッドを呼び出すことができます。

asyncio は現在、TCP、UDP、SSL、およびサブプロセスパイプのトランスポートを実装しています。 トランスポートで利用できる方法は、トランスポートの種類によって異なります。

トランスポートクラスはスレッドセーフではありません

バージョン3.6で変更:ソケットオプションTCP_NODELAYがデフォルトで設定されるようになりました。


18.5.4.1.1。 BaseTransport

class asyncio.BaseTransport

トランスポートの基本クラス。

close()

トランスポートを閉じます。 トランスポートに送信データ用のバッファーがある場合、バッファーされたデータは非同期でフラッシュされます。 これ以上のデータは受信されません。 バッファリングされたすべてのデータがフラッシュされた後、プロトコルのconnection_lost()メソッドが、 None を引数として呼び出されます。

is_closing()

トランスポートが閉じているか閉じている場合は、Trueを返します。

バージョン3.5.1の新機能。

get_extra_info(name, default=None)

オプションのトランスポート情報を返します。 name は、取得するトランスポート固有の情報を表す文字列です。 default は、情報が存在しない場合に返す値です。

このメソッドを使用すると、トランスポートの実装でチャネル固有の情報を簡単に公開できます。

set_protocol(protocol)

新しいプロトコルを設定します。 プロトコルの切り替えは、切り替えをサポートするために両方のプロトコルが文書化されている場合にのみ実行する必要があります。

バージョン3.5.3の新機能。

get_protocol()

現在のプロトコルを返します。

バージョン3.5.3の新機能。

バージョン3.5.1で変更: 'ssl_object'情報がSSLソケットに追加されました。


18.5.4.1.2。 ReadTransport

class asyncio.ReadTransport

読み取り専用トランスポートのインターフェース。

pause_reading()

トランスポートの受信側を一時停止します。 resume_reading()が呼び出されるまで、プロトコルのdata_received()メソッドにデータは渡されません。

バージョン3.6.7で変更:メソッドはべき等です。 トランスポートがすでに一時停止または閉じられているときに呼び出すことができます。

resume_reading()

受信側を再開します。 プロトコルのdata_received()メソッドは、読み取り可能なデータがある場合に再度呼び出されます。

バージョン3.6.7で変更:メソッドはべき等です。 トランスポートがすでに読み取りを行っているときに呼び出すことができます。


18.5.4.1.3。 WriteTransport

class asyncio.WriteTransport

書き込み専用トランスポートのインターフェース。

abort()

保留中の操作が完了するのを待たずに、トランスポートをすぐに閉じます。 バッファリングされたデータは失われます。 これ以上のデータは受信されません。 プロトコルのconnection_lost()メソッドは、最終的に None を引数として呼び出されます。

can_write_eof()

トランスポートが write_eof()をサポートしている場合は True を返し、サポートしていない場合は False を返します。

get_write_buffer_size()

トランスポートによって使用される出力バッファの現在のサイズを返します。

get_write_buffer_limits()

書き込みフロー制御の-および-水制限を取得します。 タプル(low, high)を返します。ここで、 low および high は正のバイト数です。

set_write_buffer_limits()を使用して制限を設定します。

バージョン3.4.2の新機能。

set_write_buffer_limits(high=None, low=None)

書き込みフロー制御の-および-水制限を設定します。

これらの2つの値(バイト数で測定)は、プロトコルのpause_writing()およびresume_writing()メソッドが呼び出されるタイミングを制御します。 指定する場合、最低水量は最高水量以下である必要があります。 も負の値にすることはできません。

pause_writing()は、バッファサイズが high 値以上になったときに呼び出されます。 書き込みが一時停止している場合、バッファサイズが low の値以下になると、resume_writing()が呼び出されます。

デフォルトは実装固有です。 最高水位の制限のみが指定されている場合、最低水位の制限は、デフォルトで、最高水位の制限以下の実装固有の値になります。 high をゼロに設定すると、 low もゼロになり、バッファが空でなくなるたびにpause_writing()が呼び出されます。 low をゼロに設定すると、バッファが空になったときにのみresume_writing()が呼び出されます。 いずれかの制限にゼロを使用すると、I / Oと計算を同時に実行する機会が減るため、一般的に最適ではありません。

get_write_buffer_limits()を使用して制限を取得します。

write(data)

トランスポートに data バイトを書き込みます。

このメソッドはブロックしません。 データをバッファリングし、非同期で送信されるように調整します。

writelines(list_of_data)

データバイトのリスト(または反復可能なもの)をトランスポートに書き込みます。 これは、iterableによって生成された各要素で write()を呼び出すことと機能的に同等ですが、より効率的に実装できます。

write_eof()

バッファリングされたデータをフラッシュした後、トランスポートの書き込み終了を閉じます。 データは引き続き受信される場合があります。

このメソッドは、トランスポート(例: SSL)はハーフクローズをサポートしていません。


18.5.4.1.4。 DatagramTransport

DatagramTransport.sendto(data, addr=None)

data バイトを、 addr (トランスポートに依存するターゲットアドレス)で指定されたリモートピアに送信します。 addrNone の場合、データはトランスポートの作成時に指定されたターゲットアドレスに送信されます。

このメソッドはブロックしません。 データをバッファリングし、非同期で送信されるように調整します。

DatagramTransport.abort()
保留中の操作が完了するのを待たずに、トランスポートをすぐに閉じます。 バッファリングされたデータは失われます。 これ以上のデータは受信されません。 プロトコルのconnection_lost()メソッドは、最終的に None を引数として呼び出されます。


18.5.4.1.5。 BaseSubprocessTransport

class asyncio.BaseSubprocessTransport
get_pid()

サブプロセスプロセスIDを整数として返します。

get_pipe_transport(fd)

整数ファイル記述子 fd に対応する通信パイプのトランスポートを返します。

  • 0:標準入力の読み取り可能なストリーミングトランスポート( stdin )、またはサブプロセスがstdin=PIPEで作成されていない場合はなし

  • 1:標準出力の書き込み可能なストリーミングトランスポート( stdout )、またはサブプロセスがstdout=PIPEで作成されなかった場合はなし

  • 2:標準エラー( stderr )の書き込み可能なストリーミングトランスポート、またはサブプロセスがstderr=PIPEで作成されなかった場合はなし

  • その他 fdなし

get_returncode()

subprocess.Popen.returncode 属性と同様に、サブプロセスのリターンコードを整数として返すか、返されていない場合は None を返します。

kill()

subprocess.Popen.kill()のように、サブプロセスを強制終了します。

POSIXシステムでは、関数はSIGKILLをサブプロセスに送信します。 Windowsでは、このメソッドは terminate()のエイリアスです。

send_signal(signal)

subprocess.Popen.send_signal()のように、 signal 番号をサブプロセスに送信します。

terminate()

subprocess.Popen.terminate()のように、サブプロセスに停止を要求します。 このメソッドは、 close()メソッドのエイリアスです。

POSIXシステムでは、このメソッドはSIGTERMをサブプロセスに送信します。 Windowsでは、Windows API関数TerminateProcess()が呼び出されてサブプロセスが停止します。

close()

サブプロセスがまだ戻っていない場合は、 terminate()メソッドを呼び出して停止するようにサブプロセスに依頼し、すべてのパイプ( stdinstdout 、および stderr )。


18.5.4.2。 プロトコル

asyncio は、ネットワークプロトコルを実装するためにサブクラス化できる基本クラスを提供します。 これらのクラスは、トランスポート(以下を参照)と組み合わせて使用されます。プロトコルは着信データを解析し、発信データの書き込みを要求しますが、トランスポートは実際のI / Oとバッファリングを担当します。

プロトコルクラスをサブクラス化するときは、特定のメソッドをオーバーライドすることをお勧めします。 これらのメソッドはコールバックです。特定のイベント(たとえば、一部のデータが受信されたとき)でトランスポートによって呼び出されます。 トランスポートを実装しているのでない限り、自分で呼び出すべきではありません。

ノート

すべてのコールバックにはデフォルトの実装があり、空です。 したがって、関心のあるイベントのコールバックを実装するだけで済みます。


18.5.4.2.1。 プロトコルクラス

class asyncio.Protocol
ストリーミングプロトコルを実装するための基本クラス( TCPおよびSSLトランスポート)。
class asyncio.DatagramProtocol
データグラムプロトコルを実装するための基本クラス( UDPトランスポート)。
class asyncio.SubprocessProtocol
子プロセスと通信するプロトコルを実装するための基本クラス(一方向パイプのセットを介して)。


18.5.4.2.2。 接続コールバック

これらのコールバックは、 ProtocolDatagramProtocol 、および SubprocessProtocol インスタンスで呼び出すことができます。

BaseProtocol.connection_made(transport)

接続が確立されたときに呼び出されます。

transport 引数は、接続を表すトランスポートです。 あなたはそれをどこかに保存する責任があります(例えば 属性として)必要に応じて。

BaseProtocol.connection_lost(exc)

接続が失われたとき、または閉じられたときに呼び出されます。

引数は、例外オブジェクトまたは None のいずれかです。 後者は、通常のEOFが受信されたか、接続が接続のこちら側で中止または閉じられたことを意味します。

connection_made()および connection_lost()は、接続が成功するたびに1回だけ呼び出されます。 他のすべてのコールバックは、これら2つのメソッド間で呼び出されるため、プロトコル実装でのリソース管理が容易になります。

次のコールバックは、 SubprocessProtocol インスタンスでのみ呼び出すことができます。

SubprocessProtocol.pipe_data_received(fd, data)
子プロセスがstdoutまたはstderrパイプにデータを書き込むときに呼び出されます。 fd は、パイプの整数ファイル記述子です。 data は、データを含む空でないバイトオブジェクトです。
SubprocessProtocol.pipe_connection_lost(fd, exc)
子プロセスと通信しているパイプの1つが閉じられたときに呼び出されます。 fd は、閉じられた整数ファイル記述子です。
SubprocessProtocol.process_exited()
子プロセスが終了したときに呼び出されます。


18.5.4.2.3。 ストリーミングプロトコル

次のコールバックは、 Protocol インスタンスで呼び出されます。

Protocol.data_received(data)

一部のデータが受信されたときに呼び出されます。 data は、着信データを含む空でないバイトオブジェクトです。

ノート

データがバッファリングされるか、チャンク化されるか、再構築されるかは、トランスポートによって異なります。 一般に、特定のセマンティクスに依存するのではなく、構文解析を一般的で十分に柔軟にする必要があります。 ただし、データは常に正しい順序で受信されます。

Protocol.eof_received()

もう一方の端がそれ以上データを送信しないことを通知したときに呼び出されます(たとえば、もう一方の端も非同期を使用している場合は、write_eof()を呼び出すことによって)。

このメソッドはfalse値(Noneを含む)を返す可能性があり、その場合、トランスポートはそれ自体を閉じます。 逆に、このメソッドが真の値を返す場合、トランスポートを閉じるのはプロトコル次第です。 デフォルトの実装はNoneを返すため、暗黙的に接続を閉じます。

ノート

SSLなどの一部のトランスポートは、ハーフクローズ接続をサポートしていません。その場合、このメソッドからtrueを返しても、接続のクローズが妨げられることはありません。

data_received()は、接続中に任意の回数呼び出すことができます。 ただし、eof_received()は最大で一度呼び出され、呼び出された場合、data_received()はその後呼び出されません。

ステートマシン:

start-> connection_made() [-> data_received() *] [-> eof_received()?]-> connection_lost()[X123X ]->終了


18.5.4.2.4。 データグラムプロトコル

次のコールバックは、 DatagramProtocol インスタンスで呼び出されます。

DatagramProtocol.datagram_received(data, addr)
データグラムが受信されたときに呼び出されます。 data は、着信データを含むバイトオブジェクトです。 addr は、データを送信するピアのアドレスです。 正確な形式はトランスポートによって異なります。
DatagramProtocol.error_received(exc)

前の送信または受信操作で OSError が発生したときに呼び出されます。 excOSError インスタンスです。

このメソッドは、トランスポート(例: UDP)は、データグラムを受信者に配信できなかったことを検出します。 ただし、多くの状況では、配信できないデータグラムはサイレントにドロップされます。


18.5.4.2.5。 フロー制御コールバック

これらのコールバックは、 ProtocolDatagramProtocol 、および SubprocessProtocol インスタンスで呼び出すことができます。

BaseProtocol.pause_writing()
トランスポートのバッファが最高水準点を超えたときに呼び出されます。
BaseProtocol.resume_writing()
トランスポートのバッファが最低水位標を下回ったときに呼び出されます。

pause_writing()resume_writing()の呼び出しはペアになっています– pause_writing()は、バッファーが最高水準点を厳密に超えたときに1回呼び出されます(後続の書き込みによってバッファーサイズがさらに大きくなった場合でも)。そして最終的にresume_writing()は、バッファサイズが最低水準点に達したときに1回呼び出されます。

ノート

バッファサイズが最高水準点に等しい場合、pause_writing()は呼び出されません。厳密にオーバーする必要があります。 逆に、resume_writing()は、バッファサイズが最低水準点以下の場合に呼び出されます。 これらの終了条件は、いずれかのマークがゼロのときに物事が期待どおりに進むようにするために重要です。


ノート

BSDシステム(OS X、FreeBSDなど)では、 DatagramProtocol のフロー制御はサポートされていません。これは、大量のパケットの書き込みによる送信エラーを簡単に検出できないためです。 ソケットは常に「準備完了」と表示され、余分なパケットはドロップされます。 errnoが errno.ENOBUFS に設定された OSError が発生する場合と発生しない場合があります。 発生した場合は、 DatagramProtocol.error_received()に報告されますが、それ以外の場合は無視されます。


18.5.4.2.6。 コルーチンとプロトコル

コルーチンは ensure_future()を使用してプロトコルメソッドでスケジュールできますが、実行順序について保証はありません。 プロトコルは、プロトコルメソッドで作成されたコルーチンを認識しないため、それらを待機しません。

信頼できる実行順序を得るには、yield fromのコルーチンでストリームオブジェクトを使用します。 たとえば、StreamWriter.drain()コルーチンを使用して、書き込みバッファーがフラッシュされるまで待機できます。


18.5.4.3。 プロトコルの例

18.5.4.3.1。 TCPエコークライアントプロトコル

AbstractEventLoop.create_connection()メソッドを使用するTCPエコークライアントは、データを送信し、接続が閉じられるまで待機します。

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

イベントループは2回実行されています。 run_until_complete()メソッドは、サーバーがリッスンしていない場合に例外を発生させるために、例外を処理して実行中のループを停止するための短いコルーチンを作成する代わりに、この短い例で推奨されます。 run_until_complete()の終了時に、ループは実行されなくなったため、エラーが発生した場合にループを停止する必要はありません。

も参照してください

ストリームを使用する TCPエコークライアントの例では、asyncio.open_connection()関数を使用します。


18.5.4.3.2。 TCPエコーサーバープロトコル

AbstractEventLoop.create_server()メソッドを使用するTCPエコーサーバーは、受信したデータを送り返し、接続を閉じます。

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close()は、データがまだソケットに送信されていない場合でも、 WriteTransport.write()の直後に呼び出すことができます。どちらのメソッドも非同期です。 yield fromは、これらの転送方法がコルーチンではないため、必要ありません。

も参照してください

ストリームを使用する TCPエコーサーバーの例では、asyncio.start_server()関数を使用します。


18.5.4.3.3。 UDPエコークライアントプロトコル

AbstractEventLoop.create_datagram_endpoint()メソッドを使用するUDPエコークライアントは、データを送信し、回答を受け取ったらトランスポートを閉じます。

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4。 UDPエコーサーバープロトコル

AbstractEventLoop.create_datagram_endpoint()メソッドを使用するUDPエコーサーバーは、受信したデータを送り返します。

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5。 プロトコルを使用してデータを待機するために開いているソケットを登録します

ソケットがプロトコルでAbstractEventLoop.create_connection()メソッドを使用してデータを受信するまで待ってから、イベントループを閉じます

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

も参照してください

読み取りイベントのファイル記述子を監視するの例では、低レベルの AbstractEventLoop.add_reader()メソッドを使用してソケットのファイル記述子を登録します。

ストリームを使用してデータを待機するためのオープンソケットの登録の例では、コルーチンのopen_connection()関数によって作成された高レベルのストリームを使用します。