Chapter 4. 20 Asyncio Libraries You Aren’t Using (But…Oh, Never Mind)

In this chapter, we look at case studies using the new Python features for async programming. We’ll be making use of several third-party libraries, as you will in your own projects.

The title of this chapter is a play on the title of a previous book I wrote called 20 Python Libraries You Aren’t Using (But Should) (O’Reilly). Many of those libraries will also be useful in your asyncio-based applications, but this chapter focuses on libraries that have been designed specifically for the new async features in Python.

It is difficult to present asyncio-based code in short snippets. As you have seen in all the previous code samples in the book, I’ve tried to make each example a complete, runnable program, because application lifetime management is a core consideration for using async programming correctly.

For this reason, most of the case studies in this chapter will be somewhat larger, in terms of lines of code, than is usual for such a book. My goal in using this approach is to make the case studies more useful by giving you a “whole view” of an async program rather than leaving you to figure out how detached fragments might fit together.

Note

Some of the code samples in this chapter compromise on style in order to save space. I like PEP8 as much as the next Pythonista, but practicality beats purity!

Streams (Standard Library)

Before looking at third-party libraries, let’s begin with the standard library. The streams API is the high-level interface offered for async socket programming, and as the following case study will show, it’s pretty easy to use. However, application design remains complex simply because of the nature of the domain.

The following case study shows an implementation of a message broker, with an initial naive design followed by a more considered design. Neither should be considered production-ready; my goal is to help you think about the various aspects of concurrent network programming that need to be taken into account when designing such applications.

Case Study: A Message Queue

A message queue service is a backend application that receives connections from other applications and passes messages between those connected services, often referred to as publishers and subscribers. Subscribers typically listen to specific channels for messages, and usually it is possible to configure the message distribution in different channels in two ways: messages can be distributed to all subscribers on a channel (pub-sub), or a different message can go to each subscriber one at a time (point-to-point).

Recently, I worked on a project that involved using ActiveMQ as a message broker for microservices intercommunication. At a basic level, such a broker (server):

  • Maintains persistent socket connections to multiple clients

  • Receives messages from clients with a target channel name

  • Delivers those messages to all other clients subscribed to that same channel name

I recall wondering how hard it might be to create such an application. As an added touch, ActiveMQ can perform both models of message distribution, and the two models are generally differentiated by the channel name:

  • Channel names with the prefix /topic (e.g., /topic/customer/registration) are managed with the pub-sub pattern, where all channel subscribers get all messages.

  • Channel names with the prefix /queue are handled with the point-to-point model, in which messages on a channel are distributed between channel subscribers in a round-robin fashion: each subscriber gets a unique message.

In our case study, we will build a toy message broker with these basic features. The first issue we must address is that TCP is not a message-based protocol: we just get streams of bytes on the wire. We need to create our own protocol for the structure of messages, and the simplest protocol is to prefix each message with a size header, followed by a message payload of that size. The utility library in Example 4-1 provides read and write capabilities for such messages.

Example 4-1. Message protocol: read and write
# msgproto.py
from asyncio import StreamReader, StreamWriter

async def read_msg(stream: StreamReader) -> bytes:
    size_bytes = await stream.readexactly(4)  1
    size = int.from_bytes(size_bytes, byteorder='big')  2
    data = await stream.readexactly(size)  3
    return data

async def send_msg(stream: StreamWriter, data: bytes):
    size_bytes = len(data).to_bytes(4, byteorder='big')
    stream.writelines([size_bytes, data])  4
    await stream.drain()
1

Get the first 4 bytes. This is the size prefix.

2

Those 4 bytes must be converted into an integer.

3

Now we know the payload size, so we read that off the stream.

4

Write is the inverse of read: first we send the length of the data, encoded as 4 bytes, and thereafter the data.

Now that we have a rudimentary message protocol, we can focus on the message broker application in Example 4-2.

Example 4-2. A 40-line prototype server
# mq_server.py
import asyncio
from asyncio import StreamReader, StreamWriter, gather
from collections import deque, defaultdict
from typing import Deque, DefaultDict
from msgproto import read_msg, send_msg  1

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque) 2

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')  3
  subscribe_chan = await read_msg(reader)  4
  SUBSCRIBERS[subscribe_chan].append(writer)  5
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):  6
      data = await read_msg(reader)  7
      print(f'Sending to {channel_name}: {data[:19]}...')
      conns = SUBSCRIBERS[channel_name]  8
      if conns and channel_name.startswith(b'/queue'):  9
          conns.rotate()  10
          conns = [conns[0]]  11
      await gather(*[send_msg(c, data) for c in conns]) 12
  except asyncio.CancelledError:
    print(f'Remote {peername} closing connection.')
    writer.close()
    await writer.wait_closed()
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    SUBSCRIBERS[subscribe_chan].remove(writer)  13

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

Imports from our msgproto.py module.

2

A global collection of currently active subscribers. Every time a client connects, they must first send a channel name they’re subscribing to. A deque will hold all the subscribers for a particular channel.

3

The client() coroutine function will produce a long-lived coroutine for each new connection. Think of it as a callback for the TCP server started in main(). On this line, I’ve shown how the host and port of the remote peer can be obtained, for example, for logging.

4

Our protocol for clients is the following:

  • On first connect, a client must send a message containing the channel to subscribe to (here, subscribe_chan).

  • Thereafter, for the life of the connection, a client sends a message to a channel by first sending a message containing the destination channel name, followed by a message containing the data. Our broker will send such data messages to every client subscribed to that channel name.

5

Add the StreamWriter instance to the global collection of subscribers.

6

An infinite loop, waiting for data from this client. The first message from a client must be the destination channel name.

7

Next comes the actual data to distribute to the channel.

8

Get the deque of subscribers on the target channel.

9

Some special handling if the channel name begins with the magic word /queue: in this case, we send the data to only one of the subscribers, not all of them. This can be used for sharing work between a bunch of workers, rather than the usual pub-sub notification scheme, where all subscribers on a channel get all the messages.

10

Here is why we use a deque and not a list: rotation of the deque is how we keep track of which client is next in line for /queue distribution. This seems expensive until you realize that a single deque rotation is an O(1) operation.

11

Target only whichever client is first; this changes after every rotation.

12

Create a list of coroutines for sending the message to each writer, and then unpack these into gather() so we can wait for all of the sending to complete.

This line is a bad flaw in our program, but it may not be obvious why: though it may be true that all of the sending to each subscriber will happen concurrently, what happens if we have one very slow client? In this case, the gather() will finish only when the slowest subscriber has received its data. We can’t receive any more data from the sending client until all these send_msg() coroutines finish. This slows all message distribution to the speed of the slowest subscriber.

13

When leaving the client() coroutine, we make sure to remove ourselves from the global SUBSCRIBERS collection. Unfortunately, this is an O(n) operation, which can be a little expensive for very large n. A different data structure would fix this, but for now we console ourselves with the knowledge that connections are intended to be long-lived—thus, there should be few disconnection events—and n is unlikely to be very large (say ~10,000 as a rough order-of-magnitude estimate), and this code is at least easy to understand.

So that’s our server; now we need clients, and then we can show some output. For demonstration purposes, I’ll make two kinds of clients: a sender and a listener. The server doesn’t differentiate; all clients are the same. The distinction between sender and listener behavior is only for educational purposes. Example 4-3 shows the code for the listener application.

Example 4-3. Listener: a toolkit for listening for messages on our message broker
# mq_client_listen.py
import asyncio
import argparse, uuid
from msgproto import read_msg, send_msg

async def main(args):
  me = uuid.uuid4().hex[:8] 1
  print(f'Starting up {me}')
  reader, writer = await asyncio.open_connection(
    args.host, args.port) 2
  print(f'I am {writer.get_extra_info("sockname")}')
  channel = args.listen.encode()  3
  await send_msg(writer, channel)  4
  try:
    while data := await read_msg(reader):  5
      print(f'Received by {me}: {data[:20]}')
    print('Connection ended.')
  except asyncio.IncompleteReadError:
    print('Server closed.')
  finally:
    writer.close()
    await writer.wait_closed()


if __name__ == '__main__':
  parser = argparse.ArgumentParser() 6
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000)
  parser.add_argument('--listen', default='/topic/foo')
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

The uuid standard library module is a convenient way of creating an “identity” for this listener. If you start up multiple instances, each will have its own identity, and you’ll be able to track what is happening in the logs.

2

Open a connection to the server.

3

The channel to subscribe to is an input parameter, captured in args.listen. Encode it into bytes before sending.

4

By our protocol rules (as discussed in the broker code analysis previously), the first thing to do after connecting is to send the channel name to subscribe to.

5

This loop does nothing else but wait for data to appear on the socket.

6

The command-line arguments for this program make it easy to point to a host, a port, and a channel name to listen to.

The code for the other client, the sender program shown in Example 4-4, is similar in structure to the listener module.

Example 4-4. Sender: a toolkit for sending data to our message broker
# mq_client_sender.py
import asyncio
import argparse, uuid
from itertools import count
from msgproto import send_msg

async def main(args):
    me = uuid.uuid4().hex[:8]  1
    print(f'Starting up {me}')
    reader, writer = await asyncio.open_connection(
        host=args.host, port=args.port)  2
    print(f'I am {writer.get_extra_info("sockname")}')

    channel = b'/null'  3
    await send_msg(writer, channel) 4

    chan = args.channel.encode()  5
    try:
        for i in count():  6
            await asyncio.sleep(args.interval)  7
            data = b'X'*args.size or f'Msg {i} from {me}'.encode()
            try:
                await send_msg(writer, chan)
                await send_msg(writer, data) 8
            except OSError:
                print('Connection ended.')
                break
    except asyncio.CancelledError:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
  parser = argparse.ArgumentParser()  9
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000, type=int)
  parser.add_argument('--channel', default='/topic/foo')
  parser.add_argument('--interval', default=1, type=float)
  parser.add_argument('--size', default=0, type=int)
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

