Capítulo 4. 20 bibliotecas Asyncio que no estás utilizando (pero... Oh, no importa)

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

En este capítulo, veremos casos prácticos en los que se utilizan las nuevas funciones de Python para la programación asíncrona. Utilizaremos varias bibliotecas de terceros, como harás tú en tus propios proyectos.

El título de este capítulo es un juego de palabras con el título de un libro anterior que escribí, titulado20 bibliotecas de Python que no estás utilizando (pero deberías) (O'Reilly). Muchas de esas bibliotecas también serán útiles en tus aplicaciones basadas en asyncio, pero este capítulo se centra en las bibliotecas diseñadas específicamente para las nuevas funciones asíncronas de Python.

Es difícil presentar código basado en asyncio en breves fragmentos. Como has visto en todos los ejemplos de código anteriores del libro, he intentado que cada ejemplo sea un programa completo y ejecutable, porque la gestión de la vida útil de la aplicación es una consideración fundamental para utilizar correctamente la programación asíncrona.

Por esta razón, la mayoría de los casos prácticos de este capítulo serán algo mayores, en términos de líneas de código, de lo que es habitual en un libro de este tipo. Mi objetivo al utilizar este enfoque es hacer que los casos prácticos sean más útiles, ofreciéndote una "visión completa" de un programa asíncrono, en lugar de dejarte que descubras cómo pueden encajar fragmentos sueltos.

Nota

Algunos de los ejemplos de código de este capítulo renuncian al estilo para ahorrar espacio. Me gusta PEP8 tanto como al siguiente Pythonista, ¡pero la practicidad vence a la pureza!

Flujos (Biblioteca estándar)

Antes de examinar las bibliotecas de terceros, empecemos por la biblioteca estándar. La API de flujoses la interfaz de alto nivel que se ofrece para la programación de sockets asíncronos y, como mostrará el siguiente caso práctico, es bastante fácil de utilizar. Sin embargo, el diseño de la aplicación sigue siendo complejo simplemente por la naturaleza del dominio.

El siguiente caso práctico muestra la implementación de un corredor de mensajes, con un diseño inicial ingenuo seguido de un diseño más considerado. Ninguno de los dos debe considerarse listo para la producción; mi objetivo es ayudarte a reflexionar sobre los diversos aspectos de la programación concurrente en red que hay que tener en cuenta al diseñar este tipo de aplicaciones.

Caso práctico: Una cola de mensajes

Un servicio de cola de mensajes es una aplicación backend que recibe conexiones de otras aplicaciones y pasa mensajes entre esos servicios conectados, a menudo denominados editores y suscriptores. Los suscriptores suelen estar a la escucha de canales específicos para recibir mensajes, y normalmente es posible configurar la distribución de mensajes en diferentes canales de dos maneras: los mensajes pueden distribuirse a todos los suscriptores de un canal(pub-sub), o un mensaje diferente puede ir a cada suscriptor de uno en uno(punto a punto).

Recientemente, he trabajado en un proyecto que implicaba el uso deActiveMQ como broker de mensajes para la intercomunicación de microservicios. A un nivel básico, un broker de este tipo (servidor):

  • Mantiene conexiones de socket persistentes con varios clientes

  • Recibe mensajes de clientes con un nombre de canal de destino

  • Entrega esos mensajes a todos los demás clientes suscritos a ese mismo nombre de canal

Recuerdo que me preguntaba lo difícil que sería crear una aplicación así. Como toque añadido, ActiveMQ puede realizar ambos modelos de distribución de mensajes, y los dos modelos se diferencian generalmente por el nombre del canal:

  • Los nombres de canal con el prefijo /topic (por ejemplo, /topic/customer/registration) se gestionan con el patrón pub-sub, en el que todos los suscriptores del canal reciben todos los mensajes.

  • Los nombres de canal con el prefijo /queue se gestionan con el modelopunto a punto, en el que los mensajes de un canal se distribuyen entre los suscriptores del canal de forma rotatoria: cada suscriptor recibe un mensaje único.

En nuestro caso práctico, construiremos un intermediario de mensajes de juguete con estas características básicas. La primera cuestión que debemos abordar es que TCP no es un protocolo basado en mensajes: sólo recibimos flujos de bytes en el cable. Tenemos que crear nuestro propio protocolo para la estructura de los mensajes, y el protocolo más sencillo es anteponer a cada mensaje una cabecera de tamaño, seguida de una carga útil de ese tamaño. La biblioteca de utilidades del Ejemplo 4-1 proporciona capacidades de lectura y escritura para este tipo de mensajes.

Ejemplo 4-1. Protocolo de mensajes: lectura y escritura
# msgproto.py
from asyncio import StreamReader, StreamWriter

async def read_msg(stream: StreamReader) -> bytes:
    size_bytes = await stream.readexactly(4)  1
    size = int.from_bytes(size_bytes, byteorder='big')  2
    data = await stream.readexactly(size)  3
    return data

async def send_msg(stream: StreamWriter, data: bytes):
    size_bytes = len(data).to_bytes(4, byteorder='big')
    stream.writelines([size_bytes, data])  4
    await stream.drain()
1

Obtén los 4 primeros bytes. Es el prefijo del tamaño.

2

Esos 4 bytes deben convertirse en un número entero.

3

Ahora sabemos el tamaño de la carga útil, así que lo leemos del flujo.

4

La escritura es la inversa de la lectura: primero enviamos la longitud de los datos, codificada como 4 bytes, y después los datos.

Ahora que ya tenemos un protocolo de mensajes rudimentario, podemos centrarnos en la aplicación del corredor de mensajes del Ejemplo 4-2.

Ejemplo 4-2. Un servidor prototipo de 40 líneas
# mq_server.py
import asyncio
from asyncio import StreamReader, StreamWriter, gather
from collections import deque, defaultdict
from typing import Deque, DefaultDict
from msgproto import read_msg, send_msg  1

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque) 2

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')  3
  subscribe_chan = await read_msg(reader)  4
  SUBSCRIBERS[subscribe_chan].append(writer)  5
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):  6
      data = await read_msg(reader)  7
      print(f'Sending to {channel_name}: {data[:19]}...')
      conns = SUBSCRIBERS[channel_name]  8
      if conns and channel_name.startswith(b'/queue'):  9
          conns.rotate()  10
          conns = [conns[0]]  11
      await gather(*[send_msg(c, data) for c in conns]) 12
  except asyncio.CancelledError:
    print(f'Remote {peername} closing connection.')
    writer.close()
    await writer.wait_closed()
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    SUBSCRIBERS[subscribe_chan].remove(writer)  13

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

Importa de nuestro módulo msgproto.py.

2

Una colección global de los suscriptores actualmente activos. Cada vez que un cliente se conecta, debe enviar primero el nombre del canal al que se suscribe. Un deque contendrá todos los suscriptores de un canal concreto.

3

La función client() coroutine producirá una coroutine de larga duración para cada nueva conexión. Piensa en ella como una llamada de retorno para el servidor TCP iniciado en main(). En esta línea, he mostrado cómo se pueden obtener el host y el puerto del par remoto, por ejemplo, para el registro.

4

Nuestro protocolo para los clientes es el siguiente

  • En la primera conexión, un cliente debe enviar un mensaje que contenga el canal al que desea suscribirse (aquí, subscribe_chan).

  • A partir de entonces, durante toda la vida de la conexión, un cliente envía un mensaje a un canal enviando primero un mensaje que contenga el nombre del canal de destino, seguido de un mensaje que contenga los datos. Nuestro intermediario enviará esos mensajes de datos a todos los clientes suscritos a ese nombre de canal.

5

Añade la instancia StreamWriter a la colección global de suscriptores.

6

Un bucle infinito, esperando datos de este cliente. El primer mensaje de un cliente debe ser el nombre del canal de destino.

7

A continuación vienen los datos reales que hay que distribuir al canal.

8

Obtén el deque de suscriptores del canal de destino.

9

Un manejo especial si el nombre del canal empieza por la palabra mágica/queue: en este caso, enviamos los datos sólo a uno de los suscriptores, no a todos. Esto puede utilizarse para compartir trabajo entre un grupo de trabajadores, en lugar del esquema habitual de notificación pub-sub, en el que todos los suscriptores de un canal reciben todos los mensajes.

10

Ésta es la razón por la que utilizamos un deque y no una lista: la rotación del deque es la forma en que hacemos un seguimiento de qué cliente es el siguiente en la cola para la distribución de /queue. Esto parece costoso hasta que te das cuenta de que una sola rotación del deque es una operación O(1).

11

Apunta sólo al cliente que esté primero; esto cambia después de cada rotación.

12

Crea una lista de coroutines para enviar el mensaje a cada escritor, y luego desempaquétalas en gather() para que podamos esperar a que se complete todo el envío.

Esta línea es un fallo de nuestro programa, pero puede que no sea obvio por qué: aunque sea cierto que todos los envíos a cada abonado se producirán de forma concurrente, ¿qué ocurre si tenemos un cliente muy lento? En este caso, el gather() sólo terminará cuando el abonado más lento haya recibido sus datos. No podremos recibir más datos del cliente emisor hasta que terminen todas estas coroutines de send_msg(). Esto ralentiza toda la distribución de mensajes a la velocidad del abonado más lento.

13

Al salir de la coroutina client(), nos aseguramos de eliminarnos de la colección global SUBSCRIBERS. Desgraciadamente, se trata de una operación O(n), que puede resultar un poco cara para n muy grandes. Una estructura de datos diferente lo solucionaría, pero por ahora nos consolamos sabiendo que las conexiones están pensadas para durar mucho -por tanto, debería haber pocos eventos de desconexión- y es poco probable que n sea muy grande (digamos ~10.000 como estimación aproximada de orden de magnitud), y este código es, al menos, fácil de entender.

Así que ése es nuestro servidor; ahora necesitamos clientes, y entonces podremos mostrar alguna salida. A efectos de demostración, crearé dos tipos de clientes: unemisor y un oyente. El servidor no hace diferencias; todos los clientes son iguales. La distinción entre el comportamiento del emisor y del oyente sólo tiene fines educativos. El Ejemplo 4-3 muestra el código de la aplicación oyente.

Ejemplo 4-3. Listener: un conjunto de herramientas para escuchar mensajes en nuestro corredor de mensajes
# mq_client_listen.py
import asyncio
import argparse, uuid
from msgproto import read_msg, send_msg

async def main(args):
  me = uuid.uuid4().hex[:8] 1
  print(f'Starting up {me}')
  reader, writer = await asyncio.open_connection(
    args.host, args.port) 2
  print(f'I am {writer.get_extra_info("sockname")}')
  channel = args.listen.encode()  3
  await send_msg(writer, channel)  4
  try:
    while data := await read_msg(reader):  5
      print(f'Received by {me}: {data[:20]}')
    print('Connection ended.')
  except asyncio.IncompleteReadError:
    print('Server closed.')
  finally:
    writer.close()
    await writer.wait_closed()


if __name__ == '__main__':
  parser = argparse.ArgumentParser() 6
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000)
  parser.add_argument('--listen', default='/topic/foo')
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

El módulo de la biblioteca estándar uuid es una forma cómoda de crear una "identidad" para este oyente. Si pones en marcha varias instancias, cada una tendrá su propia identidad, y podrás hacer un seguimiento de lo que ocurre en los registros.

2

Abre una conexión con el servidor.

3

El canal al que suscribirse es un parámetro de entrada, capturado enargs.listen. Codifícalo en bytes antes de enviarlo.

4

Según las reglas de nuestro protocolo (como ya se ha comentado en el análisis del código del intermediario), lo primero que hay que hacer tras conectarse es enviar el nombre del canal al que suscribirse.

5

Este bucle no hace otra cosa que esperar a que aparezcan datos en la toma.

6

Los argumentos de la línea de comandos de este programa facilitan indicar un host, un puerto y un nombre de canal para escuchar.

El código del otro cliente, el programa emisor que se muestra en el Ejemplo 4-4, tiene una estructura similar a la del módulo oyente.

Ejemplo 4-4. Remitente: un conjunto de herramientas para enviar datos a nuestro corredor de mensajes
# mq_client_sender.py
import asyncio
import argparse, uuid
from itertools import count
from msgproto import send_msg

