流¶
流是用于处理网络连接的高级 async/await-ready 原语。流允许发送和接收数据,而不需要使用回调或低级协议和传输。
下面是一个使用 asyncio streams 编写的 TCP echo 客户端示例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
参见下面的 Examples 部分。
Stream 函数
下面的高级 asyncio 函数可以用来创建和处理流:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)¶ 建立网络连接并返回一对
(reader, writer)
对象。返回的 reader 和 writer 对象是
StreamReader
和StreamWriter
类的实例。loop 参数是可选的,当从协程中等待该函数时,总是可以自动确定。
limit 确定返回的
StreamReader
实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。其余的参数直接传递到
loop.create_connection()
。3.7 新版功能: ssl_handshake_timeout 形参。
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ 启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。该函数会接收到一对参数
(reader, writer)
,reader是类StreamReader
的实例,而writer是类StreamWriter
的实例。client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为
Task
被调度。loop 参数是可选的。当在一个协程中await该方法时,该参数始终可以自动确定。
limit 确定返回的
StreamReader
实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。余下的参数将会直接传递给
loop.create_server()
.3.7 新版功能: The ssl_handshake_timeout and start_serving parameters.
Unix 套接字
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ 建立一个 Unix 套接字连接并返回
(reader, writer)
这对返回值。与
open_connection()
相似,但是操作在 Unix 套接字上请看文档
loop.create_unix_connection()
.可用性: Unix。
3.7 新版功能: ssl_handshake_timeout 形参。
在 3.7 版更改: path 现在是一个 path-like object
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ 启动一个Unix socket服务。
与
start_server()
相似,但是是在 Unix 套接字上的操作。请看文档
loop.create_unix_server()
.可用性: Unix。
3.7 新版功能: The ssl_handshake_timeout and start_serving parameters.
在 3.7 版更改: path 形参现在可以是 path-like object 对象。
StreamReader¶
-
class
asyncio.
StreamReader
¶ 这个类表示一个提供api来从IO流中读取数据的读取器对象。
不推荐直接实例化 StreamReader 对象,建议使用
open_connection()
和start_server()
来获取 StreamReader 实例。-
coroutine
read
(n=-1)¶ 读取 n 个byte. 如果没有设置 n , 则自动置为
-1
,读至 EOF 并返回所有读取的byte。如果读到EOF,且内部缓冲区为空,则返回一个空的
bytes
对象。
-
coroutine
readline
()¶ 读取一行,其中“行”指的是以
\n
结尾的字节序列。如果读到EOF而没有找到
\n
,该方法返回部分读取的数据。如果读到EOF,且内部缓冲区为空,则返回一个空的
bytes
对象。
-
coroutine
readexactly
(n)¶ 精准读取 n 个 bytes,不能超过也不能少于。
如果在读取完 n 个byte之前读取到EOF,则会抛出
IncompleteReadError
异常。使用IncompleteReadError.partial
属性来获取到达流结束之前读取的 bytes 字符串。
-
coroutine
readuntil
(separator=b'\n')¶ 从流中读取数据直至遇到 分隔符
成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。
如果读取的数据量超过了配置的流限制,将引发
LimitOverrunError
异常,数据将留在内部缓冲区中并可以再次读取。如果在找到完整的separator之前到达EOF,则会引发
IncompleteReadError
异常,并重置内部缓冲区。IncompleteReadError.partial
属性可能包含指定separator的一部分。3.5.2 新版功能.
-
at_eof
()¶ 如果缓冲区为空并且
feed_eof()
被调用,则返回True
。
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ 这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。
It is not recommended to instantiate StreamWriter objects directly; use
open_connection()
andstart_server()
instead.-
can_write_eof
()¶ Return
True
if the underlying transport supports thewrite_eof()
method,False
otherwise.
-
write_eof
()¶ Close the write end of the stream after the buffered write data is flushed.
-
transport
¶ Return the underlying asyncio transport.
-
get_extra_info
(name, default=None)¶ Access optional transport information; see
BaseTransport.get_extra_info()
for details.
-
write
(data)¶ Write data to the stream.
This method is not subject to flow control. Calls to
write()
should be followed bydrain()
.
-
writelines
(data)¶ Write a list (or any iterable) of bytes to the stream.
This method is not subject to flow control. Calls to
writelines()
should be followed bydrain()
.
-
coroutine
drain
()¶ Wait until it is appropriate to resume writing to the stream. Example:
writer.write(data) await writer.drain()
This is a flow control method that interacts with the underlying IO write buffer. When the size of the buffer reaches the high watermark, drain() blocks until the size of the buffer is drained down to the low watermark and writing can be resumed. When there is nothing to wait for, the
drain()
returns immediately.
-
close
()¶ Close the stream.
-
is_closing
()¶ Return
True
if the stream is closed or in the process of being closed.3.7 新版功能.
-
例子¶
TCP echo client using streams¶
TCP echo client using the asyncio.open_connection()
function:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
参见
The TCP echo client protocol
example uses the low-level loop.create_connection()
method.
TCP echo server using streams¶
TCP echo server using the asyncio.start_server()
function:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
参见
The TCP echo server protocol
example uses the loop.create_server()
method.
Get HTTP headers¶
Simple example querying HTTP headers of the URL passed on the command line:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
or with HTTPS:
python example.py https://example.com/path/page.html
Register an open socket to wait for data using streams¶
Coroutine waiting until a socket receives data using the
open_connection()
function:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
参见
The register an open socket to wait for data using a protocol example uses a low-level protocol and
the loop.create_connection()
method.
The watch a file descriptor for read events example uses the low-level
loop.add_reader()
method to watch a file descriptor.