As with the listener, claim an identity.

2

Reach out and make a connection.

3

According to our protocol rules, the first thing to do after connecting to the server is to give the name of the channel to subscribe to; however, since we are a sender, we don’t really care about subscribing to any channels. Nevertheless, the protocol requires it, so just provide a null channel to subscribe to (we won’t actually listen for anything).

4

Send the channel to subscribe to.

5

The command-line parameter args.channel provides the channel to which we want to send messages. It must be converted to bytes first before sending.

6

Using itertools.count() is like a while True loop, except that we get an iteration variable to use. We use this in the debugging messages since it makes it a bit easier to track which message got sent from where.

7

The delay between sent messages is an input parameter, args.interval. The next line generates the message payload. It’s either a bytestring of specified size (args.size) or a descriptive message. This flexibility is just for testing.

8

Note that two messages are sent here: the first is the destination channel name, and the second is the payload.

9

As with the listener, there are a bunch of command-line options for tweaking the sender: channel determines the target channel to send to, while interval controls the delay between sends. The size parameter controls the size of each message payload.

We now have a broker, a listener, and a sender; it’s time to see some output. To produce the following code snippets, I started up the server, then two listeners, and then a sender. Then, after a few messages had been sent, I stopped the server with Ctrl-C. The server output is shown in Example 4-5, the sender output in Example 4-6, and the listener output in Examples 4-7 and 4-8.

Example 4-5. Message broker (server) output
$ mq_server.py
Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55390) subscribed to b'/null'
Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'...
^CBye!
Remote ('127.0.0.1', 55382) closing connection.
Remote ('127.0.0.1', 55382) closed
Remote ('127.0.0.1', 55390) closing connection.
Remote ('127.0.0.1', 55390) closed
Remote ('127.0.0.1', 55386) closing connection.
Remote ('127.0.0.1', 55386) closed
Example 4-6. Sender (client) output
$ mq_client_sender.py --channel /queue/blah
Starting up 6b5a8e1d
I am ('127.0.0.1', 55390)
Connection ended.
Example 4-7. Listener 1 (client) output
$ mq_client_listen.py --listen /queue/blah
Starting up 9ae04690
I am ('127.0.0.1', 55382)
Received by 9ae04690: b'Msg 1 from 6b5a8e1d'
Received by 9ae04690: b'Msg 3 from 6b5a8e1d'
Received by 9ae04690: b'Msg 5 from 6b5a8e1d'
Server closed.
Example 4-8. Listener 2 (client) output
$ mq_client_listen.py --listen /queue/blah
Starting up bd4e3baa
I am ('127.0.0.1', 55386)
Received by bd4e3baa: b'Msg 0 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 2 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 4 from 6b5a8e1d'
Server closed.

Our toy message broker works. The code is also pretty easy to understand, given such a complex problem domain, but unfortunately, the design of the broker code itself is problematic.

The problem is that, for a particular client, we send messages to subscribers in the same coroutine as where new messages are received. This means that if any subscriber is slow to consume what we’re sending, it might take a long time for that await gather(...) line in Example 4-2 to complete, and we cannot receive and process more messages while we wait.

Instead, we need to decouple the receiving of messages from the sending of messages. In the next case study, we refactor our code to do exactly that.

Case Study: Improving the Message Queue

In this case study, we improve the design of our toy message broker. The listener and sender programs remain as is. The specific improvement in the new broker design is to decouple sending and receiving messages; this will resolve the problem where a slow subscriber would also slow down receiving new messages, as discussed in the previous section. The new code, shown in Example 4-9, is a bit longer but not terribly so.

Example 4-9. Message broker: improved design
# mq_server_plus.py
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
from msgproto import read_msg, send_msg

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque)
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
CHAN_QUEUES: Dict[bytes, Queue] = {}  1

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')
  subscribe_chan = await read_msg(reader)
  SUBSCRIBERS[subscribe_chan].append(writer)  2
  send_task = asyncio.create_task(
      send_client(writer, SEND_QUEUES[writer]))  3
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):
      data = await read_msg(reader)
      if channel_name not in CHAN_QUEUES:  4
        CHAN_QUEUES[channel_name] = Queue(maxsize=10)  5
        asyncio.create_task(chan_sender(channel_name))  6
      await CHAN_QUEUES[channel_name].put(data)  7
  except asyncio.CancelledError:
    print(f'Remote {peername} connection cancelled.')
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    await SEND_QUEUES[writer].put(None)  8
    await send_task  9
    del SEND_QUEUES[writer]  10
    SUBSCRIBERS[subscribe_chan].remove(writer)

async def send_client(writer: StreamWriter, queue: Queue):  11
    while True:
        try:
            data = await queue.get()
        except asyncio.CancelledError:
            continue

        if not data:
            break

        try:
            await send_msg(writer, data)
        except asyncio.CancelledError:
            await send_msg(writer, data)

    writer.close()
    await writer.wait_closed()

async def chan_sender(name: bytes):
    with suppress(asyncio.CancelledError):
        while True:
            writers = SUBSCRIBERS[name]
            if not writers:
                await asyncio.sleep(1)
                continue  12
            if name.startswith(b'/queue'):  13
                writers.rotate()
                writers = [writers[0]]
            if not (msg := await CHAN_QUEUES[name].get()): 14
                break
            for writer in writers:
                if not SEND_QUEUES[writer].full():
                    print(f'Sending to {name}: {msg[:19]}...')
                    await SEND_QUEUES[writer].put(msg)  15

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()
try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

In the previous implementation, there were only SUBSCRIBERS; now there are SEND_QUEUES and CHAN_QUEUES as global collections. This is a consequence of completely decoupling the receiving and sending of data. SEND_QUEUES has one queue entry for each client connection: all data that must be sent to that client must be placed onto that queue. (If you peek ahead, the send_client() coroutine will pull data off SEND_QUEUES and send it.)

2

Up until this point in the client() coroutine function, the code is the same as in the simple server: the subscribed channel name is received, and we add the StreamWriter instance for the new client to the global SUBSCRIBERS collection.

3

This is new: we create a long-lived task that will do all the sending of data to this client. The task will run independently as a separate coroutine and will pull messages off the supplied queue, SEND_QUEUES[writer], for sending.

4

Now we’re inside the loop where we receive data. Remember that we always receive two messages: one for the destination channel name, and one for the data. We’re going to create a new, dedicated Queue for every destination channel, and that’s what CHAN_QUEUES is for: when any client wants to push data to a channel, we’re going to put that data onto the appropriate queue and then go immediately back to listening for more data. This approach decouples the distribution of messages from the receiving of messages from this client.

5

If there isn’t already a queue for the target channel, make one.

6

Create a dedicated and long-lived task for that channel. The coroutine chan_sender() will be responsible for taking data off the channel queue and distributing that data to subscribers.

7

Place the newly received data onto the specific channel’s queue. If the queue fills up, we’ll wait here until there is space for the new data. Waiting here means we won’t be reading any new data off the socket, which means that the client will have to wait on sending new data into the socket on its side. This isn’t necessarily a bad thing, since it communicates so-called back-pressure to this client. (Alternatively, you could choose to drop messages here if the use case is OK with that.)

8

When the connection is closed, it’s time to clean up. The long-lived task we created for sending data to this client, send_task, can be shut down by placing None onto its queue, SEND_QUEUES[writer] (check the code for send_client()). It’s important to use a value on the queue, rather than outright cancellation, because there may already be data on that queue and we want that data to be sent out before send_client() is ended.

9

Wait for that sender task to finish…

10

…then remove the entry in the SEND_QUEUES collection (and in the next line, we also remove the sock from the SUBSCRIBERS collection as before).

11

The send_client() coroutine function is very nearly a textbook example of pulling work off a queue. Note how the coroutine will exit only if None is placed onto the queue. Note also how we suppress CancelledError inside the loop: this is because we want this task to be closed only by receiving a None on the queue. This way, all pending data on the queue can be sent out before shutdown.

12

chan_sender() is the distribution logic for a channel: it sends data from a dedicated channel Queue instance to all the subscribers on that channel. But what happens if there are no subscribers for this channel yet? We’ll just wait a bit and try again. (Note, though, that the queue for this channel, CHAN_QUEUES[name], will keep filling up.)

13

As in our previous broker implementation, we do something special for channels whose name begins with /queue: we rotate the deque and send only to the first entry. This acts like a crude load-balancing system because each subscriber gets different messages off the same queue. For all other channels, all subscribers get all the messages.

14

We’ll wait here for data on the queue, and exit if None is received. Currently, this isn’t triggered anywhere (so these chan_sender() coroutines live forever), but if logic were added to clean up these channel tasks after, say, some period of inactivity, that’s how it would be done.

15

Data has been received, so it’s time to send to subscribers. We do not do the sending here: instead, we place the data onto each subscriber’s own send queue. This decoupling is necessary to make sure that a slow subscriber doesn’t slow down anyone else receiving data. And furthermore, if the subscriber is so slow that their send queue fills up, we don’t put that data on their queue; i.e., it is lost.

The preceding design produces the same output as the earlier, simplistic implementation, but now we can be sure that a slow listener will not interfere with message distribution to other listeners.

These two case studies show a progression in thinking around the design of a message distribution system. A key aspect was the realization that sending and receiving data might be best handled in separate coroutines, depending on the use case. In such instances, queues can be very useful for moving data between those different coroutines and for providing buffering to decouple them.

The more important goal of these case studies was to show how the streams API in asyncio makes it very easy to build socket-based applications.

Twisted