async def main(args):
    me = uuid.uuid4().hex[:8]  1
    print(f'Starting up {me}')
    reader, writer = await asyncio.open_connection(
        host=args.host, port=args.port)  2
    print(f'I am {writer.get_extra_info("sockname")}')

    channel = b'/null'  3
    await send_msg(writer, channel) 4

    chan = args.channel.encode()  5
    try:
        for i in count():  6
            await asyncio.sleep(args.interval)  7
            data = b'X'*args.size or f'Msg {i} from {me}'.encode()
            try:
                await send_msg(writer, chan)
                await send_msg(writer, data) 8
            except OSError:
                print('Connection ended.')
                break
    except asyncio.CancelledError:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
  parser = argparse.ArgumentParser()  9
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000, type=int)
  parser.add_argument('--channel', default='/topic/foo')
  parser.add_argument('--interval', default=1, type=float)
  parser.add_argument('--size', default=0, type=int)
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

Al igual que con el oyente, reclama una identidad.

2

Acércate y establece una conexión.

3

Según las reglas de nuestro protocolo, lo primero que hay que hacer tras conectarse al servidor es dar el nombre del canal al que suscribirse; sin embargo, como somos un emisor, en realidad no nos interesa suscribirnos a ningún canal. No obstante, el protocolo lo exige, así que basta con proporcionar un canal nulo al que suscribirse (en realidad no escucharemos nada).

4

Envía el canal al que suscribirte.

5

El parámetro de la línea de comandos args.channel proporciona el canal al quequeremos enviar los mensajes. Hay que convertirlo primero a bytes antes de enviarlo.

6

Utilizar itertools.count() es como un bucle while True, salvo que obtenemos una variable de iteración para utilizarla. La utilizamos en los mensajes de depuración, ya que facilita un poco el seguimiento de qué mensaje se envió desde dónde.

7

El retardo entre mensajes enviados es un parámetro de entrada,args.interval. La siguiente línea genera la carga útil del mensaje. Puede ser una cadena de bytes del tamaño especificado (args.size) o un mensaje descriptivo. Esta flexibilidad es sólo para pruebas.

8

Ten en cuenta que aquí se envían dos mensajes: el primero es el nombre del canal de destino y el segundo es la carga útil.

9

Al igual que con el oyente, hay un montón de opciones de línea de comandos para ajustar el remitente: channel determina el canal de destino al que enviar, mientras que interval controla el retardo entre envíos. El parámetro size controla el tamaño de la carga útil de cada mensaje.

Ahora tenemos un intermediario, un oyente y un remitente; es hora de ver algún resultado. Para producir los siguientes fragmentos de código, puse en marcha el servidor, luego dos oyentes y después un remitente. Después de enviar unos cuantos mensajes, detuve el servidor con Ctrl-C. La salida del servidor se muestra en el Ejemplo 4-5, la del emisor en el Ejemplo 4-6 y la del oyente en los Ejemplos 4-7 y 4-8.

Ejemplo 4-5. Salida del agente de mensajes (servidor)
$ mq_server.py
Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55390) subscribed to b'/null'
Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'...
^CBye!
Remote ('127.0.0.1', 55382) closing connection.
Remote ('127.0.0.1', 55382) closed
Remote ('127.0.0.1', 55390) closing connection.
Remote ('127.0.0.1', 55390) closed
Remote ('127.0.0.1', 55386) closing connection.
Remote ('127.0.0.1', 55386) closed
Ejemplo 4-6. Salida del remitente (cliente)
$ mq_client_sender.py --channel /queue/blah
Starting up 6b5a8e1d
I am ('127.0.0.1', 55390)
Connection ended.
Ejemplo 4-7. Salida del oyente 1 (cliente)
$ mq_client_listen.py --listen /queue/blah
Starting up 9ae04690
I am ('127.0.0.1', 55382)
Received by 9ae04690: b'Msg 1 from 6b5a8e1d'
Received by 9ae04690: b'Msg 3 from 6b5a8e1d'
Received by 9ae04690: b'Msg 5 from 6b5a8e1d'
Server closed.
Ejemplo 4-8. Salida del oyente 2 (cliente)
$ mq_client_listen.py --listen /queue/blah
Starting up bd4e3baa
I am ('127.0.0.1', 55386)
Received by bd4e3baa: b'Msg 0 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 2 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 4 from 6b5a8e1d'
Server closed.

Nuestro corredor de mensajes de juguete funciona. El código también es bastante fácil de entender, dado un dominio de problemas tan complejo, pero por desgracia, el diseño del propio código del corredor es problemático.

El problema es que, para un cliente concreto, enviamos mensajes a los suscriptores en la misma coroutina en la que se reciben nuevos mensajes. Esto significa que si algún suscriptor tarda en consumir lo que estamos enviando, puede que esa línea await gather(...) del Ejemplo 4-2 tarde mucho tiempo en completarse, y no podremos recibir y procesar más mensajes mientras esperamos.

En su lugar, necesitamos desacoplar la recepción de mensajes del envío de mensajes. En el siguiente caso práctico, refactorizamos nuestro código para hacer exactamente eso.

Caso práctico: Mejorar la cola de mensajes

En este caso práctico, mejoramos el diseño de nuestro corredor de mensajes de juguete. Los programas oyente y emisor permanecen tal cual. La mejora específica en el nuevo diseño del agente es desacoplar el envío y la recepción de mensajes; esto resolverá el problema por el que un suscriptor lento también ralentizaría la recepción de nuevos mensajes, como se ha comentado en la sección anterior. El nuevo código, que se muestra en el Ejemplo 4-9, es un poco más largo, pero no demasiado.

Ejemplo 4-9. Agente de mensajes: diseño mejorado
# mq_server_plus.py
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
from msgproto import read_msg, send_msg

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque)
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
CHAN_QUEUES: Dict[bytes, Queue] = {}  1

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')
  subscribe_chan = await read_msg(reader)
  SUBSCRIBERS[subscribe_chan].append(writer)  2
  send_task = asyncio.create_task(
      send_client(writer, SEND_QUEUES[writer]))  3
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):
      data = await read_msg(reader)
      if channel_name not in CHAN_QUEUES:  4
        CHAN_QUEUES[channel_name] = Queue(maxsize=10)  5
        asyncio.create_task(chan_sender(channel_name))  6
      await CHAN_QUEUES[channel_name].put(data)  7
  except asyncio.CancelledError:
    print(f'Remote {peername} connection cancelled.')
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    await SEND_QUEUES[writer].put(None)  8
    await send_task  9
    del SEND_QUEUES[writer]  10
    SUBSCRIBERS[subscribe_chan].remove(writer)

async def send_client(writer: StreamWriter, queue: Queue):  11
    while True:
        try:
            data = await queue.get()
        except asyncio.CancelledError:
            continue

        if not data:
            break

        try:
            await send_msg(writer, data)
        except asyncio.CancelledError:
            await send_msg(writer, data)

    writer.close()
    await writer.wait_closed()

async def chan_sender(name: bytes):
    with suppress(asyncio.CancelledError):
        while True:
            writers = SUBSCRIBERS[name]
            if not writers:
                await asyncio.sleep(1)
                continue  12
            if name.startswith(b'/queue'):  13
                writers.rotate()
                writers = [writers[0]]
            if not (msg := await CHAN_QUEUES[name].get()): 14
                break
            for writer in writers:
                if not SEND_QUEUES[writer].full():
                    print(f'Sending to {name}: {msg[:19]}...')
                    await SEND_QUEUES[writer].put(msg)  15

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()
try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

En la implementación anterior, sólo existíaSUBSCRIBERS; ahora existen SEND_QUEUES y CHAN_QUEUES como colecciones globales. Esto es consecuencia de desacoplar completamente la recepcióny el envío de datos. SEND_QUEUES tiene una entrada de cola por cada conexión de cliente: todos los datos que deban enviarse a ese cliente deben colocarse en esa cola. (Si te adelantas, la coroutina send_client() sacará datos de SEND_QUEUES y los enviará).

2

Hasta este punto de la función coroutine client(), el código es el mismo que en el servidor simple: se recibe el nombre del canal suscrito, y añadimos la instancia de StreamWriter instancia del nuevo cliente a la colección global SUBSCRIBERS.

3

Esto es nuevo: creamos una tarea de larga duración que realizará todo el envío de datos a este cliente. La tarea se ejecutará independientemente como una coroutine separada y sacará mensajes de la cola suministrada, SEND_QUEUES[writer], para enviarlos.

4

Ahora estamos dentro del bucle donde recibimos los datos. Recuerda que siempre recibimos dos mensajes: uno para el nombre del canal de destino y otro para los datos. Vamos a crear un Queue nuevo y dedicado para cada canal de destino, y para eso está CHAN_QUEUES: cuando cualquier cliente quiera enviar datos a un canal, vamos a poner esos datos en la cola apropiada y luego volveremos inmediatamente a escuchar más datos. Este enfoque desacopla la distribución de mensajes de la recepción de mensajes de este cliente.

5

Si todavía no existe una cola para el canal de destino, crea una.

6

Crea una tarea dedicada y de larga duración para ese canal. La coroutinachan_sender() se encargará de sacar datos de la cola del canal y distribuirlos a los suscriptores.

7

Coloca los nuevos datos recibidos en la cola del canal específico. Si la cola se llena, esperaremos aquí hasta que haya espacio para los nuevos datos. Esperar aquí significa que no leeremos ningún dato nuevo del socket, lo que significa que el cliente tendrá que esperar para enviar nuevos datos al socket por su parte. Esto no es necesariamente algo malo, ya que comunica la llamada contrapresión a este cliente. (Alternativamente, podrías optar por soltar mensajes aquí si el caso de uso está de acuerdo con ello).

8

Cuando se cierra la conexión, es hora de limpiar. La tarea de larga duración que creamos para enviar datos a este cliente, send_task, puede cerrarse colocando None en su cola, SEND_QUEUES[writer] (comprueba el código de send_client()). Es importante utilizar un valor en la cola, en lugar de la cancelación directa, porque puede que ya haya datos en esa cola y queremos que esos datos se envíen antes de que send_client() finalice.

9

Espera a que termine la tarea del remitente...

10

...y eliminamos la entrada de la colección SEND_QUEUES (y en la línea siguiente, eliminamos también la entrada sock de la colección SUBSCRIBERS, como antes).

11

La función de la corrutina send_client() es casi un ejemplo de libro de texto de cómo sacar trabajo de una cola. Observa cómo la corrutina saldrá sólo si None se coloca en la cola. Observa también cómo suprimimos CancelledError dentrodel bucle: esto se debe a que queremos que esta tarea se cierre sólo al recibir un None en la cola. De esta forma, todos los datos pendientes en la cola pueden enviarse antes de cerrarse.

12

chan_sender() es la lógica de distribución de un canal: envía datos desde una instancia dedicada del canal Queue a todos los suscriptores de ese canal. Pero, ¿qué ocurre si todavía no hay suscriptores para este canal? Esperaremos un poco y volveremos a intentarlo. (Aunque ten en cuenta que la cola de este canal, CHAN_QUEUES[name], seguirá llenándose).

13

Como en nuestra implementación anterior del corredor, hacemos algo especial para los canales cuyo nombre empieza por /queue: rotamos el deque y enviamos sólo a la primera entrada. Esto actúa como un burdo sistema de equilibrio de carga, ya que cada suscriptor recibe mensajes diferentes de la misma cola. Para todos los demás canales, todos los suscriptores reciben todos los mensajes.

14

Aquí esperaremos datos en la cola, y saldremos si se recibe None. Actualmente, esto no se activa en ningún sitio (por lo que estas coroutines de chan_sender()viven para siempre), pero si se añadiera lógica para limpiar estas tareas del canal después de, digamos, algún periodo de inactividad, así es como se haría.

15

Se han recibido los datos, así que es hora de enviarlos a los suscriptores. Aquí no hacemos el envío: en su lugar, colocamos los datos en la cola de envío propia de cada suscriptor. Este desacoplamiento es necesario para garantizar que un suscriptor lento no ralentice a nadie más que reciba datos. Y además, si el abonado es tan lento que su cola de envío se llena, no ponemos esos datos en su cola; es decir, se pierden.

El diseño anterior produce el mismo resultado que la implementación simplista anterior, pero ahora podemos estar seguros de que un oyente lento no interferirá en la distribución de mensajes a otros oyentes.

Estos dos casos prácticos muestran una progresión en el pensamiento en torno al diseño de un sistema de distribución de mensajes. Un aspecto clave fue darse cuenta de que el envío y la recepción de datos podrían gestionarse mejor en coroutines separadas, dependiendo del caso de uso. En tales casos, las colas pueden ser muy útiles para mover datos entre esas distintas coroutines y para proporcionar almacenamiento intermedio para desacoplarlas.

