AsyncIO for Networking
Learn about the architecture of an AsyncIO-driven server for inter-process logging, highlighting effective coroutine management and responsive concurrency strategies.
Overview
AsyncIO was specifically designed for use with network sockets, so let's implement a server using the asyncio module. Earlier, we created a fairly complex server to catch log entries being sent from one process to another using sockets. At the time, we used it as an example of a complex resource we didn't want to set up and tear down for each test.
Example
We'll rewrite that example, creating an asyncio-based server that can handle requests from a (large) number of clients. It does this by running many coroutines, all waiting for log records to arrive. When a record arrives, one coroutine can save the record, performing some computation, while the remaining coroutines wait.
We’ve already discussed the integration of a log catcher process with separate log-writing client application processes. Here’s an illustration of the relationships involved:
The log catcher process creates a socket server to wait for connections from all client applications. Each of the client applications uses a logging.handlers.SocketHandler to direct log messages to the waiting server. The server collects the messages and writes them to a single, central log file.
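To make the client side concrete, here is a minimal sketch of how an application process might attach a SocketHandler to its root logger; the host name and port number are hypothetical placeholders, not values taken from this example.

import logging
import logging.handlers

# Client-side sketch: forward this process's log records to the central
# log catcher. The host and port below are hypothetical placeholders.
socket_handler = logging.handlers.SocketHandler("localhost", 18842)
root_logger = logging.getLogger()
root_logger.addHandler(socket_handler)
root_logger.setLevel(logging.INFO)

root_logger.info("application started")  # sent over the socket to the catcher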
That test was based on an example with a deliberately weak implementation: to keep things simple in that chapter, the log server only worked with one application client at a time. Here we revisit the idea of a server that collects log messages. The improved implementation will handle a very large number of concurrent clients because it uses AsyncIO techniques.
Log catcher implementation
The central part of this design is a coroutine that reads log entries from a socket. This involves waiting for the bytes that comprise a header, then decoding the header to compute the size of the payload. The coroutine can read the right number of bytes for the log message payload, and then use a separate coroutine to process the payload. Here's the log_catcher() function:
import asyncio
import struct

# The log record framing: a big-endian, four-byte unsigned length header.
SIZE_FORMAT = ">L"
SIZE_BYTES = struct.calcsize(SIZE_FORMAT)

async def log_catcher(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    count = 0
    client_socket = writer.get_extra_info("socket")
    size_header = await reader.read(SIZE_BYTES)
    while size_header:
        payload_size = struct.unpack(SIZE_FORMAT, size_header)
        bytes_payload = await reader.read(payload_size[0])
        # Hand the payload off to a separate coroutine for processing.
        await log_writer(bytes_payload)
        count += 1
        size_header = await reader.read(SIZE_BYTES)
    print(f"From {client_socket.getpeername()}: {count} lines")
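The log_catcher() coroutine has the (reader, writer) signature that asyncio.start_server() expects for its client callback. As a rough sketch of how it could be served, assuming a hypothetical host and port rather than values from this example:

import asyncio

async def main(host: str = "localhost", port: int = 18842) -> None:
    # start_server() spawns a new log_catcher() coroutine for every client
    # connection, so many logging clients can be handled concurrently.
    # The host and port defaults here are hypothetical placeholders.
    server = await asyncio.start_server(log_catcher, host, port)
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())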
This log_catcher() function implements the protocol used by the logging package's SocketHandler class. Each log entry is a block of bytes we can decompose into a header and a payload. We need to read the first few bytes, saved in size_header, to get the size of the message that follows. Once we have the size, we can wait for the payload bytes to arrive. Since the two reads are both await expressions, other coroutines can work while this function is waiting for the header and payload bytes to arrive.
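To make the framing concrete, here is a small sketch of the same protocol outside the server. A SocketHandler pickles a dict of a log record's attributes and prefixes it with a four-byte, big-endian length header, which is exactly what log_catcher() reads back. The record dict below is a made-up illustration, not output captured from a real handler.

import pickle
import struct

SIZE_FORMAT = ">L"
SIZE_BYTES = struct.calcsize(SIZE_FORMAT)

# A made-up stand-in for the attribute dict that SocketHandler pickles.
record_dict = {"name": "demo", "levelname": "INFO", "msg": "example message"}
payload = pickle.dumps(record_dict)

# The wire format: a four-byte length header followed by the pickled payload.
frame = struct.pack(SIZE_FORMAT, len(payload)) + payload

# Decoding mirrors what log_catcher() does with its stream reader.
(size,) = struct.unpack(SIZE_FORMAT, frame[:SIZE_BYTES])
assert size == len(payload)
assert pickle.loads(frame[SIZE_BYTES:SIZE_BYTES + size]) == record_dict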
The log_catcher() function ...