The Twisted project predates—dramatically—the asyncio standard library, and has been flying the flag of async programming in Python for around 14 years now. The project provides not only the basic building blocks, like an event loop, but also primitives like deferreds that are a bit like the futures in asyncio. The design of asyncio has been heavily influenced by Twisted and the extensive experience of its leaders and maintainers.

Note that asyncio does not replace Twisted. Twisted includes high-quality implementations of a huge number of internet protocols, including not only the usual HTTP but also XMPP, NNTP, IMAP, SSH, IRC, and FTP (both servers and clients). And the list goes on: DNS? Check. SMTP? Check. POP3? Check. The availability of these excellent internet protocol implementations continues to make Twisted compelling.

At the code level, the main difference between Twisted and asyncio, apart from history and historical context, is that for a long time Python lacked language support for coroutines, and this meant that Twisted and projects like it had to figure out ways of dealing with asynchronicity that worked with standard Python syntax.

For most of Twisted’s history, callbacks were the means by which async programming was done, with all the nonlinear complexity that entails; however, when it became possible to use generators as makeshift coroutines, it suddenly became possible to lay out code in Twisted in a linear fashion using its @defer.inlineCallbacks decorator, as shown in Example 4-10.

Example 4-10. Even more Twisted with inlined callbacks
@defer.inlineCallbacks  1
def f():
    yield
    defer.returnValue(123)  2

@defer.inlineCallbacks
def my_coro_func():
    value = yield f()  3
    assert value == 123
1

Ordinarily, Twisted requires creating instances of Deferred and adding callbacks to those instances as the method of constructing async programs. A few years ago, the @inlineCallbacks decorator was added, which repurposes generators as coroutines.

2

While @inlineCallbacks did allow you to write code that was linear in appearance (unlike callbacks), some hacks were required, such as this call to defer.returnValue(), which is how you have to return values from @inlineCallbacks coroutines.

3

Here we can see the yield that makes this function a generator. For @inlineCallbacks to work, there must be at least one yield present in the function being decorated.

Since native coroutines appeared in Python 3.5, the Twisted team (and Amber Brown in particular) have been working to add support for running Twisted on the asyncio event loop.

This is an ongoing effort, and my goal in this section is not to convince you to create all your applications as Twisted-asyncio hybrids, but rather to make you aware that work is currently being done to provide significant interoperability between the two.

For those of you with experience using Twisted, Example 4-11 might be jarring.

Example 4-11. Support for asyncio in Twisted
# twisted_asyncio.py
from time import ctime
from twisted.internet import asyncioreactor
asyncioreactor.install()  1
from twisted.internet import reactor, defer, task  2

async def main():  3
    for i in range(5):
        print(f'{ctime()} Hello {i}')
        await task.deferLater(reactor, 1, lambda: None)  4

defer.ensureDeferred(main())  5
reactor.run()  6
1

This is how you tell Twisted to use the asyncio event loop as its main reactor. Note that this line must come before the reactor is imported from twisted.internet on the following line.

2

Anyone familiar with Twisted programming will recognize these imports. We don’t have space to cover them in depth here, but in a nutshell, the reactor is the Twisted version of the asyncio loop, and defer and task are namespaces for tools to work with scheduling coroutines.

3

Seeing async def here, in a Twisted program, looks odd, but this is indeed what the new support for async/await gives us: the ability to use native coroutines directly in Twisted programs.

4

In the older @inlineCallbacks world, you would have used yield from here, but now we can use await, the same as in asyncio code. The other part of this line, deferLater(), is an alternative way to do the same thing as asyncio.sleep(1). We await a future where, after one second, a do-nothing callback will fire.

5

ensureDeferred() is a Twisted version of scheduling a coroutine. This would be analogous to loop.create_task() or asyncio.ensure_future().

6

Running the reactor is the same as loop.run_forever() in asyncio.

Running this script produces the following output:

$ twisted_asyncio.py
Mon Oct 16 16:19:49 2019 Hello 0
Mon Oct 16 16:19:50 2019 Hello 1
Mon Oct 16 16:19:51 2019 Hello 2
Mon Oct 16 16:19:52 2019 Hello 3
Mon Oct 16 16:19:53 2019 Hello 4

There’s much more to learn about Twisted. In particular, it’s well worth your time to go through the list of networking protocols it implements. There is still some work to be done, but the future looks very bright for interoperation between Twisted and asyncio.

asyncio has been designed in such a way that we can look forward to a future where it will be possible to incorporate code from many async frameworks, such as Twisted and Tornado, into a single application, with all code running on the same event loop.

The Janus Queue

The Janus queue (installed with pip install janus) provides a solution for communication between threads and coroutines. In the Python standard library, there are two kinds of queues:

queue.Queue

A blocking queue, commonly used for communication and buffering between threads

asyncio.Queue

An async-compatible queue, commonly used for communication and buffering between coroutines

Unfortunately, neither is useful for communication between threads and coroutines! This is where Janus comes in: it is a single queue that exposes both APIs, a blocking one and an async one. Example 4-12 generates data from inside a thread, places that data on a queue, and then consumes that data from a coroutine.

Example 4-12. Connecting coroutines and threads with a Janus queue
# janus_demo.py
import asyncio
import random
import time

import janus

async def main():
    loop = asyncio.get_running_loop()
    queue = janus.Queue(loop=loop)  1
    future = loop.run_in_executor(None, data_source, queue)
    while (data := await queue.async_q.get()) is not None:  2
        print(f'Got {data} off queue')  3
    print('Done.')

def data_source(queue):
    for i in range(10):
        r = random.randint(0, 4)
        time.sleep(r)  4
        queue.sync_q.put(r)  5
    queue.sync_q.put(None)

asyncio.run(main())
1

Create a Janus queue. Note that just like an asyncio.Queue, the Janus queue will be associated with a specific event loop. As usual, if you don’t provide the loop parameter, the standard get_event_loop() call will be used internally.

2

Our main() coroutine function simply waits for data on a queue. This line will suspend until there is data, exactly until there is data, exactly like calling get() on an asyncio.Queue instance. The queue object has two faces: this one is called async_q and provides the async-compatible queue API.

3

Print a message.

4

Inside the data_source() function, a random int is generated, which is used both as a sleep duration and a data value. Note that the time.sleep() call is blocking, so this function must be executed in a thread.

5

Place the data onto the Janus queue. This shows the other face of the Janus queue: sync_q, which provides the standard, blocking Queue API.

Here’s the output:

$ <name>
Got 2 off queue
Got 4 off queue
Got 4 off queue
Got 2 off queue
Got 3 off queue
Got 4 off queue
Got 1 off queue
Got 1 off queue
Got 0 off queue
Got 4 off queue
Done.

If you can, it’s better to aim for having short executor jobs, and in these cases, a queue (for communication) won’t be necessary. This isn’t always possible, though, and in such situations, the Janus queue can be the most convenient solution to buffer and distribute data between threads and coroutines.

aiohttp

aiohttp brings all things HTTP to asyncio, including support for HTTP clients and servers, as well as WebSocket support. Let’s jump straight into code examples, starting with simplicity itself: “Hello World.”

Case Study: Hello World

Example 4-13 shows a minimal web server using aiohttp.

Example 4-13. Minimal aiohttp example
from aiohttp import web

async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()  1
app.router.add_get('/', hello)  2
web.run_app(app, port=8080)  3
1

An Application instance is created.

2

A route is created, with the target coroutine hello() given as the handler.

3

The web application is run.

Observe that there is no mention of loops, tasks, or futures in this code: the developers of the aiohttp framework have hidden all that away from us, leaving a very clean API. This is going to be common in most frameworks that build on top of asyncio, which has been designed to allow framework designers to choose only the bits they need, and encapsulate them in their preferred API.

Case Study: Scraping the News

aiohttp can be used both as a server and a client library, like the very popular (but blocking!) requests library. I wanted to showcase aiohttp by using an example that incorporates both features.

In this case study, we’ll implement a website that does web scraping behind the scenes. The application will scrape two news websites and combine the headlines into one page of results. Here is the strategy:

  1. A browser client makes a web request to http://localhost:8080/news.

  2. Our web server receives the request, and then on the backend fetches HTML data from multiple news websites.

  3. Each page’s data is scraped for headlines.

  4. The headlines are sorted and formatted into the response HTML that we send back to the browser client.

Figure 4-1 shows the output.

uaip 0401
Figure 4-1. The final product of our news scraper: headlines from CNN are shown in one color, and Al Jazeera in another

Web scraping has become quite difficult nowadays. For example, if you try requests.get('http://edition.cnn.com'), you’re going to find that the response contains very little usable data! It has become increasingly necessary to be able to execute JavaScript locally in order to obtain data, because many sites use JavaScript to load their actual content. The process of executing such JavaScript to produce the final, complete HTML output is called rendering.

To accomplish rendering, we use a neat project called Splash, which describes itself as a “JavaScript rendering service.” It can run in a Docker container and provides an API for rendering other sites. Internally, it uses a (JavaScript-capable) WebKit engine to fully load and render a website. This is what we’ll use to obtain website data. Our aiohttp server, shown in Example 4-14, will call this Splash API to obtain the page data.

Tip

To obtain and run the Splash container, run these commands in your shell:

$ docker pull scrapinghub/splash
$ docker run --rm -p 8050:8050 scrapinghub/splash

Our server backend will call the Splash API at http://localhost:8050.

Example 4-14. Code for the news scraper
from asyncio import gather, create_task
from string import Template
from aiohttp import web, ClientSession
from bs4 import BeautifulSoup