El objetivo más importante de estos casos prácticos era mostrar cómo la API de flujos de asyncio hace que sea muy fácil crear aplicaciones basadas en sockets.

Retorcido

El proyecto Twisted es anterior -dramáticamente- a la biblioteca estándarasyncio, y lleva enarbolando la bandera de la programación asíncrona en Python desde hace unos 14 años. El proyecto proporciona no sólo los bloques de construcción básicos, como un bucle de eventos, sino también primitivas como los deferreds, que son un poco como los futuros de asyncio. El diseño de asyncio se ha visto muy influido por Twisted y por la amplia experiencia de sus líderes y mantenedores.

Ten en cuenta que asyncio no sustituye a Twisted. Twisted incluye implementaciones de alta calidad de un enorme número de protocolos de Internet, incluyendo no sólo el habitual HTTP, sino también XMPP, NNTP, IMAP, SSH, IRC y FTP (tanto servidores como clientes). Y la lista continúa: ¿DNS? Comprobado ¿SMTP? Comprobado. ¿POP3? Comprobado. La disponibilidad de estas excelentes implementaciones del protocolo de Internet sigue haciendo que Twisted sea convincente.

A nivel de código, la principal diferencia entre Twisted yasyncio, aparte de la historia y el contexto histórico, es que durante mucho tiempo Python careció de soporte de lenguaje para coroutines, y esto significó que Twisted y proyectos similares tuvieron que idear formas de tratar la asincronía que funcionaran con la sintaxis estándar de Python.

Durante la mayor parte de la historia de Twisted, las devoluciones de llamada eran el medio por el que se realizaba la programación asíncrona, con toda la complejidad no lineal que ello conlleva; sin embargo, cuando se hizo posible utilizar generadores como coroutines improvisadas, de repente se hizo posible maquetar código en Twisted de forma lineal utilizando su decorador @defer.inlineCallbacks, como se muestra en el Ejemplo 4-10.

Ejemplo 4-10. Aún más Twisted con devoluciones de llamada inlined
@defer.inlineCallbacks  1
def f():
    yield
    defer.returnValue(123)  2

@defer.inlineCallbacks
def my_coro_func():
    value = yield f()  3
    assert value == 123
1

Normalmente, Twisted requiere crear instancias de Deferred y añadir devoluciones de llamada a esas instancias como método para construir programas asíncronos. Hace unos años, se añadió el decorador @inlineCallbacks , que reutiliza los generadores como coroutines.

2

Aunque @inlineCallbacks te permitía escribir código lineal en apariencia (a diferencia de las devoluciones de llamada), se necesitaban algunos hacks, como esta llamada a defer.returnValue(), que es cómo tienes que devolver valores de@inlineCallbacks coroutines.

3

Aquí podemos ver el yield que hace de esta función un generador. Para que@inlineCallbacks funcione, debe haber al menos un yield presente en la función decorada.

Desde que aparecieron las coroutines nativas en Python 3.5, el equipo de Twisted (y Amber Brown en particular) han estado trabajando para añadir soporte para ejecutar Twisted en el bucle de eventos asyncio.

Se trata de un esfuerzo en curso, y mi objetivo en esta sección no es convencerte de que crees todas tus aplicaciones como híbridos Twisted-asyncio, sino más bien hacerte consciente de que actualmente se está trabajando para proporcionar una interoperabilidad significativa entre ambos.

Para los que tengáis experiencia con Twisted, el Ejemplo 4-11puede resultar chocante.

Ejemplo 4-11. Soporte para asyncio en Twisted
# twisted_asyncio.py
from time import ctime
from twisted.internet import asyncioreactor
asyncioreactor.install()  1
from twisted.internet import reactor, defer, task  2

async def main():  3
    for i in range(5):
        print(f'{ctime()} Hello {i}')
        await task.deferLater(reactor, 1, lambda: None)  4

defer.ensureDeferred(main())  5
reactor.run()  6
1

Así es como le dices a Twisted que utilice el bucle de eventos asyncio como su reactor principal. Ten en cuenta que esta línea debe ir antes de que se importe el reactor de twisted.internet en la línea siguiente.

2

Cualquiera que esté familiarizado con la programación Twisted reconocerá estas importaciones. No tenemos espacio para cubrirlas en profundidad aquí, pero en pocas palabras, reactor es la versión Twisted del bucle asyncio, y defer y task son espacios de nombres para herramientas que trabajan con coroutines de programación.

3

Ver async def aquí, en un programa Twisted, parece extraño, pero este es en realidad lo que nos ofrece el nuevo soporte para async/await: la posibilidad de utilizar coroutines nativas directamente en programas Twisted.

4

En el mundo antiguo de @inlineCallbacks, habrías utilizado yield from aquí, pero ahora podemos utilizar await, igual que en el código asyncio. La otra parte de esta línea, deferLater(), es una forma alternativa de hacer lo mismo que asyncio.sleep(1). await un futuro en el que, al cabo de un segundo, se disparará una llamada de retorno de no hacer nada.

5

ensureDeferred() es una versión Twisted de programar una coroutina. Esto sería análogo a loop.create_task() o asyncio.ensure_future().

6

Ejecutar el reactor es lo mismo que loop.run_forever() en asyncio.

La ejecución de este script produce el siguiente resultado:

$ twisted_asyncio.py
Mon Oct 16 16:19:49 2019 Hello 0
Mon Oct 16 16:19:50 2019 Hello 1
Mon Oct 16 16:19:51 2019 Hello 2
Mon Oct 16 16:19:52 2019 Hello 3
Mon Oct 16 16:19:53 2019 Hello 4

Hay mucho más que aprender sobre Twisted. En particular, merece la pena que repases la lista de protocolos de red que implementa. Aún queda trabajo por hacer, pero el futuro parece muy prometedor para la interoperabilidad entre Twisted y asyncio.

asyncio se ha diseñado de tal manera que podemos esperar un futuro en el que será posible incorporar código de muchos marcos asíncronos, como Twisted y Tornado, en una sola aplicación, con todo el código ejecutándose en el mismo bucle de eventos.

La cola de Janus

La cola Janus (instalada con pip install janus) proporciona una solución para la comunicación entre hilos y coroutines. En la biblioteca estándar de Python, existen dos tipos de colas:

queue.Queue

Una cola de bloqueo, comúnmente utilizada para la comunicación y almacenamiento en búfer entre hilos

asyncio.Queue

Una cola compatible con async, utilizada habitualmente para la comunicación y almacenamiento en búfer entre coroutines

Por desgracia, ¡ninguna de las dos es útil para la comunicación entre hilos y coroutines! Aquí es donde entra Janus: es una cola única que expone ambas APIs, una de bloqueo y otra asíncrona.El Ejemplo 4-12 genera datos desde dentro de un hilo, coloca esos datos en una cola y luego los consume desde una coroutina.

Ejemplo 4-12. Conectar coroutines e hilos con una cola Janus
# janus_demo.py
import asyncio
import random
import time

import janus

async def main():
    loop = asyncio.get_running_loop()
    queue = janus.Queue(loop=loop)  1
    future = loop.run_in_executor(None, data_source, queue)
    while (data := await queue.async_q.get()) is not None:  2
        print(f'Got {data} off queue')  3
    print('Done.')

def data_source(queue):
    for i in range(10):
        r = random.randint(0, 4)
        time.sleep(r)  4
        queue.sync_q.put(r)  5
    queue.sync_q.put(None)

asyncio.run(main())
1

Crea una cola Janus. Ten en cuenta que, al igual que un asyncio.Queue, la cola Janus estará asociada a un bucle de eventos específico. Como de costumbre, si no proporcionas el parámetro loop, se utilizará internamente la llamada estándarget_event_loop().

2

Nuestra función coroutine main() simplemente espera datos en una cola. Esta línea se suspenderá hasta que haya datos, exactamente hasta que haya datos, exactamente igual que llamar a get() en una instancia de asyncio.Queue. El objeto cola tiene dos caras: ésta se llamaasync_q y proporciona la API de cola compatible con asíncronos.

3

Imprime un mensaje.

4

Dentro de la función data_source(), se genera un int aleatorio, que se utiliza como duración del sueño y como valor de los datos. Ten en cuenta que la llamada a time.sleep() es bloqueante, por lo que esta función debe ejecutarse en un hilo.

5

Coloca los datos en la cola Janus. Esto muestra la otracara de la cola Janus: sync_q que proporciona la API estándar y bloqueante Queue.

Este es el resultado:

$ <name>
Got 2 off queue
Got 4 off queue
Got 4 off queue
Got 2 off queue
Got 3 off queue
Got 4 off queue
Got 1 off queue
Got 1 off queue
Got 0 off queue
Got 4 off queue
Done.

Si puedes, es mejor aspirar a tener trabajos de ejecución cortos, y en estos casos, no será necesaria una cola (para la comunicación). Sin embargo, esto no siempre es posible, y en tales situaciones, la cola Janus puede ser la solución más conveniente para almacenar y distribuir datos entre hilos y coroutines.

aiohttp

aiohttp lleva todo lo relacionado con HTTP a asyncio, incluida la compatibilidad con clientes y servidores HTTP, así como con WebSocket. Saltemos directamente a los ejemplos de código, empezando por la simplicidad misma: "Hola Mundo".

Caso práctico: Hola Mundo

El Ejemplo 4-13 muestra un servidor web mínimo utilizando aiohttp.

Ejemplo 4-13. Ejemplo mínimo de aiohttp
from aiohttp import web

async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()  1
app.router.add_get('/', hello)  2
web.run_app(app, port=8080)  3
1

Se crea una instancia de Application.

2

Se crea una ruta, con la coroutina de destino hello() dada como manejador.

3

Se ejecuta la aplicación web.

Observa que en este código no se mencionan bucles, tareas ni futuros: los desarrolladores del marco aiohttp nos han ocultado todo eso, dejando una API muy limpia. Esto va a ser habitual en la mayoría de los marcos que se construyan sobre asyncio, que ha sido diseñado para permitir a los diseñadores de marcos elegir sólo las partes que necesitan, y encapsularlas en su API preferida.

Caso práctico: Raspando las noticias

aiohttp puede utilizarse como servidor y como biblioteca cliente de, como la muy popular (¡pero bloqueante!)requests . Quería mostrar aiohttp utilizando un ejemplo que incorpora ambas características.

En este caso práctico, implementaremos un sitio web que hace web scraping entre bastidores. La aplicación raspará dos sitios web de noticias y combinará los titulares en una página de resultados. Ésta es la estrategia:

  1. Un navegador cliente realiza una petición web a http://localhost:8080/news.

  2. Nuestro servidor web recibe la solicitud y, en el backend, obtiene los datos HTML de varios sitios web de noticias.

  3. Los datos de cada página se extraen para los titulares.

  4. Los titulares se ordenan y formatean en el HTML de respuesta que enviamos de vuelta al cliente del navegador.

La Figura 4-1 muestra el resultado.

uaip 0401
Figura 4-1. El producto final de nuestro raspador de noticias: los titulares de la CNN se muestran en un color, y los de Al Jazeera en otro

El raspado web se ha vuelto bastante difícil hoy en día. Por ejemplo, si pruebasrequests.get('http://edition.cnn.com'), ¡verás que la respuesta contiene muy pocos datos utilizables! Cada vez es más necesario poder ejecutar JavaScript localmente para obtener datos, porque muchos sitios utilizan JavaScript para cargar su contenido real. El proceso de ejecutar dicho JavaScript para producir la salida HTML final y completa se denomina renderizado.

Para llevar a cabo la renderización de, utilizamos un estupendo proyecto llamadoSplash, que se describe a sí mismo como un "servicio de renderización de JavaScript". Puede ejecutarse en un contenedorDocker y proporciona una API para renderizar otros sitios. Internamente, utiliza un motor WebKit (compatible con JavaScript) para cargar y renderizar completamente un sitio web. Esto es lo que utilizaremos para obtener los datos del sitio web. Nuestro servidor aiohttp, mostrado enel Ejemplo 4-14, llamará a esta API Splash para obtener los datos de la página.

Consejo

Para obtener y ejecutar el contenedor Splash, ejecuta estos comandos en tu shell:

$ docker pull scrapinghub/splash
$ docker run --rm -p 8050:8050 scrapinghub/splash

Nuestro servidor backend llamará a la API de Splash en http://localhost:8050.

Ejemplo 4-14. Código del raspador de noticias
from asyncio import gather, create_task
from string import Template
from aiohttp import web, ClientSession
from bs4 import BeautifulSoup