async def news(request):  1
    sites = [
        ('http://edition.cnn.com', cnn_articles),  2
        ('http://www.aljazeera.com', aljazeera_articles),
    ]
    tasks = [create_task(news_fetch(*s)) for s in sites] 3
    await gather(*tasks)  4

    items = {  5
        text: (  6
            f'<div class="box {kind}">'
            f'<span>'
            f'<a href="{href}">{text}</a>'
            f'</span>'
            f'</div>'
        )
        for task in tasks for href, text, kind in task.result()
    }
    content = ''.join(items[x] for x in sorted(items))

    page = Template(open('index.html').read())  7
    return web.Response(
        body=page.safe_substitute(body=content),  8
        content_type='text/html',
    )

async def news_fetch(url, postprocess):
    proxy_url = (
        f'http://localhost:8050/render.html?'  9
        f'url={url}&timeout=60&wait=1'
    )
    async with ClientSession() as session:
        async with session.get(proxy_url) as resp:  10
            data = await resp.read()
            data = data.decode('utf-8')
    return postprocess(url, data)  11

def cnn_articles(url, page_data):  12
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/')
            and tag['href'].endswith('.html')
            and tag.find(class_='cd__headline-text')
        )
    headlines = soup.find_all(match)  13
    return [(url + hl['href'], hl.text, 'cnn')
            for hl in headlines]

def aljazeera_articles(url, page_data):  14
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/news')
            and tag['href'].endswith('.html')
        )
    headlines = soup.find_all(match)
    return [(url + hl['href'], hl. text, 'aljazeera')
            for hl in headlines]

app = web.Application()
app.router.add_get('/news', news)
web.run_app(app, port=8080)
1

The news() function is the handler for the /news URL on our server. It returns the HTML page showing all the headlines.

2

Here, we have only two news websites to be scraped: CNN and Al Jazeera. More could easily be added, but then additional postprocessors would also have to be added, just like the cnn_articles() and aljazeera_articles() functions that are customized to extract headline data.

3

For each news site, we create a task to fetch and process the HTML page data for its front page. Note that we unpack the tuple ((*s)) since the news_fetch() coroutine function takes both the URL and the postprocessing function as parameters. Each news_fetch() call will return a list of tuples as headline results, in the form <article URL>, <article title>.

4

All the tasks are gathered together into a single Future (gather() returns a future representing the state of all the tasks being gathered), and then we immediately await the completion of that future. This line will suspend until the future completes.

5

Since all the news_fetch() tasks are now complete, we collect all of the results into a dictionary. Note how nested comprehensions are used to iterate over tasks, and then over the list of tuples returned by each task. We also use f-strings to substitute data directly, including even the kind of page, which will be used in CSS to color the div background.

6

In this dictionary, the key is the headline title, and the value is an HTML string for a div that will be displayed in our result page.

7

Our web server is going to return HTML. We’re loading HTML data from a local file called index.html. This file is presented in Example B-1 if you want to re-create the case study yourself.

8

We substitute the collected headline div into the template and return the page to the browser client. This generates the page shown in Figure 4-1.

9

Here, inside the news_fetch() coroutine function, we have a tiny template for hitting the Splash API (which, for me, is running in a local Docker container on port 8050). This demonstrates how aiohttp can be used as an HTTP client.

10

The standard way is to create a ClientSession() instance, and then use the get() method on the session instance to perform the REST call. In the next line, the response data is obtained. Note that because we’re always operating on coroutines, with async with and await, this coroutine will never block: we’ll be able to handle many thousands of these requests, even though this operation (news_fetch()) might be relatively slow since we’re doing web calls internally.

11

After the data is obtained, we call the postprocessing function. For CNN, it’ll be cnn_articles(), and for Al Jazeera it’ll be aljazeera_articles().

12

We have space only for a brief look at the postprocessing. After getting the page data, we use the Beautiful Soup 4 library for extracting headlines.

13

The match() function will return all matching tags (I’ve manually checked the HTML source of these news websites to figure out which combination of filters extracts the best tags), and then we return a list of tuples matching the format <article URL>, <article title>.

14

This is the analogous postprocessor for Al Jazeera. The match() condition is slightly different, but it is otherwise the same as the CNN one.

Generally, you’ll find that aiohttp has a simple API and “stays out of your way” while you develop your applications.

In the next section, we’ll look at using ZeroMQ with asyncio, which has the curious effect of making socket programming quite enjoyable.

ØMQ (ZeroMQ)

Programming is a science dressed up as art, because most of us don’t understand the physics of software and it’s rarely, if ever, taught. The physics of software is not algorithms, data structures, languages, and abstractions. These are just tools we make, use, and throw away. The real physics of software is the physics of people. Specifically, it’s about our limitations when it comes to complexity and our desire to work together to solve large problems in pieces. This is the science of programming: make building blocks that people can understand and use easily, and people will work together to solve the very largest problems.

Pieter Hintjens, ZeroMQ: Messaging for Many Applications

ØMQ (or ZeroMQ) is a popular language-agnostic library for networking applications: it provides “smart” sockets. When you create ØMQ sockets in code, they resemble regular sockets, with recognizable method names like recv() and send() and so on—but internally these sockets handle some of the more annoying and tedious tasks required for working with conventional sockets.

One of the features it provides is management of message passing, so you don’t have to invent your own protocol and count bytes on the wire to figure out when all the bytes for a particular message have arrived—you simply send whatever you consider to be a “message,” and the whole thing arrives on the other end intact.

Another great feature is automatic reconnection logic. If the server goes down and comes back up later, the client ØMQ socket will automatically reconnect. And even better, messages your code sends into the socket will be buffered during the disconnected period, so they will all still be sent out when the server returns. These are some of the reasons ØMQ is sometimes referred to as brokerless messaging: it provides some of the features of message broker software directly in the socket objects themselves.

ØMQ sockets are already implemented as asynchronous internally (so they can maintain many thousands of concurrent connections, even when used in threaded code), but this is hidden from us behind the ØMQ API. Nevertheless, support for Asyncio has been added to the PyZMQ Python bindings for the ØMQ library, and in this section we’re going to look at several examples of how you might incorporate these smart sockets into your Python applications.

Case Study: Multiple Sockets

Here’s a head-scratcher: if ØMQ provides sockets that are already asynchronous, in a way that is usable with threading, what is the point of using ØMQ with asyncio? The answer is cleaner code.

To demonstrate, let’s look at a tiny case study in which you use multiple ØMQ sockets in the same application. First, Example 4-15 shows the blocking version (this example is taken from the zguide, the official guide for ØMQ).

Example 4-15. The traditional ØMQ approach
# poller.py
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)  1
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)  2
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

poller = zmq.Poller()  3
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    try:
        socks = dict(poller.poll())  4
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv_json()
        print(f'Via PULL: {message}')

    if subscriber in socks:
        message = subscriber.recv_json()
        print(f'Via SUB: {message}')
1

ØMQ sockets have types. This is a PULL socket. You can think of it as a receive-only kind of socket that will be fed by some other send-only socket, which will be a PUSH type.

2

The SUB socket is another kind of receive-only socket, and it will be fed a PUB socket which is send-only.

3

If you need to move data between multiple sockets in a threaded ØMQ application, you’re going to need a poller. This is because these sockets are not thread-safe, so you cannot recv() on different sockets in different threads.1

4

It works similarly to the select() system call. The poller will unblock when there is data ready to be received on one of the registered sockets, and then it’s up to you to pull the data off and do something with it. The big if block is how you detect the correct socket.

Using a poller loop plus an explicit socket-selection block makes the code look a little clunky, but this approach avoids thread-safety problems by guaranteeing the same socket is not used from different threads.

Example 4-16 shows the server code.

Example 4-16. Server code
# poller_srv.py
import zmq, itertools, time

context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

for i in itertools.count():
    time.sleep(1)
    pusher.send_json(i)
    publisher.send_json(i)

This code is not important for the discussion, but briefly: there’s a PUSH socket and a PUB socket, as I said earlier, and a loop inside that sends data to both sockets every second. Here’s sample output from poller.py (note: both programs must be running):

$ poller.py
Via PULL: 0
Via SUB: 0
Via PULL: 1
Via SUB: 1
Via PULL: 2
Via SUB: 2
Via PULL: 3
Via SUB: 3

The code works; however, our interest here is not whether the code runs, but rather whether asyncio has anything to offer for the structure of poller.py. The key thing to understand is that our asyncio code is going to run in a single thread, which means that it’s fine to handle different sockets in different coroutines—and indeed, this is exactly what we’ll do.

Of course, someone had to do the hard work to add support for coroutines into pyzmq (the Python client library for ØMQ) itself for this to work, so it wasn’t free. But we can take advantage of that hard work to improve on the “traditional” code structure, as shown in Example 4-17.

Example 4-17. Clean separation with asyncio
# poller_aio.py
import asyncio
import zmq
from zmq.asyncio import Context

context = Context()

async def do_receiver():
    receiver = context.socket(zmq.PULL)  1
    receiver.connect("tcp://localhost:5557")
    while message := await receiver.recv_json():  2
        print(f'Via PULL: {message}')

async def do_subscriber():
    subscriber = context.socket(zmq.SUB)  3
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
    while message := await subscriber.recv_json():  4
        print(f'Via SUB: {message}')

async def main():
    await asyncio.gather(
        do_receiver(),
        do_subscriber(),
    )

asyncio.run(main())
1

This code sample does the same as Example 4-15, except that now we’re taking advantage of coroutines to restructure everything. Now we can deal with each socket in isolation. I’ve created two coroutine functions, one for each socket; this one is for the PULL socket.

2

I’m using the asyncio support in pyzmq, which means that all send() and recv() calls must use the await keyword. The Poller no longer appears anywhere, because it’s been integrated into the asyncio event loop itself.

3

This is the handler for the SUB socket. The structure is very similar to the PULL socket’s handler, but that need not have been the case. If more complex logic had been required, I’d have been able to easily add it here, fully encapsulated within the SUB-handler code only.

4

Again, the asyncio-compatible sockets require the await keyword to send and receive.

The output is the same as before, so I won’t show it.

The use of coroutines has, in my opinion, a staggeringly positive effect on the code layout in these examples. In real production code with lots of ØMQ sockets, the coroutine handlers for each could even be in separate files, providing more opportunities for better code structure. And even for programs with a single read/write socket, it is very easy to use separate coroutines for reading and writing, if necessary.

The improved code looks a lot like threaded code, and indeed, for the specific example shown here, the same refactor will work for threading: run blocking do_receiver() and do_subscriber() functions in separate threads. But do you really want to deal with even the potential for race conditions, especially as your application grows in features and complexity over time?

There is lots to explore here, and as I said before, these magic sockets are a lot of fun to play with. In the next case study, we’ll look at a more practical use of ØMQ.

Case Study: Application Performance Monitoring

With the modern, containerized, microservice-based deployment practices of today, some things that used to be trivial, such as monitoring your apps’ CPU and memory usage, have become somewhat more complicated than just running top. Several commercial products have emerged over the last few years to deal with these problems, but their cost can be prohibitive for small startup teams and hobbyists.

In this case study, I’ll exploit ØMQ and asyncio to build a toy prototype for distributed application monitoring. Our design has three parts:

Application layer

This layer contains all our applications. Examples might be a “customers” microservice, a “bookings” microservice, an “emailer” microservice, and so on. I will add a ØMQ “transmitting” socket to each of our applications. This socket will send performance metrics to a central server.

Collection layer

The central server will expose a ØMQ socket to collect the data from all the running application instances. The server will also serve a web page to show performance graphs over time and will live-stream the data as it comes in.

Visualization layer

This is the web page being served. We’ll display the collected data in a set of charts, and the charts will live-update in real time. To simplify the code samples, I will use the convenient Smoothie Charts JavaScript library, which provides all the necessary client-side features.

The backend app (application layer) that produces metrics is shown in Example 4-18.

Example 4-18. The application layer: producing metrics
import argparse
import asyncio
from random import randint, uniform
from datetime import datetime as dt
from datetime import timezone as tz
from contextlib import suppress
import zmq, zmq.asyncio, psutil

ctx = zmq.asyncio.Context()

async def stats_reporter(color: str):  1
    p = psutil.Process()
    sock = ctx.socket(zmq.PUB)  2
    sock.setsockopt(zmq.LINGER, 1)
    sock.connect('tcp://localhost:5555')  3
    with suppress(asyncio.CancelledError):  4
        while True:  5
            await sock.send_json(dict(  6
                color=color,
                timestamp=dt.now(tz=tz.utc).isoformat(),  7
                cpu=p.cpu_percent(),
                mem=p.memory_full_info().rss / 1024 / 1024
            ))
            await asyncio.sleep(1)
    sock.close()  8

async def main(args):
    asyncio.create_task(stats_reporter(args.color))
    leak = []
    with suppress(asyncio.CancelledError):
        while True:
            sum(range(randint(1_000, 10_000_000)))  9
            await asyncio.sleep(uniform(0, 1))
            leak += [0] * args.leak

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--color', type=str)  10
    parser.add_argument('--leak', type=int, default=0)
    args = parser.parse_args()
    try:
        asyncio.run(main(args))
    except KeyboardInterrupt:
        print('Leaving...')
        ctx.term()  11
1

This coroutine function will run as a long-lived coroutine, continually sending out data to the server process.

2

Create a ØMQ socket. As you know, there are different flavors of socket; this one is a PUB type, which allows one-way messages to be sent to another ØMQ socket. This socket has—as the ØMQ guide says—superpowers. It will automatically handle all reconnection and buffering logic for us.

3

Connect to the server.

4

Our shutdown sequence is driven by KeyboardInterrupt, farther down. When that signal is received, all the tasks will be cancelled. Here I handle the raised CancelledError with the handy suppress() context manager from the contextlib standard library module.

5

Iterate forever, sending out data to the server.

6

Since ØMQ knows how to work with complete messages, and not just chunks off a bytestream, it opens the door to a bunch of useful wrappers around the usual sock.send() idiom: here, I use one of those helper methods, send_json(), which will automatically serialize the argument into JSON. This allows us to use a dict() directly.

7

A reliable way to transmit datetime information is via the ISO 8601 format. This is especially true if you have to pass datetime data between software written in different languages, since the vast majority of language implementations will be able to work with this standard.

8

To end up here, we must have received the CancelledError exception resulting from task cancellation. The ØMQ socket must be closed to allow program shutdown.

9

The main() function symbolizes the actual microservice application. Fake work is produced with this sum over random numbers, just to give us some nonzero data to view in the visualization layer a bit later.

10

I’m going to create multiple instances of this application, so it will be convenient to be able to distinguish between them (later, in the graphs) with a --color parameter.

11

Finally, the ØMQ context can be terminated.

The primary point of interest is the stats_reporter() function. This is what streams out metrics data (collected by the useful psutil library). The rest of the code can be assumed to be a typical microservice application.

The server code in Example 4-19 collects all the data and serves it to a web client.

Example 4-19. The collection layer: this server collects process stats
# metric-server.py
import asyncio
from contextlib import suppress
import zmq
import zmq.asyncio
import aiohttp
from aiohttp import web
from aiohttp_sse import sse_response
from weakref import WeakSet
import json

# zmq.asyncio.install()
ctx = zmq.asyncio.Context()
connections = WeakSet()  1

async def collector():
    sock = ctx.socket(zmq.SUB)  2
    sock.setsockopt_string(zmq.SUBSCRIBE, '')  3
    sock.bind('tcp://*:5555')  4
    with suppress(asyncio.CancelledError):
        while data := await sock.recv_json():  5
            print(data)
            for q in connections:
                await q.put(data)  6
    sock.close()

async def feed(request):  7
    queue = asyncio.Queue()
    connections.add(queue)  8
    with suppress(asyncio.CancelledError):
        async with sse_response(request) as resp:  9
            while data := await queue.get():  10
                print('sending data:', data)
                resp.send(json.dumps(data))  11
    return resp

async def index(request):  12
    return aiohttp.web.FileResponse('./charts.html')

async def start_collector(app):  13
    app['collector'] = app.loop.create_task(collector())

async def stop_collector(app):
    print('Stopping collector...')
    app['collector'].cancel()  14
    await app['collector']
    ctx.term()

if __name__ == '__main__':
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/feed', feed)
    app.on_startup.append(start_collector)  15
    app.on_cleanup.append(stop_collector)
    web.run_app(app, host='127.0.0.1', port=8088)
1

One half of this program will receive data from other applications, and the other half will provide data to browser clients via server-sent events (SSEs). I use a WeakSet() to keep track of all the currently connected web clients. Each connected client will have an associated Queue() instance, so this connections identifier is really a set of queues.

2

Recall that in the application layer, I used a zmq.PUB socket; here in the collection layer, I use its partner, the zmq.SUB socket type. This ØMQ socket can only receive, not send.

3

For the zmq.SUB socket type, providing a subscription name is required, but for our purposes, we’ll just take everything that comes in—hence the empty topic name.

4

I bind the zmq.SUB socket. Think about that for second. In pub-sub configurations, you usually have to make the pub end the server (bind()) and the sub end the client (connect()). ØMQ is different: either end can be the server. For our use case, this is important, because each of our application-layer instances will be connecting to the same collection server domain name, and not the other way around.

5

The support for asyncio in pyzmq allows us to await data from our connected apps. And not only that, but the incoming data will be automatically deserialized from JSON (yes, this means data is a dict()).

6

Recall that our connections set holds a queue for every connected web client. Now that data has been received, it’s time to send it to all the clients: the data is placed onto each queue.

7

The feed() coroutine function will create coroutines for each connected web client. Internally, server-sent events are used to push data to the web clients.

8

As described earlier, each web client will have its own queue instance, in order to receive data from the collector() coroutine. The queue instance is added to the connections set, but because connections is a weak set, the entry will automatically be removed from connections when the queue goes out of scope—i.e., when a web client disconnects. Weakrefs are great for simplifying these kinds of bookkeeping tasks.

9

The aiohttp_sse package provides the sse_response() context manager. This gives us a scope inside which to push data to the web client.

10

We remain connected to the web client, and wait for data on this specific client’s queue.

11

As soon as the data comes in (inside collector()), it will be sent to the connected web client. Note that I reserialize the data dict here. An optimization to this code would be to avoid deserializing JSON in collector(), and instead use sock.recv_string() to avoid the serialization round trip. Of course, in a real scenario, you might want to deserialize in the collector, and perform some validation on the data before sending it to the browser client. So many choices!

12

The index() endpoint is the primary page load, and here we serve a static file called charts.html.

13

The aiohttp library provides facilities for us to hook in additional long-lived coroutines we might need. With the collector() coroutine, we have exactly that situation, so I create a startup coroutine, start_collector(), and a shutdown coroutine. These will be called during specific phases of aiohttp’s startup and shutdown sequence. Note that I add the collector task to the app itself, which implements a mapping protocol so that you can use it like a dict.

14

I obtain our collector() coroutine off the app identifier and call cancel() on that.

15

Finally, you can see where the custom startup and shutdown coroutines are hooked in: the app instance provides hooks to which our custom coroutines may be appended.

All that remains is the visualization layer, shown in Example 4-20. I’m using the Smoothie Charts library to generate scrolling charts, and the complete HTML for our main (and only) web page, charts.html, is provided in the Example B-1. There is too much HTML, CSS, and JavaScript to present in this section, but I do want to highlight a few points about how the server-sent events are handled in JavaScript in the browser client.