async def news(request):  1
    sites = [
        ('http://edition.cnn.com', cnn_articles),  2
        ('http://www.aljazeera.com', aljazeera_articles),
    ]
    tasks = [create_task(news_fetch(*s)) for s in sites] 3
    await gather(*tasks)  4

    items = {  5
        text: (  6
            f'<div class="box {kind}">'
            f'<span>'
            f'<a href="{href}">{text}</a>'
            f'</span>'
            f'</div>'
        )
        for task in tasks for href, text, kind in task.result()
    }
    content = ''.join(items[x] for x in sorted(items))

    page = Template(open('index.html').read())  7
    return web.Response(
        body=page.safe_substitute(body=content),  8
        content_type='text/html',
    )

async def news_fetch(url, postprocess):
    proxy_url = (
        f'http://localhost:8050/render.html?'  9
        f'url={url}&timeout=60&wait=1'
    )
    async with ClientSession() as session:
        async with session.get(proxy_url) as resp:  10
            data = await resp.read()
            data = data.decode('utf-8')
    return postprocess(url, data)  11

def cnn_articles(url, page_data):  12
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/')
            and tag['href'].endswith('.html')
            and tag.find(class_='cd__headline-text')
        )
    headlines = soup.find_all(match)  13
    return [(url + hl['href'], hl.text, 'cnn')
            for hl in headlines]

def aljazeera_articles(url, page_data):  14
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/news')
            and tag['href'].endswith('.html')
        )
    headlines = soup.find_all(match)
    return [(url + hl['href'], hl. text, 'aljazeera')
            for hl in headlines]

app = web.Application()
app.router.add_get('/news', news)
web.run_app(app, port=8080)
1

La función news() es el manejador de la URL /noticias de nuestro servidor. Devuelve la página HTML que muestra todos los titulares.

2

Aquí, sólo tenemos dos sitios web de noticias para raspar: CNN y Al Jazeera. Podrían añadirse fácilmente más, pero entonces también habría que añadir postprocesadores adicionales, como las funciones cnn_articles() yaljazeera_articles(), personalizadas para extraer datos de titulares.

3

Para cada sitio de noticias, creamos una tarea para obtener y procesar los datos de la página HTML de su portada. Ten en cuenta que descomprimimos la tupla ((*s)), ya que la función coroutine news_fetch() toma como parámetros tanto la URL como la función de postprocesamiento. Cada llamada a news_fetch() devolverá una lista de tuplascomo resultados de la cabecera, de la forma <article URL>, <article title>.

4

Todas las tareas se reúnen en un único Future(gather() devuelve un futuro que representa el estado de todas las tareas que se están reuniendo), y luego inmediatamente await la finalización de ese futuro. Esta línea se suspenderá hasta que el futuro se complete.

5

Como ya se han completado todas las tareas de news_fetch(), recopilamos todos los resultados en un diccionario. Observa cómo se utilizan comprensiones anidadas para iterar sobre las tareas, y luego sobre la lista de tuplas devueltas por cada tarea. También utilizamos cadenas f para sustituir datos directamente, incluyendo incluso el tipo de página, que se utilizará en CSS para colorear el fondo de div.

6

En este diccionario, la clave es el título del titular, y el valor es una cadena HTML para un div que se mostrará en nuestra página de resultados.

7

Nuestro servidor web va a devolver HTML. Vamos a cargar datos HTML de un archivo local llamado index.html. Este archivo se presenta en elEjemplo B-1 por si quieres recrear tú mismo el caso práctico.

8

Sustituimos el titular recogido div en la plantilla y devolvemos la página al cliente del navegador. Esto genera la página que se muestra en la Figura 4-1.

9

Aquí, dentro de la función coroutine news_fetch(), tenemos una pequeña plantilla para acceder a la API Splash (que, en mi caso, se ejecuta en un contenedor Docker local en el puerto 8050). Esto demuestra cómo puede utilizarse aiohttpcomo cliente HTTP.

10

La forma estándar es crear una instancia ClientSession(), y luego utilizar el método get() en la instancia de sesión para realizar la llamada REST. En la línea siguiente, se obtienen los datos de respuesta. Ten en cuenta que, como siempre estamos operando en coroutines, con async with y await, esta coroutine nunca se bloqueará: podremos gestionar muchos miles de estas peticiones, aunque esta operación (news_fetch()) pueda ser relativamente lenta, ya que estamos realizando llamadas web internamente.

11

Una vez obtenidos los datos, llamamos a la función de postprocesamiento. Para la CNN, será cnn_articles(), y para Al Jazeera será aljazeera_articles().

12

Sólo tenemos espacio para echar un breve vistazo al postprocesamiento. Tras obtener los datos de la página, utilizamos la biblioteca Beautiful Soup 4 para extraer los titulares.

13

La función match() devolverá todas las etiquetas que coincidan (he comprobado manualmente la fuente HTML de estos sitios web de noticias para averiguar qué combinación de filtros extrae las mejores etiquetas), y luego devolvemos una lista de tuplas que coincidan con el formato <article URL>, <article title>.

14

Este es el postprocesador análogo para Al Jazeera. La condición match()es ligeramente diferente, pero por lo demás es la misma que la de CNN.

En general, verás que aiohttp tiene una API sencilla y "no te estorba" mientras desarrollas tus aplicaciones.

En la siguiente sección, veremos cómo utilizar ZeroMQ con asyncio, que tiene el curioso efecto de hacer que la programación de sockets sea bastante agradable.

ØMQ (ZeroMQ)

La programación es una ciencia disfrazada de arte, porque la mayoría de nosotros no entendemos la física del software y rara vez, o nunca, se enseña. La física del software no son los algoritmos, las estructuras de datos, los lenguajes y las abstracciones. La verdadera física del software es la física de las personas. Concretamente, se trata de nuestras limitaciones ante la complejidad y nuestro deseo de trabajar juntos para resolver grandes problemas por partes. Esta es la ciencia de la programación: crea bloques de construcción que la gente pueda entender y utilizar fácilmente, y la gente colaborará para resolver los problemas más grandes.

Pieter Hintjens, ZeroMQ: Mensajería para muchas aplicaciones

ØMQ (o ZeroMQ) es una popular biblioteca independiente del lenguaje para aplicaciones de red: proporciona sockets "inteligentes". Cuando creas sockets ØMQ en código, se parecen a los sockets normales, con nombres de métodos reconocibles como recv() ysend(), etc., pero internamente estos sockets se encargan de algunas de las tareas más molestas y tediosas necesarias para trabajar con sockets convencionales.

Una de las funciones que ofrece es la gestión del paso de mensajes, para que no tengas que inventar tu propio protocolo y contar bytes en el cable para averiguar cuándo han llegado todos los bytes de un mensaje concreto: simplemente envía lo que consideres un "mensaje", y todo llegará intacto al otro extremo.

Otra gran característica es la lógica de reconexión automática. Si el servidor se desconecta y vuelve a conectarse más tarde, el socket ØMQ del cliente se reconectaráautomáticamente. Y lo que es aún mejor, los mensajes que tu código envíe al socket se almacenarán en la memoria intermedia durante el periodo de desconexión, de modo que se seguirán enviando cuando vuelva el servidor. Éstas son algunas de las razones por las que a veces se denomina a ØMQmensajeríasin intermediario: proporciona algunas de las características del software de intermediación de mensajes directamente en los propios objetos socket.

Los sockets ØMQ ya están implementados como asíncronos internamente (por lo que pueden mantener muchos miles de conexiones concurrentes, incluso cuando se utilizan en código roscado), pero esto se nos oculta tras la API ØMQ. No obstante, se ha añadido soporte para Asyncio a los enlaces Python dePyZMQ para la biblioteca ØMQ, y en esta sección vamos a ver varios ejemplos de cómo podrías incorporar estos sockets inteligentes a tus aplicaciones Python.

Caso práctico: Enchufes múltiples

He aquí un rompecabezas: si ØMQ proporciona sockets que ya son asíncronos, de forma que se pueden utilizar con roscado, ¿qué sentido tiene utilizar ØMQ con asyncio? La respuesta es un código más limpio.

Para demostrarlo, veamos un pequeño caso práctico en el que utilizas varios sockets ØMQ en la misma aplicación. En primer lugar, el Ejemplo 4-15muestra la versión de bloqueo (este ejemplo está tomado de la zguide, la guía oficial de ØMQ).

Ejemplo 4-15. El enfoque tradicional de ØMQ
# poller.py
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)  1
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)  2
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

poller = zmq.Poller()  3
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    try:
        socks = dict(poller.poll())  4
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv_json()
        print(f'Via PULL: {message}')

    if subscriber in socks:
        message = subscriber.recv_json()
        print(f'Via SUB: {message}')
1

Los sockets ØMQ tienen tipos. Éste es un socket PULL. Puedes pensar en él como un tipo de socketde sólo recepción que será alimentado por algún otro socketde sólo envío, que será del tipo PUSH.

2

El socket SUB es otro tipo de socket de sólo recepción, y se alimentará de un socket PUB que es de sólo envío.

3

Si necesitas mover datos entre varios sockets en una aplicación ØMQ con hilos, vas a necesitar un poller. Esto se debe a que estos sockets no son seguros para hilos, por lo que no puedes recv() en diferentes sockets en diferentes hilos.1

4

Funciona de forma similar a la llamada al sistema select(). El encuestador se desbloqueará cuando haya datos listos para ser recibidos en uno de los sockets registrados, y entonces depende de ti extraer los datos y hacer algo con ellos. El gran bloqueo de if es cómo detectas el socket correcto.

Utilizar un bucle de sondeo más un bloque de selección de socket explícito hace que el código parezca un poco tosco, pero este enfoque evita los problemas de seguridad de los hilos al garantizar que no se utiliza el mismo socket desde distintos hilos.

El Ejemplo 4-16 muestra el código del servidor.

Ejemplo 4-16. Código del servidor
# poller_srv.py
import zmq, itertools, time

context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

for i in itertools.count():
    time.sleep(1)
    pusher.send_json(i)
    publisher.send_json(i)

Este código no es importante para la discusión, pero brevemente: hay un socket PUSH y un socket PUB, como he dicho antes, y un bucle dentro que envía datos a ambos sockets cada segundo. Aquí tienes un ejemplo de la salida de poller.py (nota: ambos programas deben estar en ejecución):

$ poller.py
Via PULL: 0
Via SUB: 0
Via PULL: 1
Via SUB: 1
Via PULL: 2
Via SUB: 2
Via PULL: 3
Via SUB: 3

El código funciona; sin embargo, nuestro interés aquí no es si el código se ejecuta, sino si asyncio tiene algo que ofrecer a la estructura depoller.py. Lo fundamental es comprender que nuestro códigoasyncio se va a ejecutar en un único hilo, lo que significa que no hay problema en manejar diferentes sockets en diferentes coroutines-y, de hecho, esto es exactamente lo que haremos.

Por supuesto,alguien tuvo que hacer el arduo trabajo de añadir soporte para coroutines en la propia pyzmq(la biblioteca cliente de Python para ØMQ) para que esto funcionara, así que no fue gratis. Pero podemos aprovechar ese duro trabajo para mejorar la estructura de código "tradicional", como se muestra en el Ejemplo 4-17.

Ejemplo 4-17. Separación limpia con asyncio
# poller_aio.py
import asyncio
import zmq
from zmq.asyncio import Context

context = Context()

async def do_receiver():
    receiver = context.socket(zmq.PULL)  1
    receiver.connect("tcp://localhost:5557")
    while message := await receiver.recv_json():  2
        print(f'Via PULL: {message}')

async def do_subscriber():
    subscriber = context.socket(zmq.SUB)  3
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
    while message := await subscriber.recv_json():  4
        print(f'Via SUB: {message}')

async def main():
    await asyncio.gather(
        do_receiver(),
        do_subscriber(),
    )

asyncio.run(main())
1

Este ejemplo de código hace lo mismo que el Ejemplo 4-15, salvo que ahora aprovechamos las coroutines para reestructurarlo todo. Ahora podemos tratar cada socket de forma aislada. He creado dos funciones coroutine, una para cada socket; ésta es para el socket PULL.

2

Estoy utilizando el soporte asyncio en pyzmq, lo que significa que todas las llamadas asend() y recv() deben utilizar la palabra clave await. Poller ya no aparece en ninguna parte, porque se ha integrado en el propio bucle de eventos asyncio.

3

Este es el manejador del socket SUB. La estructura es muy similar a la del manejador del socket PULL, pero no tenía por qué ser así. Si hubiera sido necesaria una lógica más compleja, habría podido añadirla fácilmente aquí, totalmente encapsulada sólo en el código del manejador de SUB.