Example 4-20. The visualization layer, which is a fancy way of saying “the browser”
<snip>
var evtSource = new EventSource("/feed");  1
evtSource.onmessage = function(e) {
    var obj = JSON.parse(e.data);  2
    if (!(obj.color in cpu)) {
        add_timeseries(cpu, cpu_chart, obj.color);
    }
    if (!(obj.color in mem)) {
        add_timeseries(mem, mem_chart, obj.color);
    }
    cpu[obj.color].append(
        Date.parse(obj.timestamp), obj.cpu);  3
    mem[obj.color].append(
        Date.parse(obj.timestamp), obj.mem);
};
<snip>
1

Create a new EventSource() instance on the /feed URL. The browser will connect to /feed on our server, (metric_server.py). Note that the browser will automatically try to reconnect if the connection is lost. Server-sent events are often overlooked, but in many situations their simplicity makes them preferable to WebSockets.

2

The onmessage event will fire every time the server sends data. Here the data is parsed as JSON.

3

The cpu identifier is a mapping of a color to a TimeSeries() instance (for more on this, see Example B-1). Here, we obtain that time series and append data to it. We also obtain the timestamp and parse it to get the correct format required by the chart.

Now we can run the code. To get the whole show moving, a bunch of command-line instructions are required, the first of which is to start up the data collector process:

$ metric-server.py
======== Running on http://127.0.0.1:8088 ========
(Press CTRL+C to quit)

The next step is to start up all the microservice instances. These will send their CPU and memory usage metrics to the collector. Each will be identified by a different color, which is specified on the command line. Note how two of the microservices are told to leak some memory:

$ backend-app.py --color red &
$ backend-app.py --color blue --leak 10000 &
$ backend-app.py --color green --leak 100000 &

Figure 4-2 shows our final product in a browser. You’ll have to take my word for it that the graphs really do animate. You’ll notice in the preceding command lines that I added some memory leakage to blue, and a lot to green. I even had to restart the green service a few times to prevent it from climbing over 100 MB.

uaip 0402
Figure 4-2. We’d better get an SRE on green ASAP!

What is especially interesting about this project is this: any of the running instances in any part of this stack can be restarted, and no reconnect-handling code is necessary. The ØMQ sockets, along with the EventSource() JavaScript instance in the browser, magically reconnect and pick up where they left off.

In the next section, we turn our attention to databases and to how asyncio might be used to design a system for cache invalidation.

asyncpg and Sanic

The asyncpg library provides client access to the PostgreSQL database, but differentiates itself from other asyncio-compatible Postgres client libraries with its emphasis on speed. asyncpg is authored by Yury Selivanov, one of the core asyncio Python developers, who is also the author of the uvloop project. It has no third-party dependencies, although Cython is required if you’re installing from source.

asyncpg achieves its speed by working directly against the PostgreSQL binary protocol, and other advantages to this low-level approach include support for prepared statements and scrollable cursors.

We’ll be looking at a case study using asyncpg for cache invalidation, but before that it will be useful to get a basic understanding of the API asyncpg provides. For all of the code in this section, we’ll need a running instance of PostgreSQL. This is most easily done with Docker, using the following command:

$ docker run -d --rm -p 55432:5432 postgres

Note that I’ve exposed port 55432 rather than the default, 5432, just in case you already have a running instance of the database on the default port. Example 4-21 briefly demonstrates how to use asyncpg to talk to PostgreSQL.

Example 4-21. Basic demo of asyncpg
# asyncpg-basic.py
import asyncio
import asyncpg
import datetime
from util import Database  1

async def main():
    async with Database('test', owner=True) as conn:  2
        await demo(conn)

async def demo(conn: asyncpg.Connection):
    await conn.execute('''
        CREATE TABLE users(
            id serial PRIMARY KEY,
            name text,
            dob date
        )'''
    )  3

    pk = await conn.fetchval(  4
        'INSERT INTO users(name, dob) VALUES($1, $2) '
        'RETURNING id', 'Bob', datetime.date(1984, 3, 1)
    )

    async def get_row():  5
        return await conn.fetchrow(  6
            'SELECT * FROM users WHERE name = $1',
            'Bob'
        )
    print('After INSERT:', await get_row())  7

    await conn.execute(
        'UPDATE users SET dob = $1 WHERE id=1',
        datetime.date(1985, 3, 1)  8
    )
    print('After UPDATE:', await get_row())

    await conn.execute(
        'DELETE FROM users WHERE id=1'
    )
    print('After DELETE:', await get_row())

if __name__ == '__main__':
    asyncio.run(main())
1

I’ve hidden some boilerplate away in a tiny util module to simplify things and keep the core message.

2

The Database class gives us a context manager that will create a new database for us—in this, case named test—and will destroy that database when the context manager exits. This turns out to be very useful when experimenting with ideas in code. Because no state is carried over between experiments, you start from a clean database every time. Note that this is an async with context manager; we’ll talk more about that later, but for now, the focal area of this demo is what happens inside the demo() coroutine.

3

The Database context manager has provided us with a Connection instance, which is immediately used to create a new table, users.

4

I use fetchval() to insert a new record. While I could have used execute() to do the insertion, the benefit of using fetchval() is that I can obtain the id of the newly inserted record, which I store in the pk identifier.

Note that I use parameters ($1 and $2) for passing data to the SQL query. Never use string interpolation or concatenation to build queries, as this is a security risk!

5

In the remainder of this demo, I’m going to be manipulating data in the users table, so here I make a new utility coroutine function that fetches a record in the table. This will be called several times.

6

When retrieving data, it is far more useful to use the fetch-based methods, because these will return Record objects. asyncpg will automatically cast datatypes to the most appropriate types for Python.

7

I immediately use the get_row() helper to display the newly inserted record.

8

I modify data by using the UPDATE command for SQL. It’s a tiny modification: the year value in the date of birth is changed by one year. As before, this is performed with the connection’s execute() method. The remainder of the code demo follows the same structure as seen so far, and a DELETE, followed by another print(), happens a few lines down.

Here’s the output of running this script:

$ asyncpg-basic.py
After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)>
After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)>
After DELETE: None

Note how the date value retrieved in our Record object has been converted to a Python date object: asyncpg has automatically converted the datatype from the SQL type to its Python counterpart. A large table of type conversions in the asyncpg documentation describes all the type mappings that are built into the library.

The preceding code is very simple, perhaps even crudely so if you’re used to the convenience of object-relational mappers (ORMs) like SQLAlchemy or the Django web framework’s built-in ORM. At the end of this chapter, I mention several third-party libraries that provide access to ORMs or ORM-like features for asyncpg.

Example 4-22 shows my boilerplate Database object in the utils module; you may find it useful to make something similar for your own experiments.

Example 4-22. Useful tooling for your asyncpg experiments
# util.py
import argparse, asyncio, asyncpg
from asyncpg.pool import Pool

DSN = 'postgresql://{user}@{host}:{port}'
DSN_DB = DSN + '/{name}'
CREATE_DB = 'CREATE DATABASE {name}'
DROP_DB = 'DROP DATABASE {name}'

class Database:
    def __init__(self, name, owner=False, **kwargs):
        self.params = dict(
            user='postgres', host='localhost',
            port=55432, name=name)  1
        self.params.update(kwargs)
        self.pool: Pool = None
        self.owner = owner
        self.listeners = []

    async def connect(self) -> Pool:
        if self.owner:
            await self.server_command(
                CREATE_DB.format(**self.params))  3

        self.pool = await asyncpg.create_pool(  4
            DSN_DB.format(**self.params))
        return self.pool

    async def disconnect(self):
        """Destroy the database"""
        if self.pool:
            releases = [self.pool.release(conn)
                        for conn in self.listeners]
            await asyncio.gather(*releases)
            await self.pool.close()  5
        if self.owner:
            await self.server_command(  6
                DROP_DB.format(**self.params))

    async def __aenter__(self) -> Pool:  2
        return await self.connect()

    async def __aexit__(self, *exc):
        await self.disconnect()

    async def server_command(self, cmd):  7
        conn = await asyncpg.connect(
            DSN.format(**self.params))
        await conn.execute(cmd)
        await conn.close()

    async def add_listener(self, channel, callback):  8
        conn: asyncpg.Connection = await self.pool.acquire()
        await conn.add_listener(channel, callback)
        self.listeners.append(conn)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cmd', choices=['create', 'drop'])
    parser.add_argument('--name', type=str)
    args = parser.parse_args()
    d = Database(args.name, owner=True)
    if args.cmd == 'create':
        asyncio.run(d.connect())
    elif args.cmd == 'drop':
        asyncio.run(d.disconnect())
    else:
        parser.print_help()
1

The Database class is just a fancy context manager for creating and deleting a database from a PostgreSQL instance. The database name is passed into the constructor.

2

(Note: The sequence of callouts in the code is intentionally different from this list.) This is an asynchronous context manager. Instead of the usual __enter__() and __exit__() methods, I use their __aenter__() and __aexit__() counterparts.

3

Here, in the entering side, I’ll create the new database and return a connection to that new database. server_command() is another helper method defined a few lines down. I use it to run the command for creating our new database.

4

I then make a connection to the newly created database. Note that I’ve hardcoded several details about the connection: this is intentional, as I wanted to keep the code samples small. You could easily generalize this by making fields for the username, hostname, and port.

5

In the exiting side of the context manager, I close the connection and…

6

…destroy the database.

7

For completeness, this is our utility method for running commands against the PostgreSQL server itself. It creates a connection for that purpose, runs the given command, and exits.

8

This function creates a long-lived socket connection to the database that will listen for events. This mechanism will be featured in the upcoming case study.

Caution

In point 8 for the preceding code, I created a dedicated connection for each channel I want to listen on. This is expensive since it means that a PostgreSQL worker will be completely tied up for every channel being listened to. A much better design would be to use one connection for multiple channels. Once you have worked through this example, try to modify the code to use a single connection for multiple channel listeners.

Now that you have an understanding of the basic building blocks of asyncpg, we can explore it further with a really fun case study: using PostgreSQL’s built-in support for sending event notifications to perform cache invalidation!

Case Study: Cache Invalidation

There are two hard things in computer science: cache invalidation, naming things, and off-by-one errors.

Phil Karlton

It is common in web services and web applications that the persistence layer, i.e., the backing database (DB), becomes the performance bottleneck sooner than any other part of the stack. The application layer can usually be scaled horizontally by running more instances, whereas it’s trickier to do that with a database.

This is why it’s common practice to look at design options that can limit excessive interaction with the database. The most common option is to use caching to “remember” previously fetched database results and replay them when asked, thus avoiding subsequent calls to the DB for the same information.

However, what happens if one of your app instances writes new data to the database while another app instance is still returning the old, stale data from its internal cache? This is a classic cache invalidation problem, and it can be very difficult to resolve in a robust way.

Our attack strategy is as follows:

  1. Each app instance has an in-memory cache of DB queries.

  2. When one writes new data to the database, the database alerts all of the connected app instances of the new data.

  3. Each app instance then updates its internal cache accordingly.

This case study will highlight how PostgreSQL, with its built-in support for event updates via the LISTEN and NOTIFY commands, can simply tell us when its data has changed.

asyncpg already has support for the LISTEN/NOTIFY API. This feature of PostgreSQL allows your app to subscribe to events on a named channel and to post events to named channels. PostgreSQL can almost become a lighter version of RabbitMQ or ActiveMQ!

This case study has more moving parts than usual, and that makes it awkward to present in the usual linear format. Instead, we’ll begin by looking at the final product, and work backward toward the underlying implementation.

Our app provides a JSON-based API server for managing the favorite dishes of patrons at our robotic restaurant. The backing database will have only one table, patron, with only two fields: name and fav_dish. Our API will allow the usual set of four operations: create, read, update, and delete (CRUD).

The following is a sample interaction with our API using curl, illustrating how to create a new entry in our database (I haven’t yet shown how to start up the server running on localhost:8000; that will come later):

$ curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X POST \
    http://localhost:8000/patron
{"msg":"ok","id":37}

The -d parameter is for data,2 -H is for the HTTP headers, -X is for the HTTP request method (alternatives include GET, DELETE, PUT, and a few others), and the URL is for our API server. We’ll get to the code for that shortly.

In the output, we see that the creation was ok, and the id being returned is the primary key of the new record in the database.

In the next few shell snippets, we’ll run through the other three operations: read, update, and delete. We can read the patron record we just created with this command:

$ curl -X GET http://localhost:8000/patron/37
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}

Reading the data is pretty straightforward. Note that the id of the desired record must be supplied in the URL.

Next, we’ll update the record and check the results:

$ curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
{"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}

Updating a resource is similar to creating one, with two key differences:

  • The HTTP request method (-X) is PUT, not POST.

  • The URL now requires the id field to specify which resource to update.

Finally, we can delete the record and verify its deletion with the following commands:

$ curl -X DELETE http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
null

As you can see, null is returned when you try to GET a record that doesn’t exist.

So far this all looks quite ordinary, but our objective is not only to make a CRUD API—we want to look at cache invalidation. So, let’s turn our attention toward the cache. Now that we have a basic understanding of our app’s API, we can look at the application logs to see timing data for each request: this will tell us which requests are cached, and which hit the DB.

When the server is first started up, the cache is empty; it’s a memory cache, after all. We’re going to start up our server, and then in a separate shell run two GET requests in quick succession:

$ curl -X GET http://localhost:8000/patron/29
$ curl -X GET http://localhost:8000/patron/29
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}

We expect that the first time we retrieve our record, there’s going to be a cache miss, and the second time, a hit. We can see evidence of this in the log for the API server itself (the first Sanic web server, running on localhost:8000):

$ sanic_demo.py
2019-09-29 16:20:33 - (sanic)[DEBUG]:
                 ▄▄▄▄▄
        ▀▀▀██████▄▄▄       _______________
      ▄▄▄▄▄  █████████▄  /                 \
     ▀▀▀▀█████▌ ▀▐▄ ▀▐█ |   Gotta go fast!  |
   ▀▀█████▄▄ ▀██████▄██ | _________________/
   ▀▄▄▄▄▄  ▀▀█▄▀█════█▀ |/
        ▀▀▀▄  ▀▀███ ▀       ▄▄
     ▄███▀▀██▄████████▄ ▄▀▀▀▀▀▀█▌
   ██▀▄▄▄██▀▄███▀ ▀▀████      ▄██
▄▀▀▀▄██▄▀▀▌████▒▒▒▒▒▒███     ▌▄▄▀
▌    ▐▀████▐███▒▒▒▒▒▐██▌
▀▄▄▄▄▀   ▀▀████▒▒▒▒▄██▀
          ▀▀█████████▀
        ▄▄██▀██████▀█
      ▄██▀     ▀▀▀  █
     ▄█             ▐▌
 ▄▄▄▄█▌              ▀█▄▄▄▄▀▀▄
▌     ▐                ▀▀▄▄▄▀
 ▀▀▄▄▀

2019-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000
2019-09-29 16:20:33 (sanic): Starting worker [10366]  1
2019-09-29 16:25:27 (perf): id=37 Cache miss  2
2019-09-29 16:25:27 (perf): get Elapsed: 4.26 ms 3
2019-09-29 16:25:27 (perf): get Elapsed: 0.04 ms 4
1

Everything up to this line is the default sanic startup log message.

2

As described, the first GET results in a cache miss because the server has only just started.

3

This is from our first curl -X GET. I’ve added some timing functionality to the API endpoints. Here we can see that the handler for the GET request took ~4 ms.

4

The second GET returns data from the cache, and the much faster (100x faster!) timing data.

So far, nothing unusual. Many web apps use caching in this way.

Now let’s start up a second app instance on port 8001 (the first instance was on port 8000):

$ sanic_demo.py --port 8001
<snip>
2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001
2017-10-02 08:09:56 - (sanic): Starting worker [385]

Both instances, of course, connect to the same database. Now, with both API server instances running, let’s modify the data for patron John, who clearly lacks sufficient Spam in his diet. Here we perform an UPDATE against the first app instance at port 8000:

$ curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/29
{"msg":"ok"}

Immediately after this update event on only one of the app instances, both API servers, 8000 and 8001, report the event in their respective logs:

2019-10-02 08:35:49 - (perf)[INFO]: Got DB event:
{
    "table": "patron",
    "id": 29,
    "type": "UPDATE",
    "data": {
        "old": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "Gravy on Toast"
        },
        "new": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "SPAM on toast"
        },
        "diff": {
            "fav_dish": "SPAM on toast"
        }
    }
}

The database has reported the update event back to both app instances. We haven’t done any requests against app instance 8001 yet, though—does this mean that the new data is already cached there?

To check, we can do a GET on the second server, at port 8001:

$ curl -X GET http://localhost:8001/patron/29
{"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}

The timing info in the log output shows that we do indeed obtain the data directly from the cache, even though this is our first request:

2019-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms

The upshot is that when the database changes, all connected app instances get notified, allowing them to update their caches.

With this explanation out of the way, we can now look at the asyncpg code implementation required to make our cache invalidation actually work. The basic design for the server code shown in Example 4-23 is the following:

  1. We have a simple web API using the new, asyncio-compatible Sanic web framework.

  2. The data will be stored in a backend PostgreSQL instance, but the API will be served via multiple instances of the web API app servers.

  3. The app servers will cache data from the database.

  4. The app servers will subscribe to events via asyncpg in specific tables on the DB, and will receive update notifications when the data in the DB table has been changed. This allows the app servers to update their individual in-memory caches.

Example 4-23. API server with Sanic
# sanic_demo.py
import argparse
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import json
from util import Database  1
from perf import aelapsed, aprofiler  2
import model

app = Sanic()  3

@aelapsed
async def new_patron(request):  4
    data = request.json  5
    id = await model.add_patron(app.pool, data)  6
    return json(dict(msg='ok', id=id))  7

class PatronAPI(HTTPMethodView, metaclass=aprofiler):  8
    async def get(self, request, id):
        data = await model.get_patron(app.pool, id)  9
        return json(data)

    async def put(self, request, id):
        data = request.json
        ok = await model.update_patron(app.pool, id, data)
        return json(dict(msg='ok' if ok else 'bad'))  10

    async def delete(self, request, id):
        ok = await model.delete_patron(app.pool, id)
        return json(dict(msg='ok' if ok else 'bad'))

@app.listener('before_server_start')  11
async def db_connect(app, loop):
    app.db = Database('restaurant', owner=False)  12
    app.pool = await app.db.connect()  13
    await model.create_table_if_missing(app.pool)  14
    await app.db.add_listener('chan_patron', model.db_event)  15

@app.listener('after_server_stop')  16
async def db_disconnect(app, loop):
    await app.db.disconnect()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=8000)
    args = parser.parse_args()
    app.add_route(
        new_patron, '/patron', methods=['POST'])  17
    app.add_route(
        PatronAPI.as_view(), '/patron/<id:int>')  18
    app.run(host="0.0.0.0", port=args.port)
1

The Database utility helper, as described earlier. This will provide the methods required to connect to the database.

2

Two more tools I’ve cobbled together to log the elapsed time of each API endpoint. I used this in the previous discussion to detect when a GET was being returned from the cache. The implementations for aelapsed() and aprofiler() are not important for this case study, but you can obtain them in Example B-1.