4

De nuevo, los sockets compatibles con asyncio requieren la palabra clave await para enviar y recibir.

El resultado es el mismo que antes, así que no lo mostraré.

El uso de coroutines tiene, en mi opinión, un efecto asombrosamente positivo en la disposición del código en estos ejemplos. En un código de producción real con muchos sockets ØMQ, los manejadores de coroutina para cada uno de ellos podrían estar incluso en archivos separados, proporcionando más oportunidades para una mejor estructura del código. E incluso para programas con una única toma de lectura/escritura, es muy fácil utilizar coroutinas separadas para la lectura y la escritura, si es necesario.

El código mejorado se parece mucho al código roscado y, de hecho, para el ejemplo concreto que se muestra aquí, el mismo refactor funcionará para roscado: ejecutar las funciones do_receiver() y do_subscriber() bloqueantes en hilos separados. Pero, ¿realmente quieres lidiar incluso con la posibilidadde que se produzcan condiciones de carrera, especialmente a medida que tu aplicación crece en características y complejidad con el tiempo?

Hay mucho que explorar aquí, y como he dicho antes, es muy divertido jugar con estos sockets mágicos. En el próximo estudio de caso, veremos un uso más práctico de ØMQ.

Caso práctico: Monitoreo del rendimiento de las aplicaciones

Con las modernas prácticas de implementación basadas en contenedores y microservicios de hoy en día, algunas cosas que antes eran triviales, como el monitoreo del uso de CPU y memoria de tus aplicaciones, se han vuelto algo más complicadas que simplemente ejecutar top. En los últimos años han surgido varios productos comerciales para solucionar estos problemas, pero su coste puede ser prohibitivo para los equipos de pequeñas startups y aficionados.

En este caso práctico, utilizaré ØMQ y asyncio para construir un prototipo de juguete para el monitoreo de aplicaciones distribuidas. Nuestro diseño consta de tres partes:

Capa de aplicación

Esta capa contiene todas nuestras aplicaciones. Algunos ejemplos podrían ser un microservicio de "clientes", un microservicio de "reservas", un microservicio de "envío de correos electrónicos", etc. Añadiré un socket "transmisor" ØMQ a cada una de nuestras aplicaciones. Este socket enviará métricas de rendimiento a un servidor central.

Capa de recogida

El servidor central expondrá un socket ØMQ para recoger los datos de todas las instancias de la aplicación en ejecución. El servidor también servirá una página web para mostrar gráficos de rendimiento a lo largo del tiempo y transmitirá en directo los datos a medida que vayan llegando.

Capa de visualización

Esta es la página web que se sirve. Mostraremos los datos recogidos en un conjunto de gráficos, que se actualizarán en tiempo real. Para simplificar los ejemplos de código, utilizaré la práctica biblioteca JavaScript Smoothie Charts, que proporciona todas las funciones necesarias del lado del cliente.

La app backend (capa de aplicación) que produce métricas se muestra en el Ejemplo 4-18.

Ejemplo 4-18. La capa de aplicación: producción de métricas
import argparse
import asyncio
from random import randint, uniform
from datetime import datetime as dt
from datetime import timezone as tz
from contextlib import suppress
import zmq, zmq.asyncio, psutil

ctx = zmq.asyncio.Context()

async def stats_reporter(color: str):  1
    p = psutil.Process()
    sock = ctx.socket(zmq.PUB)  2
    sock.setsockopt(zmq.LINGER, 1)
    sock.connect('tcp://localhost:5555')  3
    with suppress(asyncio.CancelledError):  4
        while True:  5
            await sock.send_json(dict(  6
                color=color,
                timestamp=dt.now(tz=tz.utc).isoformat(),  7
                cpu=p.cpu_percent(),
                mem=p.memory_full_info().rss / 1024 / 1024
            ))
            await asyncio.sleep(1)
    sock.close()  8

async def main(args):
    asyncio.create_task(stats_reporter(args.color))
    leak = []
    with suppress(asyncio.CancelledError):
        while True:
            sum(range(randint(1_000, 10_000_000)))  9
            await asyncio.sleep(uniform(0, 1))
            leak += [0] * args.leak

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--color', type=str)  10
    parser.add_argument('--leak', type=int, default=0)
    args = parser.parse_args()
    try:
        asyncio.run(main(args))
    except KeyboardInterrupt:
        print('Leaving...')
        ctx.term()  11
1

Esta función coroutine se ejecutará como una coroutine de larga duración, enviando continuamente datos al proceso servidor.

2

Crea un socket ØMQ. Como sabes, hay distintos tipos de socket; éste es del tipo PUB, que permite enviar mensajes unidireccionales a otro socket ØMQ. Este socket tiene -como dice la guía ØMQ- superpoderes. Se encargará automáticamente de toda la lógica de reconexión y almacenamiento en búfer por nosotros.

3

Conéctate al servidor.

4

Nuestra secuencia de apagado está dirigida por KeyboardInterrupt, más abajo. Cuando se reciba esa señal, se cancelarán todas las tareas. Aquí manejo la señal CancelledError con el práctico gestor de contexto suppress() del módulo de biblioteca estándar contextlib.

5

Iterar eternamente, enviando datos al servidor.

6

Dado que ØMQ sabe cómo trabajar con mensajes completos, y no sólo con trozos de un bytestream, abre la puerta a un montón de útiles envoltorios alrededor del lenguaje habitual de sock.send(): aquí, utilizo uno de esos métodos de ayuda,send_json(), que serializará automáticamente el argumento en JSON. Esto nos permite utilizar directamente un dict().

7

Una forma fiable de transmitir información fecha-hora es mediante el formato ISO 8601. Esto es especialmente cierto si tienes que pasar datos de fecha y hora entre programas escritos en distintos idiomas, ya que la gran mayoría de las implementaciones de idiomas podrán trabajar con esta norma.

8

Para terminar aquí, debemos haber recibido la excepción CancelledError resultante de la cancelación de la tarea. El socket ØMQ debe cerrarse para permitir el cierre del programa.

9

La función main() simboliza la aplicación de microservicio real. Se produce un trabajo falso con esta suma sobre números aleatorios, sólo para darnos algunos datos distintos de cero para verlos en la capa de visualización un poco más tarde.

10

Voy a crear varias instancias de esta aplicación, por lo que será conveniente poder distinguirlas (más adelante, en los gráficos) con un parámetro --color.

11

Por último, se puede terminar el contexto ØMQ.

El principal punto de interés es la función stats_reporter(). Es la que transmite los datos de las métricas (recogidos por la útil biblioteca psutil). Se puede suponer que el resto del código es una aplicación de microservicio típica.

El código del servidor del Ejemplo 4-19 recoge todos los datos y los sirve a un cliente web.

Ejemplo 4-19. La capa de recogida: este servidor recoge las estadísticas de los procesos
# metric-server.py
import asyncio
from contextlib import suppress
import zmq
import zmq.asyncio
import aiohttp
from aiohttp import web
from aiohttp_sse import sse_response
from weakref import WeakSet
import json

# zmq.asyncio.install()
ctx = zmq.asyncio.Context()
connections = WeakSet()  1

async def collector():
    sock = ctx.socket(zmq.SUB)  2
    sock.setsockopt_string(zmq.SUBSCRIBE, '')  3
    sock.bind('tcp://*:5555')  4
    with suppress(asyncio.CancelledError):
        while data := await sock.recv_json():  5
            print(data)
            for q in connections:
                await q.put(data)  6
    sock.close()

async def feed(request):  7
    queue = asyncio.Queue()
    connections.add(queue)  8
    with suppress(asyncio.CancelledError):
        async with sse_response(request) as resp:  9
            while data := await queue.get():  10
                print('sending data:', data)
                resp.send(json.dumps(data))  11
    return resp

async def index(request):  12
    return aiohttp.web.FileResponse('./charts.html')

async def start_collector(app):  13
    app['collector'] = app.loop.create_task(collector())

async def stop_collector(app):
    print('Stopping collector...')
    app['collector'].cancel()  14
    await app['collector']
    ctx.term()

if __name__ == '__main__':
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/feed', feed)
    app.on_startup.append(start_collector)  15
    app.on_cleanup.append(stop_collector)
    web.run_app(app, host='127.0.0.1', port=8088)
1

Una mitad de este programa recibirá datos de otras aplicaciones, y la otra mitad proporcionará datos a los clientes del navegador a través de eventos enviados por el servidor (SSE). Utilizo un identificador WeakSet() para llevar la cuenta de todos los clientes web conectados en ese momento. Cada cliente conectado tendrá una instancia Queue() asociada, por lo que este identificadorconnections es en realidad un conjunto de colas.

2

Recuerda que en la capa de aplicación utilicé un socket zmq.PUB; aquí, en la capa de recogida, utilizo su compañero, el tipo de socket zmq.SUB. Este socket ØMQ sólo puede recibir, no enviar.

3

Para el tipo de socket zmq.SUB, es necesario proporcionar un nombre de suscripción, pero para nuestros propósitos, simplemente tomaremos todo lo que entre, de ahí el nombre de tema vacío.

4

Vinculo la toma zmq.SUB. Piénsalo un momento. En las configuraciones pub-sub, normalmente tienes que hacer que el extremo pub sea el servidor (bind()) y el extremo sub el cliente (connect()). ØMQ es diferente: cualquiera de los dos extremos puede ser el servidor. Para nuestro caso de uso, esto es importante, porque cada una de nuestras instancias de la capa de aplicación se conectará al mismo nombre de dominio del servidor de recogida, y no al revés.

5

El soporte para asyncio en pyzmq nos permite await datos de nuestras aplicaciones conectadas. Y no sólo eso, sino que los datos entrantes se deserializarán automáticamente a partir de JSON (sí, esto significa que data es un dict()).

6

Recuerda que nuestro conjunto connections contiene una cola para cada cliente web conectado. Ahora que se han recibido los datos, es el momento de enviarlos a todos los clientes: los datos se colocan en cada cola.

7

La función coroutine feed() creará coroutines para cada cliente web conectado. Internamente, los eventos enviados por el servidor se utilizan para enviar datos a los clientes web.

8

Como se ha descrito anteriormente, cada cliente web tendrá su propia instancia queue, para recibir datos de la coroutina collector(). La instancia queuese añade al conjunto connections, pero como connections es un conjunto débil, la entrada se eliminará automáticamente deconnections cuando queue salga del ámbito, es decir, cuando un cliente web se desconecte. Las referencias débiles son estupendas para simplificar este tipo de tareas de contabilidad.

9

El paquete aiohttp_sse proporciona el gestor de contexto sse_response(). Esto nos proporciona un ámbito dentro del cual enviar datos al cliente web.

10

Permanecemos conectados al cliente web, y esperamos los datos en la cola de este cliente específico.

11

En cuanto entren los datos (dentro de collector()), se enviarán al cliente web conectado. Observa que aquí vuelvo a serializar el dict data. Una optimización de este código sería evitar deserializar JSON en collector(), y en su lugar utilizar sock.recv_string() para evitar el viaje de ida y vuelta de la serialización. Por supuesto, en un escenario real, puede que quieras deserializar en el colector, y realizar alguna validación de los datos antes de enviarlos al cliente del navegador. ¡Cuántas opciones!

12

El punto final index() es la carga principal de la página, y aquí servimos un archivo estático llamado gráficos.html.

13

La biblioteca aiohttp nos proporciona facilidades para enganchar coroutinas de larga duración adicionales que podamos necesitar. Con la coroutina collector(), tenemos exactamente esa situación, así que creo una coroutina de inicio,start_collector(), y una coroutina de apagado. Éstas serán llamadas durante fases específicas de la secuencia de inicio y apagado de aiohttp. Observa que añado la tarea recolectora a la propia app, que implementa un protocolo de mapeo para que puedas utilizarla como un dict.

14

Obtengo nuestra coroutina collector() del identificadorapp y llamo a cancel() sobre ella.

15

Por último, puedes ver dónde se enganchan las coroutines personalizadas de inicio y apagado: la instancia app proporciona ganchos a los que se pueden añadir nuestras coroutines personalizadas.

Todo lo que queda es la capa de visualización, que se muestra en el Ejemplo 4-20. Estoy utilizando labiblioteca Smoothie Charts para generar gráficos desplazables, y el HTML completo de nuestra página web principal (y única),charts.html, se proporciona en el Ejemplo B-1. Hay demasiado HTML, CSS y JavaScript para presentar en esta sección, pero sí quiero destacar algunos puntos sobre cómo se gestionan en JavaScript los eventos enviados por el servidor en el cliente del navegador.