3

We create the main Sanic app instance.

4

This coroutine function is for creating new patron entries. In an add_route() call toward the bottom of the code, new_patron() is associated with the endpoint /patron, only for the POST HTTP method. The @aelapsed decorator is not part of the Sanic API: it’s my own invention, merely to log out timings for each call.

5

Sanic provides immediate deserialization of received JSON data by using the .json attribute on the request object.

6

The model module, which I imported, is the model for our patron table in the database. I’ll go through that in more detail in the next code listing; for now, just understand that all the database queries and SQL are in this model module. Here I’m passing the connection pool for the database, and the same pattern is used for all the interaction with the database model in this function and in the PatronAPI class further down.

7

A new primary key, id, will be created, and this is returned back to the caller as JSON.

8

While creation is handled in the new_patron() function, all other interactions are handled in this class-based view, which is a convenience provided by Sanic. All the methods in this class are associated with the same URL, /patron/<id:int>, which you can see in the add_route() function near the bottom. Note that the id URL parameter will be passed to each of the methods, and this parameter is required for all three endpoints.

You can safely ignore the metaclass argument: all it does is wrap each method with the @aelapsed decorator so that timings will be printed in the logs. Again, this is not part of the Sanic API; it’s my own invention for logging timing data.

9

As before, model interaction is performed inside the model module.

10

If the model reports failure for doing the update, I modify the response data. I’ve included this for readers who have not yet seen Python’s version of the ternary operator.

11

The @app.listener decorators are hooks provided by Sanic to give you a place to add extra actions during the startup and shutdown sequence. This one, before_server_start, is invoked before the API server is started up. This seems like a good place to initialize our database connection.

12

Use the Database helper to create a connection to our PostgreSQL instance. The DB we’re connecting to is restaurant.

13

Obtain a connection pool to our database.

14

Use our model (for the patron table) to create the table if it’s missing.

15

Use our model to create a dedicated_listener for database events, listening on the channel chan_patron. The callback function for these events is model.db_event(), which I’ll go through in the next listing. The callback will be called every time the database updates the channel.

16

after_server_stop is the hook for tasks that must happen during shutdown. Here we disconnect from the database.

17

This add_route() call sends POST requests for the /patron URL to the new_patron() coroutine function.

18

This add_route() call sends all requests for the /patron/<id:int> URL to the PatronAPI class-based view. The method names in that class determine which one is called: a GET HTTP request will call the PatronAPI.get() method, and so on.

The preceding code contains all the HTTP handling for our server, as well as startup and shutdown tasks like setting up a connection pool to the database and, crucially, setting up a db-event listener on the chan_patron channel on the DB server.

Example 4-24 presents the model for the patron table in the database.

Example 4-24. DB model for the “patron” table
# model.py
import logging
from json import loads, dumps
from triggers import (
    create_notify_trigger, add_table_triggers)  1
from boltons.cacheutils import LRU  2

logger = logging.getLogger('perf')

CREATE_TABLE = ('CREATE TABLE IF NOT EXISTS patron('  3
                'id serial PRIMARY KEY, name text, '
                'fav_dish text)')
INSERT = ('INSERT INTO patron(name, fav_dish) '
          'VALUES ($1, $2) RETURNING id')
SELECT = 'SELECT * FROM patron WHERE id = $1'
UPDATE = 'UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3'
DELETE = 'DELETE FROM patron WHERE id=$1'
EXISTS = "SELECT to_regclass('patron')"

CACHE = LRU(max_size=65536)  4

async def add_patron(conn, data: dict) -> int:  5
    return await conn.fetchval(
        INSERT, data['name'], data['fav_dish'])

async def update_patron(conn, id: int, data: dict) -> bool:
    result = await conn.execute(  6
        UPDATE, data['name'], data['fav_dish'], id)
    return result == 'UPDATE 1'

async def delete_patron(conn, id: int):  7
    result = await conn.execute(DELETE, id)
    return result == 'DELETE 1'

async def get_patron(conn, id: int) -> dict:  8
    if id not in CACHE:
        logger.info(f'id={id} Cache miss')
        record = await conn.fetchrow(SELECT, id)  9
        CACHE[id] = record and dict(record.items())
    return CACHE[id]

def db_event(conn, pid, channel, payload):  10
    event = loads(payload)  11
    logger.info('Got DB event:\n' + dumps(event, indent=4))
    id = event['id']
    if event['type'] == 'INSERT':
        CACHE[id] = event['data']
    elif event['type'] == 'UPDATE':
        CACHE[id] = event['data']['new']  12
    elif event['type'] == 'DELETE':
        CACHE[id] = None

async def create_table_if_missing(conn):  13
    if not await conn.fetchval(EXISTS):
        await conn.fetchval(CREATE_TABLE)
        await create_notify_trigger(
            conn, channel='chan_patron')
        await add_table_triggers(
            conn, table='patron')
1

You have to add triggers to the database in order to get notifications when data changes. I’ve created these handy helpers to create the trigger function itself (with create_notify_trigger) and to add the trigger to a specific table (with add_table_triggers). The SQL required to do this is somewhat out of scope for this book, but it’s still crucial to understanding how this case study works. I’ve included the annotated code for these triggers in Appendix B.

2

The third-party boltons package provides a bunch of useful tools, not the least of which is the LRU cache, a more versatile option than the @lru_cache decorator in the functools standard library module.3

3

This block of text holds all the SQL for the standard CRUD operations. Note that I’m using native PostgreSQL syntax for the parameters: $1, $2, and so on. There is nothing novel here, and it won’t be discussed further.

4

Create the cache for this app instance.

5

I called this function from the Sanic module inside the new_patron() endpoint for adding new patrons. Inside the function, I use the fetchval() method to insert new data. Why fetchval() and not execute()? Because fetchval() returns the primary key of the new inserted record!4

6

Update an existing record. When this succeeds, PostgreSQL will return UPDATE 1, so I use that as a check to verify that the update succeeded.

7

Deletion is very similar to updating.

8

This is the read operation. This is the only part of our CRUD interface that cares about the cache. Think about that for a second: we don’t update the cache when doing an insert, update, or delete. This is because we rely on the async notification from the database (via the installed triggers) to update the cache if any data is changed.

9

Of course, we do still want to use the cache after the first GET.

10

The db_event() function is the callback that asyncpg will make when there are events on our DB notification channel, chan_patron. This specific parameter list is required by asyncpg. conn is the connection on which the event was sent, pid is the process ID of the PostgreSQL instance that sent the event, channel is the name of the channel (which in this case will be chan_patron), and the payload is the data being sent on the channel.

11

Deserialize the JSON data to a dict.

12

The cache population is generally quite straightforward, but note that update events contain both new and old data, so we need to make sure to cache the new data only.

13

This is a small utility function I’ve made to easily re-create a table if it’s missing. This is really useful if you need to do this frequently—such as when writing the code samples for this book!

This is also where the database notification triggers are created and added to our patron table. See Example B-1 for annotated listing of these functions.

That brings us to the end of this case study. We’ve seen how Sanic makes it very simple to create an API server, and we’ve seen how to use asyncpg for performing queries via a connection pool, and how to use PostgreSQL’s async notification features to receive callbacks over a dedicated, long-lived database connection.

Many people prefer to use object-relational mappers to work with databases, and in this area, SQLAlchemy is the leader. There is growing support for using SQLAlchemy together with asyncpg in third-party libraries like asyncpgsa and GINO. Another popular ORM, Peewee, is given support for asyncio through the aiopeewee package.

Other Libraries and Resources

There are many other libraries for asyncio not covered in this book. To find out more, you can check out the aio-libs project, which manages nearly 40 libraries, and the Awesome asyncio project, which bookmarks many other projects compatible with the asyncio module.

One library that bears special mention is aiofiles. As you may recall from our earlier discussions, I said that to achieve high concurrency in Asyncio, it is vitally important that the loop never block. In this context, our focus on blocking operations has been exclusively network-based I/O, but it turns out that disk access is also a blocking operation that will impact your performance at very high concurrency levels. The solution to this is aiofiles, which provides a convenient wrapper for performing disk access in a thread. This works because Python releases the GIL during file operations so your main thread (running the asyncio loop) is unaffected.

The most important domain for Asyncio is going to be network programming. For this reason, it’s not a bad idea to learn a little about socket programming, and even after all these years, Gordon McMillan’s “Socket Programming HOWTO”, included with the standard Python documentation, is one of the best introductions you’ll find.

I learned Asyncio from a wide variety of sources, many of which have already been mentioned in earlier sections. Everyone learns differently, so it’s worth exploring different types of learning materials. Here are a few others that I found useful:

  • Robert Smallshire’s “Get to Grips with Asyncio in Python 3” talk, presented at NDC London in January 2017. This is by far the best YouTube video on Asyncio I’ve come across. The talk may be somewhat advanced for a beginner, but it really does give a clear description of how Asyncio is designed.

  • Nikolay Novik’s “Building Apps with Asyncio” slides, presented at PyCon UA 2016. The information is dense, but a lot of practical experience is captured in these slides.

  • Endless sessions in the Python REPL, trying things out and “seeing what happens.”

I encourage you to continue learning, and if a concept doesn’t stick, keep looking for new sources until you find an explanation that works for you.

1 Actually, you can as long as the sockets being used in different threads are created, used, and destroyed entirely in their own threads. It is possible but hard to do, and many people struggle to get this right. This is why the recommendation to use a single thread and a polling mechanism is so strong.

2 The recipe for this dish, and recipes for other fine Spam-based fare, can be found on the UKTV website.

3 Obtain boltons with pip install boltons.

4 You also need the RETURNING id part of the SQL, though!

Get Using Asyncio in Python now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.