Ejemplo 4-20. La capa de visualización, que es una forma elegante de decir "el navegador".
<snip>
var evtSource = new EventSource("/feed");  1
evtSource.onmessage = function(e) {
    var obj = JSON.parse(e.data);  2
    if (!(obj.color in cpu)) {
        add_timeseries(cpu, cpu_chart, obj.color);
    }
    if (!(obj.color in mem)) {
        add_timeseries(mem, mem_chart, obj.color);
    }
    cpu[obj.color].append(
        Date.parse(obj.timestamp), obj.cpu);  3
    mem[obj.color].append(
        Date.parse(obj.timestamp), obj.mem);
};
<snip>
1

Crea una nueva instancia de EventSource() en la URL de /feed. El navegador se conectará a /feed en nuestro servidor, (metric_server.py). Ten en cuenta que el navegador intentará reconectarse automáticamente si se pierde la conexión. Los eventos enviados por el servidor suelen pasarse por alto, pero en muchas situaciones su simplicidad los hace preferibles a los WebSockets.

2

El evento onmessage se disparará cada vez que el servidor envíe datos. Aquí los datos se analizan como JSON.

3

El identificador cpu es un mapeo de un color a una instancia deTimeSeries() (para más información, consulta el Ejemplo B-1). Aquí, obtenemos esa serie temporal y le añadimos datos. También obtenemos la marca de tiempo y la analizamos para obtener el formato correcto que requiere el gráfico.

Ahora podemos ejecutar el código. Para poner en marcha todo el programa, se necesitan un montón de instrucciones de la línea de comandos, la primera de las cuales es poner en marcha el proceso del recopilador de datos:

$ metric-server.py
======== Running on http://127.0.0.1:8088 ========
(Press CTRL+C to quit)

El siguiente paso es poner en marcha todas las instancias de microservicio. Éstas enviarán sus métricas de uso de CPU y memoria al recopilador. Cada una se identificará con un color diferente, que se especifica en la línea de comandos. Observa cómo se indica a dos de los microservicios que pierdan algo de memoria:

$ backend-app.py --color red &
$ backend-app.py --color blue --leak 10000 &
$ backend-app.py --color green --leak 100000 &

La Figura 4-2 muestra nuestro producto final en un navegador. Tendrás que creer en mi palabra de que los gráficos realmente se animan. Observarás en las líneas de comando precedentes que añadí algunas fugas de memoria al azul, y muchas al verde. Incluso tuve que reiniciar el servicio verde unas cuantas veces para evitar que superara los 100 MB.

uaip 0402
Figura 4-2. ¡Será mejor que pongamos un SRE en verde lo antes posible!

Lo que es especialmente interesante de este proyecto es lo siguiente: cualquiera de las instancias en ejecución en cualquier parte de esta pila puede reiniciarse, y no es necesario ningún código de gestión de reconexión. Los sockets ØMQ, junto con la instancia JavaScript EventSource() en el navegador, se reconectan mágicamente y continúan donde lo dejaron.

En la siguiente sección, centraremos nuestra atención en las bases de datos y en cómo podría utilizarse asynciopara diseñar un sistema de invalidación de cachés.

asyncpg y Sanic

La bibliotecaasyncpg proporciona acceso de cliente a la base de datos PostgreSQL, pero se diferencia de otras bibliotecas de cliente Postgres compatibles conasynciopor su énfasis en la velocidad.asyncpg es obra de Yury Selivanov, uno de los principales desarrolladores de asyncio Python, que también es autor del proyecto uvloop.No tiene dependencias de terceros, aunque se requiereCython si se instala desde el código fuente.

asyncpg alcanza su velocidad trabajando directamente contra el protocolo binario de PostgreSQL, y otras ventajas de este enfoque de bajo nivel incluyen la compatibilidad consentencias preparadasy cursores desplazables.

Veremos un caso práctico en el que se utiliza asyncpg para la invalidación de cachés, pero antes será útil tener una comprensión básica de la API que proporciona asyncpg. Para todo el código de esta sección, necesitaremos una instancia en ejecución de PostgreSQL. Esto se hace más fácilmente con Docker, utilizando el siguiente comando:

$ docker run -d --rm -p 55432:5432 postgres

Ten en cuenta que he expuesto el puerto 55432 en lugar del predeterminado, 5432, por si ya tienes una instancia en ejecución de la base de datos en el puerto predeterminado. El Ejemplo 4-21 muestra brevemente cómo utilizar asyncpgpara hablar con PostgreSQL.

Ejemplo 4-21. Demostración básica de asyncpg
# asyncpg-basic.py
import asyncio
import asyncpg
import datetime
from util import Database  1

async def main():
    async with Database('test', owner=True) as conn:  2
        await demo(conn)

async def demo(conn: asyncpg.Connection):
    await conn.execute('''
        CREATE TABLE users(
            id serial PRIMARY KEY,
            name text,
            dob date
        )'''
    )  3

    pk = await conn.fetchval(  4
        'INSERT INTO users(name, dob) VALUES($1, $2) '
        'RETURNING id', 'Bob', datetime.date(1984, 3, 1)
    )

    async def get_row():  5
        return await conn.fetchrow(  6
            'SELECT * FROM users WHERE name = $1',
            'Bob'
        )
    print('After INSERT:', await get_row())  7

    await conn.execute(
        'UPDATE users SET dob = $1 WHERE id=1',
        datetime.date(1985, 3, 1)  8
    )
    print('After UPDATE:', await get_row())

    await conn.execute(
        'DELETE FROM users WHERE id=1'
    )
    print('After DELETE:', await get_row())

if __name__ == '__main__':
    asyncio.run(main())
1

He ocultado algunos textos repetitivos en un pequeño módulo de util para simplificar las cosas y mantener el mensaje principal.

2

La clase Database nos proporciona un gestor de contexto que creará una nueva base de datos para nosotros -en este caso llamada test- y destruirá esa base de datos cuando el gestor de contexto salga. Esto resulta muy útil cuando experimentamos con ideas en código. Como no se arrastra ningún estado entre experimentos, cada vez se parte de una base de datos limpia. Ten en cuenta que se trata de un gestor de contexto async with; hablaremos más de ello más adelante, pero por ahora, el área central de esta demostración es lo que ocurre dentro de la coroutina demo().

3

El gestor de contexto Database nos ha proporcionado una instancia Connection, que se utiliza inmediatamente para crear una nueva tabla, users.

4

Utilizo fetchval() para insertar un nuevo registro. Aunque podría haber utilizado execute()para realizar la inserción, la ventaja de utilizar fetchval()es que puedo obtener el id del registro recién insertado, que almaceno en el identificador pk.

Ten en cuenta que utilizo parámetros ($1 y $2) para pasar datos a la consulta SQL. Nunca utilices la interpolación o concatenación de cadenas para construir consultas, ¡ya que es un riesgo para la seguridad!

5

En el resto de esta demostración, voy a manipular los datos de la tabla users, así que aquí creo una nueva función coroutine de utilidad que obtiene un registro de la tabla. Esta función se llamará varias veces.

6

Cuando recuperes datos, es mucho más útil utilizar los métodos basados en fetch, porque devolverán objetos Record. asyncpg convertirá automáticamente los tipos de datos a los tipos más apropiados para Python.

7

Inmediatamente utilizo el ayudante get_row() para mostrar el registro recién insertado.

8

Modifico datos utilizando el comando UPDATE para SQL. Es una modificación minúscula: el valor del año en la fecha de nacimiento se cambia en un año. Como antes, esto se realiza con el método execute() de la conexión. El resto del código de la demostración sigue la misma estructura que lo visto hasta ahora, y un DELETE, seguido de otro print(), ocurre unas líneas más abajo.

Aquí tienes el resultado de ejecutar este script:

$ asyncpg-basic.py
After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)>
After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)>
After DELETE: None

Observa cómo el valor de fecha recuperado en nuestro objeto Recordse ha convertido en un objeto Python date: asyncpg ha convertido automáticamente el tipo de dato del tipo SQL a su homólogo Python. En la documentación de asyncpg hay una gran tabla deconversiones de tipos que describe todas las correspondencias de tipos que incorpora la biblioteca.

El código anterior es muy sencillo, quizá incluso tosco si estás acostumbrado a la comodidad de los mapeadores objeto-relacionales (ORM) como SQLAlchemy o el ORM integrado en el marco web Django. Al final de este capítulo, menciono varias bibliotecas de terceros que proporcionan acceso a los ORM o a funciones similares a los ORM para asyncpg.

El Ejemplo 4-22 muestra mi objeto boilerplate Database en el módulo utils; puede que te resulte útil hacer algo parecido para tus propios experimentos.

Ejemplo 4-22. Herramientas útiles para tus experimentos con asyncpg
# util.py
import argparse, asyncio, asyncpg
from asyncpg.pool import Pool

DSN = 'postgresql://{user}@{host}:{port}'
DSN_DB = DSN + '/{name}'
CREATE_DB = 'CREATE DATABASE {name}'
DROP_DB = 'DROP DATABASE {name}'

class Database:
    def __init__(self, name, owner=False, **kwargs):
        self.params = dict(
            user='postgres', host='localhost',
            port=55432, name=name)  1
        self.params.update(kwargs)
        self.pool: Pool = None
        self.owner = owner
        self.listeners = []

    async def connect(self) -> Pool:
        if self.owner:
            await self.server_command(
                CREATE_DB.format(**self.params))  3

        self.pool = await asyncpg.create_pool(  4
            DSN_DB.format(**self.params))
        return self.pool

    async def disconnect(self):
        """Destroy the database"""
        if self.pool:
            releases = [self.pool.release(conn)
                        for conn in self.listeners]
            await asyncio.gather(*releases)
            await self.pool.close()  5
        if self.owner:
            await self.server_command(  6
                DROP_DB.format(**self.params))

    async def __aenter__(self) -> Pool:  2
        return await self.connect()

    async def __aexit__(self, *exc):
        await self.disconnect()

    async def server_command(self, cmd):  7
        conn = await asyncpg.connect(
            DSN.format(**self.params))
        await conn.execute(cmd)
        await conn.close()

    async def add_listener(self, channel, callback):  8
        conn: asyncpg.Connection = await self.pool.acquire()
        await conn.add_listener(channel, callback)
        self.listeners.append(conn)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cmd', choices=['create', 'drop'])
    parser.add_argument('--name', type=str)
    args = parser.parse_args()
    d = Database(args.name, owner=True)
    if args.cmd == 'create':
        asyncio.run(d.connect())
    elif args.cmd == 'drop':
        asyncio.run(d.disconnect())
    else:
        parser.print_help()
1

La clase Database no es más que un elegante gestor de contexto para crear y eliminar una base de datos de una instancia PostgreSQL. El nombre de la base de datos se pasa al constructor.

2

(Nota: La secuencia de llamadas en el código es intencionadamente distinta a la de esta lista). Se trata de un gestor de contexto asíncrono. En lugar de los métodos habituales__enter__() y __exit__(), utilizo sus homólogos__aenter__() y __aexit__() .

3

Aquí, en la parte de entrada, crearé la nueva base de datos y devolveré una conexión a esa nueva base de datos. server_command() es otro método auxiliar definido unas líneas más abajo. Lo utilizo para ejecutar el comando de creación de nuestra nueva base de datos.

4

A continuación, establezco una conexión con la base de datos recién creada. Fíjate en que he codificado varios detalles de la conexión: esto es intencionado, ya que quería que los ejemplos de código fueran pequeños. Podrías generalizarlo fácilmente creando campos para el nombre de usuario, el nombre de host y el puerto.

5

En la parte de salida del gestor contextual, cierro la conexión y...

6

...destruir la base de datos.

7

Para completar, éste es nuestro método de utilidad para ejecutar comandos contra el propio servidor PostgreSQL. Crea una conexión a tal efecto, ejecuta el comando dado y sale.

8

Esta función crea una conexión de socket de larga duración a la base de datos que escuchará los eventos. Este mecanismo aparecerá en el próximo estudio de caso.

Precaución

En el punto 8 del código anterior, he creado una conexión dedicada para cada canal en el que quiero escuchar. Esto es caro, ya que significa que un trabajador PostgreSQL estará completamente ocupado para cada canal que se escuche. Un diseño mucho mejor sería utilizar una conexión para varios canales. Cuando hayas trabajado con este ejemplo, intenta modificar el código para utilizar una única conexión para múltiples canales de escucha.

Ahora que ya conoces los componentes básicos de asyncpg, podemos explorarlos más a fondo con un caso práctico realmente divertido: ¡utilizar el soporte integrado de PostgreSQL para enviar notificaciones de eventos para realizar la invalidación de la caché!

Caso práctico: Invalidación de caché

Hay dos cosas difíciles en informática: la invalidación de la caché, poner nombre a las cosas, y los errores fuera de lugar.

Phil Karlton

Es habitual en los servicios web y en las aplicaciones web que la capa de persistencia, es decir, la base de datos (BD) de respaldo, se convierta en el cuello de botella del rendimiento antes que cualquier otra parte de la pila. La capa de aplicación suele poder escalarse horizontalmente ejecutando más instancias, mientras que es más difícil hacerlo con una base de datos.

Por eso es una práctica habitual buscar opciones de diseño que puedan limitar la interacción excesiva con la base de datos. La opción más habitual es utilizar la memoria caché para "recordar" los resultados obtenidos previamente en la base de datos y reproducirlos cuando se soliciten, evitando así llamadas posteriores a la BD para obtener la misma información.

Sin embargo, ¿qué ocurre si una de tus instancias de aplicación escribe nuevos datos en la base de datos mientras otra instancia de aplicación sigue devolviendo los datos antiguos y obsoletos de su caché interna? Este es un problema clásico de invalidación de la caché, y puede ser muy difícil de resolver de forma sólida.

Nuestra estrategia de ataque es la siguiente

  1. Cada instancia de la aplicación tiene una caché en memoria de las consultas a la BD.

  2. Cuando uno escribe nuevos datos en la base de datos, ésta avisa a todas las instancias de app conectadas de los nuevos datos.

  3. Cada instancia de la app actualiza entonces su caché interna en consecuencia.

Este caso práctico destacará cómo PostgreSQL, con su soporte incorporado para actualizaciones de eventos a través de la funciónLISTEN yNOTIFY puede indicarnos de forma sencilla cuándo han cambiado sus datos.

asyncpg ya tiene soporte para la API LISTEN/NOTIFY. Esta característica de PostgreSQL permite a tu aplicación suscribirse a eventos en un canal con nombre y enviar eventos a canales con nombre. ¡PostgreSQL casi puede convertirse en una versión más ligera deRabbitMQ oActiveMQ!

Este caso práctico tiene más partes móviles de lo habitual, y eso hace que sea difícil presentarlo en el formato lineal habitual. En lugar de eso, empezaremos por el producto final, y luego iremos hacia la implementación subyacente.

Nuestra aplicación proporciona un servidor API basado en JSON para gestionar los platos favoritos de los clientes de nuestro restaurante robotizado. La base de datos de respaldo tendrá una sola tabla, patron, con sólo dos campos: name y fav_dish. Nuestra API permitirá el conjunto habitual de cuatro operaciones: crear, leer,actualizar y eliminar (CRUD).

A continuación se muestra un ejemplo de interacción con nuestra API utilizando curl, que ilustra cómo crear una nueva entrada en nuestra base de datos (aún no he mostrado cómo poner en marcha el servidor que se ejecuta en localhost:8000; eso vendrá más adelante):

$ curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X POST \
    http://localhost:8000/patron
{"msg":"ok","id":37}

El parámetro -d es para los datos,2 -H es para las cabeceras HTTP, -Xes para el método de solicitud HTTP (las alternativas incluyen GET, DELETE, PUT, y algunas otras), y la URL es para nuestro servidor API. En breve veremos el código correspondiente.

En la salida, vemos que la creación fue ok, y la id que se devuelve es la clave primaria del nuevo registro en la base de datos.

En los siguientes fragmentos de shell, repasaremos las otras tres operaciones:leer, actualizar y borrar. Podemos leer el registro de patrón que acabamos de crear con este comando:

$ curl -X GET http://localhost:8000/patron/37
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}

Leer los datos es bastante sencillo. Ten en cuenta que la dirección id del registro deseado debe indicarse en la URL.

A continuación, actualizaremos el registro y comprobaremos los resultados:

$ curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
{"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}

Actualizar un recurso es similar a crearlo, con dos diferencias clave:

  • El método de solicitud HTTP (-X) es PUT, no POST.

  • La URL requiere ahora el campo id para especificar qué recurso actualizar.

Por último, podemos eliminar el registro y verificar su eliminación con los siguientes comandos:

$ curl -X DELETE http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
null

Como puedes ver, se devuelve null cuando intentasGET un registro que no existe.

Hasta ahora todo esto parece bastante ordinario, pero nuestro objetivo no es sólo hacer una API CRUD: queremos examinar la invalidación de la caché. Así que dirijamos nuestra atención hacia la caché. Ahora que tenemos un conocimiento básico de la API de nuestra aplicación, podemos mirar los registros de la aplicación para ver los datos de tiempo de cada solicitud: esto nos dirá qué solicitudes se almacenan en caché y cuáles llegan a la BD.

Cuando se inicia el servidor por primera vez, la caché está vacía; al fin y al cabo, es una caché de memoria. Vamos a poner en marcha nuestro servidor y, a continuación, en un intérprete de comandos separado, ejecutaremos dos peticiones GET en rápida sucesión:

$ curl -X GET http://localhost:8000/patron/29
$ curl -X GET http://localhost:8000/patron/29
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}

Esperamos que la primera vez que recuperemos nuestro registro, se produzca un fallo de caché, y la segunda vez, un acierto. Podemos ver pruebas de ello en el registro del propio servidor de la API (el primer servidor web de Sanic, que se ejecuta en localhost:8000):

$ sanic_demo.py
2019-09-29 16:20:33 - (sanic)[DEBUG]:
                 ▄▄▄▄▄
        ▀▀▀██████▄▄▄       _______________
      ▄▄▄▄▄  █████████▄  /                 \
     ▀▀▀▀█████▌ ▀▐▄ ▀▐█ |   Gotta go fast!  |
   ▀▀█████▄▄ ▀██████▄██ | _________________/
   ▀▄▄▄▄▄  ▀▀█▄▀█════█▀ |/
        ▀▀▀▄  ▀▀███ ▀       ▄▄
     ▄███▀▀██▄████████▄ ▄▀▀▀▀▀▀█▌
   ██▀▄▄▄██▀▄███▀ ▀▀████      ▄██
▄▀▀▀▄██▄▀▀▌████▒▒▒▒▒▒███     ▌▄▄▀
▌    ▐▀████▐███▒▒▒▒▒▐██▌
▀▄▄▄▄▀   ▀▀████▒▒▒▒▄██▀
          ▀▀█████████▀
        ▄▄██▀██████▀█
      ▄██▀     ▀▀▀  █
     ▄█             ▐▌
 ▄▄▄▄█▌              ▀█▄▄▄▄▀▀▄
▌     ▐                ▀▀▄▄▄▀
 ▀▀▄▄▀

2019-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000
2019-09-29 16:20:33 (sanic): Starting worker [10366]  1
2019-09-29 16:25:27 (perf): id=37 Cache miss  2
2019-09-29 16:25:27 (perf): get Elapsed: 4.26 ms 3
2019-09-29 16:25:27 (perf): get Elapsed: 0.04 ms 4
1

Todo lo que hay hasta esta línea es el mensaje de registro de inicio predeterminado de sanic.

2

Como se ha descrito, la primera GET provoca una pérdida de caché porque el servidor acaba de iniciarse.

3

Esto es de nuestro primer curl -X GET. He añadido algunas funciones de temporización a los puntos finales de la API. Aquí podemos ver que el gestor de la petición GET tardó ~4 ms.

4

El segundo GET devuelve datos de la caché, y los datos de temporización mucho más rápidos (¡100 veces más rápidos!).

Hasta aquí, nada inusual. Muchas aplicaciones web utilizan la caché de este modo.

Ahora vamos a iniciar una segunda instancia de la app en el puerto 8001 (la primera instancia estaba en el puerto 8000):

$ sanic_demo.py --port 8001
<snip>
2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001
2017-10-02 08:09:56 - (sanic): Starting worker [385]

Ambas instancias, por supuesto, se conectan a la misma base de datos. Ahora, con las dos instancias del servidor API en funcionamiento, vamos a modificar los datos para el clienteJuan, que claramente carece de suficiente Spam en su dieta. Aquí realizamos un UPDATE contra la primera instancia de la app en el puerto 8000:

$ curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/29
{"msg":"ok"}

Inmediatamente después de este evento de actualización en una sola de las instancias de la aplicación, ambosservidores API, el 8000 y el 8001, informan del evento en sus respectivos registros:

2019-10-02 08:35:49 - (perf)[INFO]: Got DB event:
{
    "table": "patron",
    "id": 29,
    "type": "UPDATE",
    "data": {
        "old": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "Gravy on Toast"
        },
        "new": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "SPAM on toast"
        },
        "diff": {
            "fav_dish": "SPAM on toast"
        }
    }
}

La base de datos ha comunicado el evento de actualización a ambas instancias de la aplicación, pero aún no hemos realizado ninguna solicitud a la instancia 8001 de la aplicación.

Para comprobarlo, podemos hacer un GET en el segundo servidor, en el puerto 8001:

$ curl -X GET http://localhost:8001/patron/29
{"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}

La información sobre el tiempo en la salida del registro muestra que, efectivamente, obtenemos los datos directamente de la caché, aunque ésta sea nuestra primera petición:

2019-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms

El resultado es que, cuando cambia la base de datos, todas las instancias de aplicación conectadas reciben una notificación, lo que les permite actualizar sus cachés.

Con esta explicación fuera del camino, ahora podemos ver la implementación del códigoasyncpg necesaria para que nuestra invalidación de caché funcione realmente. El diseño básico del código del servidor que se muestra en el Ejemplo 4-23 es el siguiente:

  1. Tenemos una API web sencilla que utiliza el nuevomarco web Sanic, compatible con asyncio.

  2. Los datos se almacenarán en una instancia PostgreSQL backend, pero la API se servirá a través de múltiples instancias de los servidores de la app API web.

  3. Los servidores de la aplicación almacenarán en caché los datos de la base de datos.

  4. Los servidores de aplicaciones se suscribirán a eventos a través de asyncpg en tablas específicas de la BD, y recibirán notificaciones de actualización cuando los datos de la tabla de la BD hayan cambiado. Esto permite a los servidores de aplicaciones actualizar sus cachés individuales en memoria.

Ejemplo 4-23. Servidor API con Sanic
# sanic_demo.py
import argparse
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import json
from util import Database  1
from perf import aelapsed, aprofiler  2
import model

app = Sanic()  3

@aelapsed
async def new_patron(request):  4
    data = request.json  5
    id = await model.add_patron(app.pool, data)  6
    return json(dict(msg='ok', id=id))  7

class PatronAPI(HTTPMethodView, metaclass=aprofiler):  8
    async def get(self, request, id):
        data = await model.get_patron(app.pool, id)  9
        return json(data)

    async def put(self, request, id):
        data = request.json
        ok = await model.update_patron(app.pool, id, data)
        return json(dict(msg='ok' if ok else 'bad'))  10

    async def delete(self, request, id):
        ok = await model.delete_patron(app.pool, id)
        return json(dict(msg='ok' if ok else 'bad'))

@app.listener('before_server_start')  11
async def db_connect(app, loop):
    app.db = Database('restaurant', owner=False)  12
    app.pool = await app.db.connect()  13
    await model.create_table_if_missing(app.pool)  14
    await app.db.add_listener('chan_patron', model.db_event)  15

@app.listener('after_server_stop')  16
async def db_disconnect(app, loop):
    await app.db.disconnect()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=8000)
    args = parser.parse_args()
    app.add_route(
        new_patron, '/patron', methods=['POST'])  17
    app.add_route(
        PatronAPI.as_view(), '/patron/<id:int>')  18
    app.run(host="0.0.0.0", port=args.port)
1

El ayudante de la utilidad Database, como se ha descrito anteriormente. Proporcionará los métodos necesarios para conectar con la base de datos.

2

He improvisado dos herramientas más para registrar el tiempo transcurrido de cada punto final de la API. Lo utilicé en la discusión anterior para detectar cuándo se devolvía un GET desde la caché. Las implementaciones deaelapsed() y aprofiler() no son importantes para este caso práctico, pero puedes obtenerlas en el Ejemplo B-1.

3

Creamos la instancia principal de la app Sanic.

4

Esta función coroutine sirve para crear nuevas entradas de patrón. En una llamada aadd_route() hacia la parte inferior del código,new_patron() se asocia con el endpoint /patron, sólo para el método HTTPPOST. El decorador @aelapsed no forma parte de la API de Sanic: es una invención mía, simplemente para registrar los tiempos de cada llamada.

5

Sanic proporciona deserialización inmediata de los datos JSON recibidos utilizando el atributo .json del objeto request.

6

El módulo model, que he importado, es el modelo de nuestra tablapatron en la base de datos. Lo explicaré con más detalle en el siguiente listado de código; por ahora, entiende que todas las consultas a la base de datos y el SQL están en este módulo model. Aquí estoy pasando el pool de conexiones para la base de datos, y se utiliza el mismo patrón para toda la interacción con el modelo de la base de datos en esta función y en la clase PatronAPI más adelante.

7

Se creará una nueva clave primaria, id, que se devolverá a la persona que llama como JSON.

8

Mientras que la creación se gestiona en la función new_patron(), el resto de interacciones se gestionan en esta vista basada en la clase, que es una comodidad proporcionada por Sanic. Todos los métodos de esta clase están asociados a la misma URL, /patron/<id:int>, que puedes ver en la función add_route() cerca de la parte inferior. Ten en cuenta que el parámetro URLid se pasará a cada uno de los métodos, y este parámetro es necesario para los tres puntos finales.

Puedes ignorar sin problemas el argumento metaclass: lo único que hace es envolver cada método con el decorador @aelapsed para que se impriman los tiempos en los registros. De nuevo, esto no forma parte de la API de Sanic; es una invención mía para registrar los datos de temporización.

9

Como antes, la interacción del modelo se realiza dentro del módulo model.

10

Si el modelo informa de un fallo al realizar la actualización, modifico los datos de respuesta. He incluido esto para los lectores que aún no hayan visto la versión de Python del operador ternario.

11

Los decoradores @app.listener son ganchos proporcionados por Sanic para darte un lugar donde añadir acciones adicionales durante la secuencia de arranque y apagado. Éste, before_server_start, se invoca antes de que se inicie el servidor API. Parece un buen lugar para inicializar nuestra conexión a la base de datos.

12

Utiliza el ayudante Database para crear una conexión a nuestra instancia PostgreSQL. La base de datos a la que nos conectamos es restaurant.

13

Obtener un pool de conexión a nuestra base de datos.

14

Utiliza nuestro modelo (para la tabla patron ) para crear la tabla si falta.

15

Utiliza nuestro modelo para crear un escucha_dedicado para los eventos de la base de datos, escuchando en el canal chan_patron. La función de llamada de retorno para estos eventos es model.db_event(), que repasaré en el siguiente listado. Se llamará a la llamada de retorno cada vez que la base de datos actualice el canal.

16

after_server_stop es el gancho para las tareas que deben ocurrir durante la desconexión. Aquí nos desconectamos de la base de datos.

17

Esta llamada a add_route() envía POST peticiones de la URL /patron a la función coroutine new_patron().

18

Esta llamada a add_route() envía todas las peticiones de la URL /patron/<id:int> a la vista basada en la clase PatronAPI. Los nombres de los métodos de esa clase determinan a cuál se llama: una solicitud HTTP GET llamará al método PatronAPI.get(), y así sucesivamente.

El código anterior contiene toda la gestión HTTP de nuestro servidor, así como las tareas de arranque y apagado, como la configuración de un grupo de conexiones a la base de datos y, lo que es más importante, la configuración de un receptor db-event en el canal chan_patron del servidor de base de datos.

El Ejemplo 4-24 presenta el modelo de la tabla patron de la base de datos.

Ejemplo 4-24. Modelo de BD para la tabla "mecenas
# model.py
import logging
from json import loads, dumps
from triggers import (
    create_notify_trigger, add_table_triggers)  1
from boltons.cacheutils import LRU  2

logger = logging.getLogger('perf')

CREATE_TABLE = ('CREATE TABLE IF NOT EXISTS patron('  3
                'id serial PRIMARY KEY, name text, '
                'fav_dish text)')
INSERT = ('INSERT INTO patron(name, fav_dish) '
          'VALUES ($1, $2) RETURNING id')
SELECT = 'SELECT * FROM patron WHERE id = $1'
UPDATE = 'UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3'
DELETE = 'DELETE FROM patron WHERE id=$1'
EXISTS = "SELECT to_regclass('patron')"

CACHE = LRU(max_size=65536)  4

async def add_patron(conn, data: dict) -> int:  5
    return await conn.fetchval(
        INSERT, data['name'], data['fav_dish'])

async def update_patron(conn, id: int, data: dict) -> bool:
    result = await conn.execute(  6
        UPDATE, data['name'], data['fav_dish'], id)
    return result == 'UPDATE 1'

async def delete_patron(conn, id: int):  7
    result = await conn.execute(DELETE, id)
    return result == 'DELETE 1'

async def get_patron(conn, id: int) -> dict:  8
    if id not in CACHE:
        logger.info(f'id={id} Cache miss')
        record = await conn.fetchrow(SELECT, id)  9
        CACHE[id] = record and dict(record.items())
    return CACHE[id]

def db_event(conn, pid, channel, payload):  10
    event = loads(payload)  11
    logger.info('Got DB event:\n' + dumps(event, indent=4))
    id = event['id']
    if event['type'] == 'INSERT':
        CACHE[id] = event['data']
    elif event['type'] == 'UPDATE':
        CACHE[id] = event['data']['new']  12
    elif event['type'] == 'DELETE':
        CACHE[id] = None

async def create_table_if_missing(conn):  13
    if not await conn.fetchval(EXISTS):
        await conn.fetchval(CREATE_TABLE)
        await create_notify_trigger(
            conn, channel='chan_patron')
        await add_table_triggers(
            conn, table='patron')
1

Tienes que añadir disparadores a la base de datos para recibir notificaciones cuando cambien los datos. He creado estos prácticos ayudantes para crear la propia función desencadenante (con create_notify_trigger) y para añadir el desencadenante a una tabla concreta (con add_table_triggers). El SQL necesario para hacerlo está un poco fuera del alcance de este libro, pero sigue siendo crucial para entender cómo funciona este caso práctico. He incluido el código anotado de estos activadores enel Apéndice B.

2

El paquete de terceros boltons proporciona un montón de herramientas útiles, entre las que destaca la caché LRU, una opción más versátil que el decorador@lru_cache del módulo de la biblioteca estándar functools.3

3

Este bloque de texto contiene todo el SQL para las operaciones CRUD estándar. Ten en cuenta que estoy utilizando la sintaxis nativa de PostgreSQL para los parámetros: $1, $2, etc. No hay nada novedoso aquí, y no se discutirá más.

4

Crea la caché para esta instancia de aplicación.

5

He llamado a esta función desde el módulo Sanic dentro del punto final new_patron() para añadir nuevos clientes. Dentro de la función, utilizo el método fetchval() para insertar nuevos datos. ¿Por qué fetchval() y no execute()? Porque fetchval() devuelve la clave primaria del nuevo registro insertado.4

6

Actualiza un registro existente. Cuando esto tenga éxito, PostgreSQL devolverá UPDATE 1, así que lo utilizo como comprobación para verificar que la actualización ha tenido éxito.

7

La eliminación es muy similar a la actualización.

8

Esta es la operación de lectura. Es la única parte de nuestra interfaz CRUD que se preocupa por la caché. Piénsalo un segundo: no actualizamos la caché cuando hacemos una inserción, actualización o eliminación. Esto se debe a que confiamos en la notificación asíncrona de la base de datos (a través de los activadores instalados) para actualizar la caché si se modifica algún dato.

9

Por supuesto, seguimos queriendo utilizar la caché después de la primera GET.

10

La función db_event() es la llamada de retorno que hará asyncpg cuando se produzcan eventos en nuestro canal de notificación de la BD, chan_patron. Esta lista específica de parámetros es necesaria para asyncpg. conn es la conexión en la que se envió el evento, pid es el ID del proceso de la instancia de PostgreSQL que envió el evento, channel es el nombre del canal (que en este caso será chan_patron), y la carga útil son los datos que se envían por el canal.

11

Deserializa los datos JSON a un dict.

12

En general, la población de la caché es bastante sencilla, pero ten en cuenta que los eventos de actualización contienen datos nuevos y antiguos, por lo que tenemos que asegurarnos de almacenar en caché sólo los datos nuevos.

13

Ésta es una pequeña función utilitaria que he creado para volver a crear fácilmente una tabla si falta. Es muy útil si tienes que hacerlo con frecuencia, como cuando escribes los ejemplos de código de este libro.

Aquí es también donde se crean los disparadores de notificación de la base de datos y se añaden a nuestra tabla patron. Consulta el Ejemplo B-1para ver una lista anotada de estas funciones.

Esto nos lleva al final de este caso práctico. Hemos visto cómo Sanic hace que sea muy sencillo crear un servidor API, y hemos visto cómo utilizar asyncpg para realizar consultas a través de un grupo de conexiones, y cómo utilizar las funciones de notificación asíncrona de PostgreSQL para recibir llamadas de retorno a través de una conexión de base de datos dedicada y de larga duración.

Mucha gente de prefiere utilizar mapeadores objeto-relacionales para trabajar con bases de datos, y en este campo, SQLAlchemyes el líder. Cada vez hay más apoyo al uso de SQLAlchemy junto con asyncpgen bibliotecas de terceros como asyncpgsa y GINO. Otro ORM popular,Peewee, tiene soporte paraasyncio a través del paquete aiopeewee paquete.

Otras bibliotecas y recursos

Existen muchas otras bibliotecas para asyncio que no se tratan en este libro. Para saber más, puedes consultar elproyectoaio-libs , que gestiona cerca de 40 bibliotecas, y elproyecto Awesome asyncio , que marca muchos otros proyectos compatibles con el módulo asyncio.

Una biblioteca que merece una mención especial esaiofiles. Como recordarás de nuestras discusiones anteriores, dije que para lograr una alta concurrencia en Asyncio, es de vital importancia que el bucle nunca se bloquee. En este contexto, nos hemos centrado exclusivamente en las operaciones bloqueantes de E/S basadas en la red, pero resulta que el acceso al disco también es una operación bloqueante que afectará a tu rendimiento a niveles de concurrencia muy altos. La solución a esto es aiofiles, que proporciona una cómoda envoltura para realizar el acceso al disco en un hilo. Esto funciona porque Python libera el GIL durante las operaciones de archivo, de modo que tu hilo principal (que ejecuta el bucle asyncio ) no se ve afectado.

El dominio más importante para Asyncio va a ser la programación de redes. Por esta razón, no es mala idea aprender un poco sobre programación de sockets, e incluso después de todos estos años, el"Socket Programming HOWTO" de Gordon McMillan, incluido con la documentación estándar de Python, es una de las mejores introducciones que encontrarás.

Aprendí Asyncio de una gran variedad de fuentes, muchas de las cuales ya se han mencionado en secciones anteriores. Cada persona aprende de forma diferente, por lo que merece la pena explorar distintos tipos de materiales de aprendizaje. Aquí tienes algunos otros que me resultaron útiles:

  • Charla de Robert Smallshire "Get to Grips with Asyncio in Python 3", presentada en el NDC de Londres en enero de 2017. Es, con diferencia, el mejor vídeo de YouTube sobre Asyncio que he encontrado. Puede que la charla sea algo avanzada para un principiante, pero realmente ofrece una descripción clara de cómo está diseñado Asyncio.

  • Diapositivas "Construir aplicaciones con Asyncio" de Nikolay Novik, presentadas en la PyCon UA 2016. La información es densa, pero en estas diapositivas se recoge mucha experiencia práctica.

  • Interminables sesiones en el REPL de Python, probando cosas y "viendo qué pasa".

Te animo a que sigas aprendiendo, y si un concepto no se te queda claro, sigue buscando nuevas fuentes hasta que encuentres una explicación que te sirva.

1 En realidad, puedes hacerlo siempre que los sockets que se utilicen en hilos diferentes se creen, utilicen y destruyan totalmente en sus propios hilos. Es posible, pero difícil de hacer, y mucha gente tiene dificultades para conseguirlo. Por eso es tan fuerte la recomendación de utilizar un único hilo y un mecanismo de sondeo.

2 La receta de este plato, y las recetas de otros platos a base de Spam, se pueden encontrar en el sitio web de UKTV.

3 Obtener boltons con pip install boltons.

4 ¡Pero también necesitas la parte RETURNING id del SQL!

Get Utilizar Asyncio en Python now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.