Capítulo 4. 20 bibliotecas Asyncio que no estás utilizando (pero... Oh, no importa)
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
En este capítulo, veremos casos prácticos en los que se utilizan las nuevas funciones de Python para la programación asíncrona. Utilizaremos varias bibliotecas de terceros, como harás tú en tus propios proyectos.
El título de este capítulo es un juego de palabras con el título de un libro anterior que escribí, titulado20 bibliotecas de Python que no estás utilizando (pero deberías) (O'Reilly). Muchas de esas bibliotecas también serán útiles en tus aplicaciones basadas en asyncio
, pero este capítulo se centra en las bibliotecas diseñadas específicamente para las nuevas funciones asíncronas de Python.
Es difícil presentar código basado en asyncio
en breves fragmentos. Como has visto en todos los ejemplos de código anteriores del libro, he intentado que cada ejemplo sea un programa completo y ejecutable, porque la gestión de la vida útil de la aplicación es una consideración fundamental para utilizar correctamente la programación asíncrona.
Por esta razón, la mayoría de los casos prácticos de este capítulo serán algo mayores, en términos de líneas de código, de lo que es habitual en un libro de este tipo. Mi objetivo al utilizar este enfoque es hacer que los casos prácticos sean más útiles, ofreciéndote una "visión completa" de un programa asíncrono, en lugar de dejarte que descubras cómo pueden encajar fragmentos sueltos.
Nota
Algunos de los ejemplos de código de este capítulo renuncian al estilo para ahorrar espacio. Me gusta PEP8 tanto como al siguiente Pythonista, ¡pero la practicidad vence a la pureza!
Flujos (Biblioteca estándar)
Antes de examinar las bibliotecas de terceros, empecemos por la biblioteca estándar. La API de flujoses la interfaz de alto nivel que se ofrece para la programación de sockets asíncronos y, como mostrará el siguiente caso práctico, es bastante fácil de utilizar. Sin embargo, el diseño de la aplicación sigue siendo complejo simplemente por la naturaleza del dominio.
El siguiente caso práctico muestra la implementación de un corredor de mensajes, con un diseño inicial ingenuo seguido de un diseño más considerado. Ninguno de los dos debe considerarse listo para la producción; mi objetivo es ayudarte a reflexionar sobre los diversos aspectos de la programación concurrente en red que hay que tener en cuenta al diseñar este tipo de aplicaciones.
Caso práctico: Una cola de mensajes
Un servicio de cola de mensajes es una aplicación backend que recibe conexiones de otras aplicaciones y pasa mensajes entre esos servicios conectados, a menudo denominados editores y suscriptores. Los suscriptores suelen estar a la escucha de canales específicos para recibir mensajes, y normalmente es posible configurar la distribución de mensajes en diferentes canales de dos maneras: los mensajes pueden distribuirse a todos los suscriptores de un canal(pub-sub), o un mensaje diferente puede ir a cada suscriptor de uno en uno(punto a punto).
Recientemente, he trabajado en un proyecto que implicaba el uso deActiveMQ como broker de mensajes para la intercomunicación de microservicios. A un nivel básico, un broker de este tipo (servidor):
-
Mantiene conexiones de socket persistentes con varios clientes
-
Recibe mensajes de clientes con un nombre de canal de destino
-
Entrega esos mensajes a todos los demás clientes suscritos a ese mismo nombre de canal
Recuerdo que me preguntaba lo difícil que sería crear una aplicación así. Como toque añadido, ActiveMQ puede realizar ambos modelos de distribución de mensajes, y los dos modelos se diferencian generalmente por el nombre del canal:
-
Los nombres de canal con el prefijo
/topic
(por ejemplo,/topic/customer/registration
) se gestionan con el patrón pub-sub, en el que todos los suscriptores del canal reciben todos los mensajes. -
Los nombres de canal con el prefijo
/queue
se gestionan con el modelopunto a punto, en el que los mensajes de un canal se distribuyen entre los suscriptores del canal de forma rotatoria: cada suscriptor recibe un mensaje único.
En nuestro caso práctico, construiremos un intermediario de mensajes de juguete con estas características básicas. La primera cuestión que debemos abordar es que TCP no es un protocolo basado en mensajes: sólo recibimos flujos de bytes en el cable. Tenemos que crear nuestro propio protocolo para la estructura de los mensajes, y el protocolo más sencillo es anteponer a cada mensaje una cabecera de tamaño, seguida de una carga útil de ese tamaño. La biblioteca de utilidades del Ejemplo 4-1 proporciona capacidades de lectura y escritura para este tipo de mensajes.
Ejemplo 4-1. Protocolo de mensajes: lectura y escritura
# msgproto.py
from
asyncio
import
StreamReader
,
StreamWriter
async
def
read_msg
(
stream
:
StreamReader
)
-
>
bytes
:
size_bytes
=
await
stream
.
readexactly
(
4
)
size
=
int
.
from_bytes
(
size_bytes
,
byteorder
=
'
big
'
)
data
=
await
stream
.
readexactly
(
size
)
return
data
async
def
send_msg
(
stream
:
StreamWriter
,
data
:
bytes
)
:
size_bytes
=
len
(
data
)
.
to_bytes
(
4
,
byteorder
=
'
big
'
)
stream
.
writelines
(
[
size_bytes
,
data
]
)
await
stream
.
drain
(
)
Obtén los 4 primeros bytes. Es el prefijo del tamaño.
Esos 4 bytes deben convertirse en un número entero.
Ahora sabemos el tamaño de la carga útil, así que lo leemos del flujo.
La escritura es la inversa de la lectura: primero enviamos la longitud de los datos, codificada como 4 bytes, y después los datos.
Ahora que ya tenemos un protocolo de mensajes rudimentario, podemos centrarnos en la aplicación del corredor de mensajes del Ejemplo 4-2.
Ejemplo 4-2. Un servidor prototipo de 40 líneas
# mq_server.py
import
asyncio
from
asyncio
import
StreamReader
,
StreamWriter
,
gather
from
collections
import
deque
,
defaultdict
from
typing
import
Deque
,
DefaultDict
from
msgproto
import
read_msg
,
send_msg
SUBSCRIBERS
:
DefaultDict
[
bytes
,
Deque
]
=
defaultdict
(
deque
)
async
def
client
(
reader
:
StreamReader
,
writer
:
StreamWriter
)
:
peername
=
writer
.
get_extra_info
(
'
peername
'
)
subscribe_chan
=
await
read_msg
(
reader
)
SUBSCRIBERS
[
subscribe_chan
]
.
append
(
writer
)
(
f
'
Remote
{peername}
subscribed to
{subscribe_chan}
'
)
try
:
while
channel_name
:
=
await
read_msg
(
reader
)
:
data
=
await
read_msg
(
reader
)
(
f
'
Sending to
{channel_name}
:
{data[:19]}
...
'
)
conns
=
SUBSCRIBERS
[
channel_name
]
if
conns
and
channel_name
.
startswith
(
b
'
/queue
'
)
:
conns
.
rotate
(
)
conns
=
[
conns
[
0
]
]
await
gather
(
*
[
send_msg
(
c
,
data
)
for
c
in
conns
]
)
except
asyncio
.
CancelledError
:
(
f
'
Remote
{peername}
closing connection.
'
)
writer
.
close
(
)
await
writer
.
wait_closed
(
)
except
asyncio
.
IncompleteReadError
:
(
f
'
Remote
{peername}
disconnected
'
)
finally
:
(
f
'
Remote
{peername}
closed
'
)
SUBSCRIBERS
[
subscribe_chan
]
.
remove
(
writer
)
async
def
main
(
*
args
,
*
*
kwargs
)
:
server
=
await
asyncio
.
start_server
(
*
args
,
*
*
kwargs
)
async
with
server
:
await
server
.
serve_forever
(
)
try
:
asyncio
.
run
(
main
(
client
,
host
=
'
127.0.0.1
'
,
port
=
25000
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
Importa de nuestro módulo msgproto.py.
Una colección global de los suscriptores actualmente activos. Cada vez que un cliente se conecta, debe enviar primero el nombre del canal al que se suscribe. Un deque contendrá todos los suscriptores de un canal concreto.
La función
client()
coroutine producirá una coroutine de larga duración para cada nueva conexión. Piensa en ella como una llamada de retorno para el servidor TCP iniciado enmain()
. En esta línea, he mostrado cómo se pueden obtener el host y el puerto del par remoto, por ejemplo, para el registro.Nuestro protocolo para los clientes es el siguiente
-
En la primera conexión, un cliente debe enviar un mensaje que contenga el canal al que desea suscribirse (aquí,
subscribe_chan
). -
A partir de entonces, durante toda la vida de la conexión, un cliente envía un mensaje a un canal enviando primero un mensaje que contenga el nombre del canal de destino, seguido de un mensaje que contenga los datos. Nuestro intermediario enviará esos mensajes de datos a todos los clientes suscritos a ese nombre de canal.
-
Añade la instancia
StreamWriter
a la colección global de suscriptores.Un bucle infinito, esperando datos de este cliente. El primer mensaje de un cliente debe ser el nombre del canal de destino.
A continuación vienen los datos reales que hay que distribuir al canal.
Obtén el deque de suscriptores del canal de destino.
Un manejo especial si el nombre del canal empieza por la palabra mágica
/queue
: en este caso, enviamos los datos sólo a uno de los suscriptores, no a todos. Esto puede utilizarse para compartir trabajo entre un grupo de trabajadores, en lugar del esquema habitual de notificación pub-sub, en el que todos los suscriptores de un canal reciben todos los mensajes.Ésta es la razón por la que utilizamos un deque y no una lista: la rotación del deque es la forma en que hacemos un seguimiento de qué cliente es el siguiente en la cola para la distribución de
/queue
. Esto parece costoso hasta que te das cuenta de que una sola rotación del deque es una operación O(1).Apunta sólo al cliente que esté primero; esto cambia después de cada rotación.
Crea una lista de coroutines para enviar el mensaje a cada escritor, y luego desempaquétalas en
gather()
para que podamos esperar a que se complete todo el envío.Esta línea es un fallo de nuestro programa, pero puede que no sea obvio por qué: aunque sea cierto que todos los envíos a cada abonado se producirán de forma concurrente, ¿qué ocurre si tenemos un cliente muy lento? En este caso, el
gather()
sólo terminará cuando el abonado más lento haya recibido sus datos. No podremos recibir más datos del cliente emisor hasta que terminen todas estas coroutines desend_msg()
. Esto ralentiza toda la distribución de mensajes a la velocidad del abonado más lento.Al salir de la coroutina
client()
, nos aseguramos de eliminarnos de la colección globalSUBSCRIBERS
. Desgraciadamente, se trata de una operación O(n), que puede resultar un poco cara para n muy grandes. Una estructura de datos diferente lo solucionaría, pero por ahora nos consolamos sabiendo que las conexiones están pensadas para durar mucho -por tanto, debería haber pocos eventos de desconexión- y es poco probable que n sea muy grande (digamos ~10.000 como estimación aproximada de orden de magnitud), y este código es, al menos, fácil de entender.
Así que ése es nuestro servidor; ahora necesitamos clientes, y entonces podremos mostrar alguna salida. A efectos de demostración, crearé dos tipos de clientes: unemisor y un oyente. El servidor no hace diferencias; todos los clientes son iguales. La distinción entre el comportamiento del emisor y del oyente sólo tiene fines educativos. El Ejemplo 4-3 muestra el código de la aplicación oyente.
Ejemplo 4-3. Listener: un conjunto de herramientas para escuchar mensajes en nuestro corredor de mensajes
# mq_client_listen.py
import
asyncio
import
argparse
,
uuid
from
msgproto
import
read_msg
,
send_msg
async
def
main
(
args
)
:
me
=
uuid
.
uuid4
(
)
.
hex
[
:
8
]
(
f
'
Starting up
{me}
'
)
reader
,
writer
=
await
asyncio
.
open_connection
(
args
.
host
,
args
.
port
)
(
f
'
I am
{
writer.get_extra_info(
"
sockname
"
)}
'
)
channel
=
args
.
listen
.
encode
(
)
await
send_msg
(
writer
,
channel
)
try
:
while
data
:
=
await
read_msg
(
reader
)
:
(
f
'
Received by
{me}
:
{data[:20]}
'
)
(
'
Connection ended.
'
)
except
asyncio
.
IncompleteReadError
:
(
'
Server closed.
'
)
finally
:
writer
.
close
(
)
await
writer
.
wait_closed
(
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--host
'
,
default
=
'
localhost
'
)
parser
.
add_argument
(
'
--port
'
,
default
=
25000
)
parser
.
add_argument
(
'
--listen
'
,
default
=
'
/topic/foo
'
)
try
:
asyncio
.
run
(
main
(
parser
.
parse_args
(
)
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
El módulo de la biblioteca estándar
uuid
es una forma cómoda de crear una "identidad" para este oyente. Si pones en marcha varias instancias, cada una tendrá su propia identidad, y podrás hacer un seguimiento de lo que ocurre en los registros.Abre una conexión con el servidor.
El canal al que suscribirse es un parámetro de entrada, capturado en
args.listen
. Codifícalo en bytes antes de enviarlo.Según las reglas de nuestro protocolo (como ya se ha comentado en el análisis del código del intermediario), lo primero que hay que hacer tras conectarse es enviar el nombre del canal al que suscribirse.
Este bucle no hace otra cosa que esperar a que aparezcan datos en la toma.
Los argumentos de la línea de comandos de este programa facilitan indicar un host, un puerto y un nombre de canal para escuchar.
El código del otro cliente, el programa emisor que se muestra en el Ejemplo 4-4, tiene una estructura similar a la del módulo oyente.
Ejemplo 4-4. Remitente: un conjunto de herramientas para enviar datos a nuestro corredor de mensajes
# mq_client_sender.py
import
asyncio
import
argparse
,
uuid
from
itertools
import
count
from
msgproto
import
send_msg
async
def
main
(
args
)
:
me
=
uuid
.
uuid4
(
)
.
hex
[
:
8
]
(
f
'
Starting up
{me}
'
)
reader
,
writer
=
await
asyncio
.
open_connection
(
host
=
args
.
host
,
port
=
args
.
port
)
(
f
'
I am
{
writer.get_extra_info(
"
sockname
"
)}
'
)
channel
=
b
'
/null
'
await
send_msg
(
writer
,
channel
)
chan
=
args
.
channel
.
encode
(
)
try
:
for
i
in
count
(
)
:
await
asyncio
.
sleep
(
args
.
interval
)
data
=
b
'
X
'
*
args
.
size
or
f
'
Msg
{i}
from
{me}
'
.
encode
(
)
try
:
await
send_msg
(
writer
,
chan
)
await
send_msg
(
writer
,
data
)
except
OSError
:
(
'
Connection ended.
'
)
break
except
asyncio
.
CancelledError
:
writer
.
close
(
)
await
writer
.
wait_closed
(
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--host
'
,
default
=
'
localhost
'
)
parser
.
add_argument
(
'
--port
'
,
default
=
25000
,
type
=
int
)
parser
.
add_argument
(
'
--channel
'
,
default
=
'
/topic/foo
'
)
parser
.
add_argument
(
'
--interval
'
,
default
=
1
,
type
=
float
)
parser
.
add_argument
(
'
--size
'
,
default
=
0
,
type
=
int
)
try
:
asyncio
.
run
(
main
(
parser
.
parse_args
(
)
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
Al igual que con el oyente, reclama una identidad.
Acércate y establece una conexión.
Según las reglas de nuestro protocolo, lo primero que hay que hacer tras conectarse al servidor es dar el nombre del canal al que suscribirse; sin embargo, como somos un emisor, en realidad no nos interesa suscribirnos a ningún canal. No obstante, el protocolo lo exige, así que basta con proporcionar un canal nulo al que suscribirse (en realidad no escucharemos nada).
Envía el canal al que suscribirte.
El parámetro de la línea de comandos
args.channel
proporciona el canal al quequeremos enviar los mensajes. Hay que convertirlo primero a bytes antes de enviarlo.Utilizar
itertools.count()
es como un buclewhile True
, salvo que obtenemos una variable de iteración para utilizarla. La utilizamos en los mensajes de depuración, ya que facilita un poco el seguimiento de qué mensaje se envió desde dónde.El retardo entre mensajes enviados es un parámetro de entrada,
args.interval
. La siguiente línea genera la carga útil del mensaje. Puede ser una cadena de bytes del tamaño especificado (args.size
) o un mensaje descriptivo. Esta flexibilidad es sólo para pruebas.Ten en cuenta que aquí se envían dos mensajes: el primero es el nombre del canal de destino y el segundo es la carga útil.
Al igual que con el oyente, hay un montón de opciones de línea de comandos para ajustar el remitente:
channel
determina el canal de destino al que enviar, mientras queinterval
controla el retardo entre envíos. El parámetrosize
controla el tamaño de la carga útil de cada mensaje.
Ahora tenemos un intermediario, un oyente y un remitente; es hora de ver algún resultado. Para producir los siguientes fragmentos de código, puse en marcha el servidor, luego dos oyentes y después un remitente. Después de enviar unos cuantos mensajes, detuve el servidor con Ctrl-C. La salida del servidor se muestra en el Ejemplo 4-5, la del emisor en el Ejemplo 4-6 y la del oyente en los Ejemplos 4-7 y 4-8.
Ejemplo 4-5. Salida del agente de mensajes (servidor)
$ mq_server.py Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah' Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah' Remote ('127.0.0.1', 55390) subscribed to b'/null' Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'... ^CBye! Remote ('127.0.0.1', 55382) closing connection. Remote ('127.0.0.1', 55382) closed Remote ('127.0.0.1', 55390) closing connection. Remote ('127.0.0.1', 55390) closed Remote ('127.0.0.1', 55386) closing connection. Remote ('127.0.0.1', 55386) closed
Ejemplo 4-6. Salida del remitente (cliente)
$ mq_client_sender.py --channel /queue/blah Starting up 6b5a8e1d I am ('127.0.0.1', 55390) Connection ended.
Ejemplo 4-7. Salida del oyente 1 (cliente)
$ mq_client_listen.py --listen /queue/blah Starting up 9ae04690 I am ('127.0.0.1', 55382) Received by 9ae04690: b'Msg 1 from 6b5a8e1d' Received by 9ae04690: b'Msg 3 from 6b5a8e1d' Received by 9ae04690: b'Msg 5 from 6b5a8e1d' Server closed.
Ejemplo 4-8. Salida del oyente 2 (cliente)
$ mq_client_listen.py --listen /queue/blah Starting up bd4e3baa I am ('127.0.0.1', 55386) Received by bd4e3baa: b'Msg 0 from 6b5a8e1d' Received by bd4e3baa: b'Msg 2 from 6b5a8e1d' Received by bd4e3baa: b'Msg 4 from 6b5a8e1d' Server closed.
Nuestro corredor de mensajes de juguete funciona. El código también es bastante fácil de entender, dado un dominio de problemas tan complejo, pero por desgracia, el diseño del propio código del corredor es problemático.
El problema es que, para un cliente concreto, enviamos mensajes a los suscriptores en la misma coroutina en la que se reciben nuevos mensajes. Esto significa que si algún suscriptor tarda en consumir lo que estamos enviando, puede que esa línea await gather(...)
del Ejemplo 4-2 tarde mucho tiempo en completarse, y no podremos recibir y procesar más mensajes mientras esperamos.
En su lugar, necesitamos desacoplar la recepción de mensajes del envío de mensajes. En el siguiente caso práctico, refactorizamos nuestro código para hacer exactamente eso.
Caso práctico: Mejorar la cola de mensajes
En este caso práctico, mejoramos el diseño de nuestro corredor de mensajes de juguete. Los programas oyente y emisor permanecen tal cual. La mejora específica en el nuevo diseño del agente es desacoplar el envío y la recepción de mensajes; esto resolverá el problema por el que un suscriptor lento también ralentizaría la recepción de nuevos mensajes, como se ha comentado en la sección anterior. El nuevo código, que se muestra en el Ejemplo 4-9, es un poco más largo, pero no demasiado.
Ejemplo 4-9. Agente de mensajes: diseño mejorado
# mq_server_plus.py
import
asyncio
from
asyncio
import
StreamReader
,
StreamWriter
,
Queue
from
collections
import
deque
,
defaultdict
from
contextlib
import
suppress
from
typing
import
Deque
,
DefaultDict
,
Dict
from
msgproto
import
read_msg
,
send_msg
SUBSCRIBERS
:
DefaultDict
[
bytes
,
Deque
]
=
defaultdict
(
deque
)
SEND_QUEUES
:
DefaultDict
[
StreamWriter
,
Queue
]
=
defaultdict
(
Queue
)
CHAN_QUEUES
:
Dict
[
bytes
,
Queue
]
=
{
}
async
def
client
(
reader
:
StreamReader
,
writer
:
StreamWriter
)
:
peername
=
writer
.
get_extra_info
(
'
peername
'
)
subscribe_chan
=
await
read_msg
(
reader
)
SUBSCRIBERS
[
subscribe_chan
]
.
append
(
writer
)
send_task
=
asyncio
.
create_task
(
send_client
(
writer
,
SEND_QUEUES
[
writer
]
)
)
(
f
'
Remote
{peername}
subscribed to
{subscribe_chan}
'
)
try
:
while
channel_name
:
=
await
read_msg
(
reader
)
:
data
=
await
read_msg
(
reader
)
if
channel_name
not
in
CHAN_QUEUES
:
CHAN_QUEUES
[
channel_name
]
=
Queue
(
maxsize
=
10
)
asyncio
.
create_task
(
chan_sender
(
channel_name
)
)
await
CHAN_QUEUES
[
channel_name
]
.
put
(
data
)
except
asyncio
.
CancelledError
:
(
f
'
Remote
{peername}
connection cancelled.
'
)
except
asyncio
.
IncompleteReadError
:
(
f
'
Remote
{peername}
disconnected
'
)
finally
:
(
f
'
Remote
{peername}
closed
'
)
await
SEND_QUEUES
[
writer
]
.
put
(
None
)
await
send_task
del
SEND_QUEUES
[
writer
]
SUBSCRIBERS
[
subscribe_chan
]
.
remove
(
writer
)
async
def
send_client
(
writer
:
StreamWriter
,
queue
:
Queue
)
:
while
True
:
try
:
data
=
await
queue
.
get
(
)
except
asyncio
.
CancelledError
:
continue
if
not
data
:
break
try
:
await
send_msg
(
writer
,
data
)
except
asyncio
.
CancelledError
:
await
send_msg
(
writer
,
data
)
writer
.
close
(
)
await
writer
.
wait_closed
(
)
async
def
chan_sender
(
name
:
bytes
)
:
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
writers
=
SUBSCRIBERS
[
name
]
if
not
writers
:
await
asyncio
.
sleep
(
1
)
continue
if
name
.
startswith
(
b
'
/queue
'
)
:
writers
.
rotate
(
)
writers
=
[
writers
[
0
]
]
if
not
(
msg
:
=
await
CHAN_QUEUES
[
name
]
.
get
(
)
)
:
break
for
writer
in
writers
:
if
not
SEND_QUEUES
[
writer
]
.
full
(
)
:
(
f
'
Sending to
{name}
:
{msg[:19]}
...
'
)
await
SEND_QUEUES
[
writer
]
.
put
(
msg
)
async
def
main
(
*
args
,
*
*
kwargs
)
:
server
=
await
asyncio
.
start_server
(
*
args
,
*
*
kwargs
)
async
with
server
:
await
server
.
serve_forever
(
)
try
:
asyncio
.
run
(
main
(
client
,
host
=
'
127.0.0.1
'
,
port
=
25000
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
En la implementación anterior, sólo existía
SUBSCRIBERS
; ahora existenSEND_QUEUES
yCHAN_QUEUES
como colecciones globales. Esto es consecuencia de desacoplar completamente la recepcióny el envío de datos.SEND_QUEUES
tiene una entrada de cola por cada conexión de cliente: todos los datos que deban enviarse a ese cliente deben colocarse en esa cola. (Si te adelantas, la coroutinasend_client()
sacará datos deSEND_QUEUES
y los enviará).Hasta este punto de la función coroutine
client()
, el código es el mismo que en el servidor simple: se recibe el nombre del canal suscrito, y añadimos la instancia deStreamWriter
instancia del nuevo cliente a la colección globalSUBSCRIBERS
.Esto es nuevo: creamos una tarea de larga duración que realizará todo el envío de datos a este cliente. La tarea se ejecutará independientemente como una coroutine separada y sacará mensajes de la cola suministrada,
SEND_QUEUES[writer]
, para enviarlos.Ahora estamos dentro del bucle donde recibimos los datos. Recuerda que siempre recibimos dos mensajes: uno para el nombre del canal de destino y otro para los datos. Vamos a crear un
Queue
nuevo y dedicado para cada canal de destino, y para eso estáCHAN_QUEUES
: cuando cualquier cliente quiera enviar datos a un canal, vamos a poner esos datos en la cola apropiada y luego volveremos inmediatamente a escuchar más datos. Este enfoque desacopla la distribución de mensajes de la recepción de mensajes de este cliente.Si todavía no existe una cola para el canal de destino, crea una.
Crea una tarea dedicada y de larga duración para ese canal. La coroutina
chan_sender()
se encargará de sacar datos de la cola del canal y distribuirlos a los suscriptores.Coloca los nuevos datos recibidos en la cola del canal específico. Si la cola se llena, esperaremos aquí hasta que haya espacio para los nuevos datos. Esperar aquí significa que no leeremos ningún dato nuevo del socket, lo que significa que el cliente tendrá que esperar para enviar nuevos datos al socket por su parte. Esto no es necesariamente algo malo, ya que comunica la llamada contrapresión a este cliente. (Alternativamente, podrías optar por soltar mensajes aquí si el caso de uso está de acuerdo con ello).
Cuando se cierra la conexión, es hora de limpiar. La tarea de larga duración que creamos para enviar datos a este cliente,
send_task
, puede cerrarse colocandoNone
en su cola,SEND_QUEUES[writer]
(comprueba el código desend_client()
). Es importante utilizar un valor en la cola, en lugar de la cancelación directa, porque puede que ya haya datos en esa cola y queremos que esos datos se envíen antes de quesend_client()
finalice.Espera a que termine la tarea del remitente...
...y eliminamos la entrada de la colección
SEND_QUEUES
(y en la línea siguiente, eliminamos también la entradasock
de la colecciónSUBSCRIBERS
, como antes).La función de la corrutina
send_client()
es casi un ejemplo de libro de texto de cómo sacar trabajo de una cola. Observa cómo la corrutina saldrá sólo siNone
se coloca en la cola. Observa también cómo suprimimosCancelledError
dentrodel bucle: esto se debe a que queremos que esta tarea se cierre sólo al recibir unNone
en la cola. De esta forma, todos los datos pendientes en la cola pueden enviarse antes de cerrarse.chan_sender()
es la lógica de distribución de un canal: envía datos desde una instancia dedicada del canalQueue
a todos los suscriptores de ese canal. Pero, ¿qué ocurre si todavía no hay suscriptores para este canal? Esperaremos un poco y volveremos a intentarlo. (Aunque ten en cuenta que la cola de este canal,CHAN_QUEUES[name]
, seguirá llenándose).Como en nuestra implementación anterior del corredor, hacemos algo especial para los canales cuyo nombre empieza por
/queue
: rotamos el deque y enviamos sólo a la primera entrada. Esto actúa como un burdo sistema de equilibrio de carga, ya que cada suscriptor recibe mensajes diferentes de la misma cola. Para todos los demás canales, todos los suscriptores reciben todos los mensajes.Aquí esperaremos datos en la cola, y saldremos si se recibe
None
. Actualmente, esto no se activa en ningún sitio (por lo que estas coroutines dechan_sender()
viven para siempre), pero si se añadiera lógica para limpiar estas tareas del canal después de, digamos, algún periodo de inactividad, así es como se haría.Se han recibido los datos, así que es hora de enviarlos a los suscriptores. Aquí no hacemos el envío: en su lugar, colocamos los datos en la cola de envío propia de cada suscriptor. Este desacoplamiento es necesario para garantizar que un suscriptor lento no ralentice a nadie más que reciba datos. Y además, si el abonado es tan lento que su cola de envío se llena, no ponemos esos datos en su cola; es decir, se pierden.
El diseño anterior produce el mismo resultado que la implementación simplista anterior, pero ahora podemos estar seguros de que un oyente lento no interferirá en la distribución de mensajes a otros oyentes.
Estos dos casos prácticos muestran una progresión en el pensamiento en torno al diseño de un sistema de distribución de mensajes. Un aspecto clave fue darse cuenta de que el envío y la recepción de datos podrían gestionarse mejor en coroutines separadas, dependiendo del caso de uso. En tales casos, las colas pueden ser muy útiles para mover datos entre esas distintas coroutines y para proporcionar almacenamiento intermedio para desacoplarlas.
El objetivo más importante de estos casos prácticos era mostrar cómo la API de flujos de asyncio
hace que sea muy fácil crear aplicaciones basadas en sockets.
Retorcido
El proyecto Twisted es anterior -dramáticamente- a la biblioteca estándarasyncio
, y lleva enarbolando la bandera de la programación asíncrona en Python desde hace unos 14 años. El proyecto proporciona no sólo los bloques de construcción básicos, como un bucle de eventos, sino también primitivas como los deferreds, que son un poco como los futuros de asyncio
. El diseño de asyncio
se ha visto muy influido por Twisted y por la amplia experiencia de sus líderes y mantenedores.
Ten en cuenta que asyncio
no sustituye a Twisted. Twisted incluye implementaciones de alta calidad de un enorme número de protocolos de Internet, incluyendo no sólo el habitual HTTP, sino también XMPP, NNTP, IMAP, SSH, IRC y FTP (tanto servidores como clientes). Y la lista continúa: ¿DNS? Comprobado ¿SMTP? Comprobado. ¿POP3? Comprobado. La disponibilidad de estas excelentes implementaciones del protocolo de Internet sigue haciendo que Twisted sea convincente.
A nivel de código, la principal diferencia entre Twisted yasyncio
, aparte de la historia y el contexto histórico, es que durante mucho tiempo Python careció de soporte de lenguaje para coroutines, y esto significó que Twisted y proyectos similares tuvieron que idear formas de tratar la asincronía que funcionaran con la sintaxis estándar de Python.
Durante la mayor parte de la historia de Twisted, las devoluciones de llamada eran el medio por el que se realizaba la programación asíncrona, con toda la complejidad no lineal que ello conlleva; sin embargo, cuando se hizo posible utilizar generadores como coroutines improvisadas, de repente se hizo posible maquetar código en Twisted de forma lineal utilizando su decorador @defer.inlineCallbacks
, como se muestra en el Ejemplo 4-10.
Ejemplo 4-10. Aún más Twisted con devoluciones de llamada inlined
@defer
.
inlineCallbacks
def
f
(
)
:
yield
defer
.
returnValue
(
123
)
@defer
.
inlineCallbacks
def
my_coro_func
(
)
:
value
=
yield
f
(
)
assert
value
==
123
Normalmente, Twisted requiere crear instancias de
Deferred
y añadir devoluciones de llamada a esas instancias como método para construir programas asíncronos. Hace unos años, se añadió el decorador@inlineCallbacks
, que reutiliza los generadores como coroutines.Aunque
@inlineCallbacks
te permitía escribir código lineal en apariencia (a diferencia de las devoluciones de llamada), se necesitaban algunos hacks, como esta llamada adefer.returnValue()
, que es cómo tienes que devolver valores de@inlineCallbacks
coroutines.Aquí podemos ver el
yield
que hace de esta función un generador. Para que@inlineCallbacks
funcione, debe haber al menos unyield
presente en la función decorada.
Desde que aparecieron las coroutines nativas en Python 3.5, el equipo de Twisted (y Amber Brown en particular) han estado trabajando para añadir soporte para ejecutar Twisted en el bucle de eventos asyncio
.
Se trata de un esfuerzo en curso, y mi objetivo en esta sección no es convencerte de que crees todas tus aplicaciones como híbridos Twisted-asyncio
, sino más bien hacerte consciente de que actualmente se está trabajando para proporcionar una interoperabilidad significativa entre ambos.
Para los que tengáis experiencia con Twisted, el Ejemplo 4-11puede resultar chocante.
Ejemplo 4-11. Soporte para asyncio en Twisted
# twisted_asyncio.py
from
time
import
ctime
from
twisted
.
internet
import
asyncioreactor
asyncioreactor
.
install
(
)
from
twisted
.
internet
import
reactor
,
defer
,
task
async
def
main
(
)
:
for
i
in
range
(
5
)
:
(
f
'
{
ctime()} Hello
{i}
'
)
await
task
.
deferLater
(
reactor
,
1
,
lambda
:
None
)
defer
.
ensureDeferred
(
main
(
)
)
reactor
.
run
(
)
Así es como le dices a Twisted que utilice el bucle de eventos
asyncio
como sureactor
principal. Ten en cuenta que esta línea debe ir antes de que se importe elreactor
detwisted.internet
en la línea siguiente.Cualquiera que esté familiarizado con la programación Twisted reconocerá estas importaciones. No tenemos espacio para cubrirlas en profundidad aquí, pero en pocas palabras,
reactor
es la versiónTwisted
del bucleasyncio
, ydefer
ytask
son espacios de nombres para herramientas que trabajan con coroutines de programación.Ver
async def
aquí, en un programa Twisted, parece extraño, pero este es en realidad lo que nos ofrece el nuevo soporte paraasync/await
: la posibilidad de utilizar coroutines nativas directamente en programas Twisted.En el mundo antiguo de
@inlineCallbacks
, habrías utilizadoyield from
aquí, pero ahora podemos utilizarawait
, igual que en el códigoasyncio
. La otra parte de esta línea,deferLater()
, es una forma alternativa de hacer lo mismo queasyncio.sleep(1)
.await
un futuro en el que, al cabo de un segundo, se disparará una llamada de retorno de no hacer nada.ensureDeferred()
es una versión Twisted de programar una coroutina. Esto sería análogo aloop.create_task()
oasyncio.ensure_future()
.Ejecutar el
reactor
es lo mismo queloop.run_forever()
enasyncio
.
La ejecución de este script produce el siguiente resultado:
$ twisted_asyncio.py Mon Oct 16 16:19:49 2019 Hello 0 Mon Oct 16 16:19:50 2019 Hello 1 Mon Oct 16 16:19:51 2019 Hello 2 Mon Oct 16 16:19:52 2019 Hello 3 Mon Oct 16 16:19:53 2019 Hello 4
Hay mucho más que aprender sobre Twisted. En particular, merece la pena que repases la lista de protocolos de red que implementa. Aún queda trabajo por hacer, pero el futuro parece muy prometedor para la interoperabilidad entre Twisted y asyncio
.
asyncio
se ha diseñado de tal manera que podemos esperar un futuro en el que será posible incorporar código de muchos marcos asíncronos, como Twisted y Tornado, en una sola aplicación, con todo el código ejecutándose en el mismo bucle de eventos.
La cola de Janus
La cola Janus (instalada con pip install janus
) proporciona una solución para la comunicación entre hilos y coroutines. En la biblioteca estándar de Python, existen dos tipos de colas:
queue.Queue
-
Una cola de bloqueo, comúnmente utilizada para la comunicación y almacenamiento en búfer entre hilos
asyncio.Queue
-
Una cola compatible con
async
, utilizada habitualmente para la comunicación y almacenamiento en búfer entre coroutines
Por desgracia, ¡ninguna de las dos es útil para la comunicación entre hilos y coroutines! Aquí es donde entra Janus: es una cola única que expone ambas APIs, una de bloqueo y otra asíncrona.El Ejemplo 4-12 genera datos desde dentro de un hilo, coloca esos datos en una cola y luego los consume desde una coroutina.
Ejemplo 4-12. Conectar coroutines e hilos con una cola Janus
# janus_demo.py
import
asyncio
import
random
import
time
import
janus
async
def
main
(
)
:
loop
=
asyncio
.
get_running_loop
(
)
queue
=
janus
.
Queue
(
loop
=
loop
)
future
=
loop
.
run_in_executor
(
None
,
data_source
,
queue
)
while
(
data
:
=
await
queue
.
async_q
.
get
(
)
)
is
not
None
:
(
f
'
Got
{data}
off queue
'
)
(
'
Done.
'
)
def
data_source
(
queue
)
:
for
i
in
range
(
10
)
:
r
=
random
.
randint
(
0
,
4
)
time
.
sleep
(
r
)
queue
.
sync_q
.
put
(
r
)
queue
.
sync_q
.
put
(
None
)
asyncio
.
run
(
main
(
)
)
Crea una cola Janus. Ten en cuenta que, al igual que un
asyncio.Queue
, la cola Janus estará asociada a un bucle de eventos específico. Como de costumbre, si no proporcionas el parámetroloop
, se utilizará internamente la llamada estándarget_event_loop()
.Nuestra función coroutine
main()
simplemente espera datos en una cola. Esta línea se suspenderá hasta que haya datos, exactamente hasta que haya datos, exactamente igual que llamar aget()
en una instancia deasyncio.Queue
. El objeto cola tiene dos caras: ésta se llamaasync_q
y proporciona la API de cola compatible con asíncronos.Imprime un mensaje.
Dentro de la función
data_source()
, se genera unint
aleatorio, que se utiliza como duración del sueño y como valor de los datos. Ten en cuenta que la llamada atime.sleep()
es bloqueante, por lo que esta función debe ejecutarse en un hilo.Coloca los datos en la cola Janus. Esto muestra la otracara de la cola Janus:
sync_q
que proporciona la API estándar y bloqueanteQueue
.
Este es el resultado:
$ <name> Got 2 off queue Got 4 off queue Got 4 off queue Got 2 off queue Got 3 off queue Got 4 off queue Got 1 off queue Got 1 off queue Got 0 off queue Got 4 off queue Done.
Si puedes, es mejor aspirar a tener trabajos de ejecución cortos, y en estos casos, no será necesaria una cola (para la comunicación). Sin embargo, esto no siempre es posible, y en tales situaciones, la cola Janus puede ser la solución más conveniente para almacenar y distribuir datos entre hilos y coroutines.
aiohttp
aiohttp
lleva todo lo relacionado con HTTP a asyncio
, incluida la compatibilidad con clientes y servidores HTTP, así como con WebSocket. Saltemos directamente a los ejemplos de código, empezando por la simplicidad misma: "Hola Mundo".
Caso práctico: Hola Mundo
El Ejemplo 4-13 muestra un servidor web mínimo utilizando aiohttp
.
Ejemplo 4-13. Ejemplo mínimo de aiohttp
from
aiohttp
import
web
async
def
hello
(
request
)
:
return
web
.
Response
(
text
=
"
Hello, world
"
)
app
=
web
.
Application
(
)
app
.
router
.
add_get
(
'
/
'
,
hello
)
web
.
run_app
(
app
,
port
=
8080
)
Observa que en este código no se mencionan bucles, tareas ni futuros: los desarrolladores del marco aiohttp
nos han ocultado todo eso, dejando una API muy limpia. Esto va a ser habitual en la mayoría de los marcos que se construyan sobre asyncio
, que ha sido diseñado para permitir a los diseñadores de marcos elegir sólo las partes que necesitan, y encapsularlas en su API preferida.
Caso práctico: Raspando las noticias
aiohttp
puede utilizarse como servidor y como biblioteca cliente de, como la muy popular (¡pero bloqueante!)requests
. Quería mostrar aiohttp
utilizando un ejemplo que incorpora ambas características.
En este caso práctico, implementaremos un sitio web que hace web scraping entre bastidores. La aplicación raspará dos sitios web de noticias y combinará los titulares en una página de resultados. Ésta es la estrategia:
-
Un navegador cliente realiza una petición web a http://localhost:8080/news.
-
Nuestro servidor web recibe la solicitud y, en el backend, obtiene los datos HTML de varios sitios web de noticias.
-
Los datos de cada página se extraen para los titulares.
-
Los titulares se ordenan y formatean en el HTML de respuesta que enviamos de vuelta al cliente del navegador.
La Figura 4-1 muestra el resultado.
El raspado web se ha vuelto bastante difícil hoy en día. Por ejemplo, si pruebasrequests.get('http://edition.cnn.com')
, ¡verás que la respuesta contiene muy pocos datos utilizables! Cada vez es más necesario poder ejecutar JavaScript localmente para obtener datos, porque muchos sitios utilizan JavaScript para cargar su contenido real. El proceso de ejecutar dicho JavaScript para producir la salida HTML final y completa se denomina renderizado.
Para llevar a cabo la renderización de, utilizamos un estupendo proyecto llamadoSplash, que se describe a sí mismo como un "servicio de renderización de JavaScript". Puede ejecutarse en un contenedorDocker y proporciona una API para renderizar otros sitios. Internamente, utiliza un motor WebKit (compatible con JavaScript) para cargar y renderizar completamente un sitio web. Esto es lo que utilizaremos para obtener los datos del sitio web. Nuestro servidor aiohttp
, mostrado enel Ejemplo 4-14, llamará a esta API Splash para obtener los datos de la página.
Consejo
Para obtener y ejecutar el contenedor Splash, ejecuta estos comandos en tu shell:
$ docker pull scrapinghub/splash $ docker run --rm -p 8050:8050 scrapinghub/splash
Nuestro servidor backend llamará a la API de Splash en http://localhost:8050.
Ejemplo 4-14. Código del raspador de noticias
from
asyncio
import
gather
,
create_task
from
string
import
Template
from
aiohttp
import
web
,
ClientSession
from
bs4
import
BeautifulSoup
async
def
news
(
request
)
:
sites
=
[
(
'
http://edition.cnn.com
'
,
cnn_articles
)
,
(
'
http://www.aljazeera.com
'
,
aljazeera_articles
)
,
]
tasks
=
[
create_task
(
news_fetch
(
*
s
)
)
for
s
in
sites
]
await
gather
(
*
tasks
)
items
=
{
text
:
(
f
'
<div class=
"
box
{kind}
"
>
'
f
'
<span>
'
f
'
<a href=
"
{href}
"
>
{text}
</a>
'
f
'
</span>
'
f
'
</div>
'
)
for
task
in
tasks
for
href
,
text
,
kind
in
task
.
result
(
)
}
content
=
'
'
.
join
(
items
[
x
]
for
x
in
sorted
(
items
)
)
page
=
Template
(
open
(
'
index.html
'
)
.
read
(
)
)
return
web
.
Response
(
body
=
page
.
safe_substitute
(
body
=
content
)
,
content_type
=
'
text/html
'
,
)
async
def
news_fetch
(
url
,
postprocess
)
:
proxy_url
=
(
f
'
http://localhost:8050/render.html?
'
f
'
url=
{url}
&timeout=60&wait=1
'
)
async
with
ClientSession
(
)
as
session
:
async
with
session
.
get
(
proxy_url
)
as
resp
:
data
=
await
resp
.
read
(
)
data
=
data
.
decode
(
'
utf-8
'
)
return
postprocess
(
url
,
data
)
def
cnn_articles
(
url
,
page_data
)
:
soup
=
BeautifulSoup
(
page_data
,
'
lxml
'
)
def
match
(
tag
)
:
return
(
tag
.
text
and
tag
.
has_attr
(
'
href
'
)
and
tag
[
'
href
'
]
.
startswith
(
'
/
'
)
and
tag
[
'
href
'
]
.
endswith
(
'
.html
'
)
and
tag
.
find
(
class_
=
'
cd__headline-text
'
)
)
headlines
=
soup
.
find_all
(
match
)
return
[
(
url
+
hl
[
'
href
'
]
,
hl
.
text
,
'
cnn
'
)
for
hl
in
headlines
]
def
aljazeera_articles
(
url
,
page_data
)
:
soup
=
BeautifulSoup
(
page_data
,
'
lxml
'
)
def
match
(
tag
)
:
return
(
tag
.
text
and
tag
.
has_attr
(
'
href
'
)
and
tag
[
'
href
'
]
.
startswith
(
'
/news
'
)
and
tag
[
'
href
'
]
.
endswith
(
'
.html
'
)
)
headlines
=
soup
.
find_all
(
match
)
return
[
(
url
+
hl
[
'
href
'
]
,
hl
.
text
,
'
aljazeera
'
)
for
hl
in
headlines
]
app
=
web
.
Application
(
)
app
.
router
.
add_get
(
'
/news
'
,
news
)
web
.
run_app
(
app
,
port
=
8080
)
La función
news()
es el manejador de la URL /noticias de nuestro servidor. Devuelve la página HTML que muestra todos los titulares.Aquí, sólo tenemos dos sitios web de noticias para raspar: CNN y Al Jazeera. Podrían añadirse fácilmente más, pero entonces también habría que añadir postprocesadores adicionales, como las funciones
cnn_articles()
yaljazeera_articles()
, personalizadas para extraer datos de titulares.Para cada sitio de noticias, creamos una tarea para obtener y procesar los datos de la página HTML de su portada. Ten en cuenta que descomprimimos la tupla (
(*s)
), ya que la función coroutinenews_fetch()
toma como parámetros tanto la URL como la función de postprocesamiento. Cada llamada anews_fetch()
devolverá una lista de tuplascomo resultados de la cabecera, de la forma<article URL>
,<article title>
.Todas las tareas se reúnen en un único
Future
(gather()
devuelve un futuro que representa el estado de todas las tareas que se están reuniendo), y luego inmediatamenteawait
la finalización de ese futuro. Esta línea se suspenderá hasta que el futuro se complete.Como ya se han completado todas las tareas de
news_fetch()
, recopilamos todos los resultados en un diccionario. Observa cómo se utilizan comprensiones anidadas para iterar sobre las tareas, y luego sobre la lista de tuplas devueltas por cada tarea. También utilizamos cadenas f para sustituir datos directamente, incluyendo incluso el tipo de página, que se utilizará en CSS para colorear el fondo dediv
.En este diccionario, la clave es el título del titular, y el valor es una cadena HTML para un
div
que se mostrará en nuestra página de resultados.Nuestro servidor web va a devolver HTML. Vamos a cargar datos HTML de un archivo local llamado index.html. Este archivo se presenta en elEjemplo B-1 por si quieres recrear tú mismo el caso práctico.
Sustituimos el titular recogido
div
en la plantilla y devolvemos la página al cliente del navegador. Esto genera la página que se muestra en la Figura 4-1.Aquí, dentro de la función coroutine
news_fetch()
, tenemos una pequeña plantilla para acceder a la API Splash (que, en mi caso, se ejecuta en un contenedor Docker local en el puerto 8050). Esto demuestra cómo puede utilizarseaiohttp
como cliente HTTP.La forma estándar es crear una instancia
ClientSession()
, y luego utilizar el métodoget()
en la instancia de sesión para realizar la llamada REST. En la línea siguiente, se obtienen los datos de respuesta. Ten en cuenta que, como siempre estamos operando en coroutines, conasync with
yawait
, esta coroutine nunca se bloqueará: podremos gestionar muchos miles de estas peticiones, aunque esta operación (news_fetch()
) pueda ser relativamente lenta, ya que estamos realizando llamadas web internamente.Una vez obtenidos los datos, llamamos a la función de postprocesamiento. Para la CNN, será
cnn_articles()
, y para Al Jazeera seráaljazeera_articles()
.Sólo tenemos espacio para echar un breve vistazo al postprocesamiento. Tras obtener los datos de la página, utilizamos la biblioteca Beautiful Soup 4 para extraer los titulares.
La función
match()
devolverá todas las etiquetas que coincidan (he comprobado manualmente la fuente HTML de estos sitios web de noticias para averiguar qué combinación de filtros extrae las mejores etiquetas), y luego devolvemos una lista de tuplas que coincidan con el formato<article URL>
,<article title>
.Este es el postprocesador análogo para Al Jazeera. La condición
match()
es ligeramente diferente, pero por lo demás es la misma que la de CNN.
En general, verás que aiohttp
tiene una API sencilla y "no te estorba" mientras desarrollas tus aplicaciones.
En la siguiente sección, veremos cómo utilizar ZeroMQ con asyncio
, que tiene el curioso efecto de hacer que la programación de sockets sea bastante agradable.
ØMQ (ZeroMQ)
La programación es una ciencia disfrazada de arte, porque la mayoría de nosotros no entendemos la física del software y rara vez, o nunca, se enseña. La física del software no son los algoritmos, las estructuras de datos, los lenguajes y las abstracciones. La verdadera física del software es la física de las personas. Concretamente, se trata de nuestras limitaciones ante la complejidad y nuestro deseo de trabajar juntos para resolver grandes problemas por partes. Esta es la ciencia de la programación: crea bloques de construcción que la gente pueda entender y utilizar fácilmente, y la gente colaborará para resolver los problemas más grandes.
Pieter Hintjens, ZeroMQ: Mensajería para muchas aplicaciones
ØMQ (o ZeroMQ) es una popular biblioteca independiente del lenguaje para aplicaciones de red: proporciona sockets "inteligentes". Cuando creas sockets ØMQ en código, se parecen a los sockets normales, con nombres de métodos reconocibles como recv()
ysend()
, etc., pero internamente estos sockets se encargan de algunas de las tareas más molestas y tediosas necesarias para trabajar con sockets convencionales.
Una de las funciones que ofrece es la gestión del paso de mensajes, para que no tengas que inventar tu propio protocolo y contar bytes en el cable para averiguar cuándo han llegado todos los bytes de un mensaje concreto: simplemente envía lo que consideres un "mensaje", y todo llegará intacto al otro extremo.
Otra gran característica es la lógica de reconexión automática. Si el servidor se desconecta y vuelve a conectarse más tarde, el socket ØMQ del cliente se reconectaráautomáticamente. Y lo que es aún mejor, los mensajes que tu código envíe al socket se almacenarán en la memoria intermedia durante el periodo de desconexión, de modo que se seguirán enviando cuando vuelva el servidor. Éstas son algunas de las razones por las que a veces se denomina a ØMQmensajeríasin intermediario: proporciona algunas de las características del software de intermediación de mensajes directamente en los propios objetos socket.
Los sockets ØMQ ya están implementados como asíncronos internamente (por lo que pueden mantener muchos miles de conexiones concurrentes, incluso cuando se utilizan en código roscado), pero esto se nos oculta tras la API ØMQ. No obstante, se ha añadido soporte para Asyncio a los enlaces Python dePyZMQ para la biblioteca ØMQ, y en esta sección vamos a ver varios ejemplos de cómo podrías incorporar estos sockets inteligentes a tus aplicaciones Python.
Caso práctico: Enchufes múltiples
He aquí un rompecabezas: si ØMQ proporciona sockets que ya son asíncronos, de forma que se pueden utilizar con roscado, ¿qué sentido tiene utilizar ØMQ con asyncio
? La respuesta es un código más limpio.
Para demostrarlo, veamos un pequeño caso práctico en el que utilizas varios sockets ØMQ en la misma aplicación. En primer lugar, el Ejemplo 4-15muestra la versión de bloqueo (este ejemplo está tomado de la zguide, la guía oficial de ØMQ).
Ejemplo 4-15. El enfoque tradicional de ØMQ
# poller.py
import
zmq
context
=
zmq
.
Context
(
)
receiver
=
context
.
socket
(
zmq
.
PULL
)
receiver
.
connect
(
"
tcp://localhost:5557
"
)
subscriber
=
context
.
socket
(
zmq
.
SUB
)
subscriber
.
connect
(
"
tcp://localhost:5556
"
)
subscriber
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
poller
=
zmq
.
Poller
(
)
poller
.
register
(
receiver
,
zmq
.
POLLIN
)
poller
.
register
(
subscriber
,
zmq
.
POLLIN
)
while
True
:
try
:
socks
=
dict
(
poller
.
poll
(
)
)
except
KeyboardInterrupt
:
break
if
receiver
in
socks
:
message
=
receiver
.
recv_json
(
)
(
f
'
Via PULL:
{message}
'
)
if
subscriber
in
socks
:
message
=
subscriber
.
recv_json
(
)
(
f
'
Via SUB:
{message}
'
)
Los sockets ØMQ tienen tipos. Éste es un socket
PULL
. Puedes pensar en él como un tipo de socketde sólo recepción que será alimentado por algún otro socketde sólo envío, que será del tipoPUSH
.El socket
SUB
es otro tipo de socket de sólo recepción, y se alimentará de un socketPUB
que es de sólo envío.Si necesitas mover datos entre varios sockets en una aplicación ØMQ con hilos, vas a necesitar un poller. Esto se debe a que estos sockets no son seguros para hilos, por lo que no puedes
recv()
en diferentes sockets en diferentes hilos.1Funciona de forma similar a la llamada al sistema
select()
. El encuestador se desbloqueará cuando haya datos listos para ser recibidos en uno de los sockets registrados, y entonces depende de ti extraer los datos y hacer algo con ellos. El gran bloqueo deif
es cómo detectas el socket correcto.
Utilizar un bucle de sondeo más un bloque de selección de socket explícito hace que el código parezca un poco tosco, pero este enfoque evita los problemas de seguridad de los hilos al garantizar que no se utiliza el mismo socket desde distintos hilos.
El Ejemplo 4-16 muestra el código del servidor.
Ejemplo 4-16. Código del servidor
# poller_srv.py
import
zmq
,
itertools
,
time
context
=
zmq
.
Context
()
pusher
=
context
.
socket
(
zmq
.
PUSH
)
pusher
.
bind
(
"tcp://*:5557"
)
publisher
=
context
.
socket
(
zmq
.
PUB
)
publisher
.
bind
(
"tcp://*:5556"
)
for
i
in
itertools
.
count
():
time
.
sleep
(
1
)
pusher
.
send_json
(
i
)
publisher
.
send_json
(
i
)
Este código no es importante para la discusión, pero brevemente: hay un socket PUSH
y un socket PUB
, como he dicho antes, y un bucle dentro que envía datos a ambos sockets cada segundo. Aquí tienes un ejemplo de la salida de poller.py (nota: ambos programas deben estar en ejecución):
$ poller.py Via PULL: 0 Via SUB: 0 Via PULL: 1 Via SUB: 1 Via PULL: 2 Via SUB: 2 Via PULL: 3 Via SUB: 3
El código funciona; sin embargo, nuestro interés aquí no es si el código se ejecuta, sino si asyncio
tiene algo que ofrecer a la estructura depoller.py. Lo fundamental es comprender que nuestro códigoasyncio
se va a ejecutar en un único hilo, lo que significa que no hay problema en manejar diferentes sockets en diferentes coroutines-y, de hecho, esto es exactamente lo que haremos.
Por supuesto,alguien tuvo que hacer el arduo trabajo de añadir soporte para coroutines en la propia pyzmq
(la biblioteca cliente de Python para ØMQ) para que esto funcionara, así que no fue gratis. Pero podemos aprovechar ese duro trabajo para mejorar la estructura de código "tradicional", como se muestra en el Ejemplo 4-17.
Ejemplo 4-17. Separación limpia con asyncio
# poller_aio.py
import
asyncio
import
zmq
from
zmq
.
asyncio
import
Context
context
=
Context
(
)
async
def
do_receiver
(
)
:
receiver
=
context
.
socket
(
zmq
.
PULL
)
receiver
.
connect
(
"
tcp://localhost:5557
"
)
while
message
:
=
await
receiver
.
recv_json
(
)
:
(
f
'
Via PULL:
{message}
'
)
async
def
do_subscriber
(
)
:
subscriber
=
context
.
socket
(
zmq
.
SUB
)
subscriber
.
connect
(
"
tcp://localhost:5556
"
)
subscriber
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
while
message
:
=
await
subscriber
.
recv_json
(
)
:
(
f
'
Via SUB:
{message}
'
)
async
def
main
(
)
:
await
asyncio
.
gather
(
do_receiver
(
)
,
do_subscriber
(
)
,
)
asyncio
.
run
(
main
(
)
)
Este ejemplo de código hace lo mismo que el Ejemplo 4-15, salvo que ahora aprovechamos las coroutines para reestructurarlo todo. Ahora podemos tratar cada socket de forma aislada. He creado dos funciones coroutine, una para cada socket; ésta es para el socket
PULL
.Estoy utilizando el soporte
asyncio
enpyzmq
, lo que significa que todas las llamadas asend()
yrecv()
deben utilizar la palabra claveawait
.Poller
ya no aparece en ninguna parte, porque se ha integrado en el propio bucle de eventosasyncio
.Este es el manejador del socket
SUB
. La estructura es muy similar a la del manejador del socketPULL
, pero no tenía por qué ser así. Si hubiera sido necesaria una lógica más compleja, habría podido añadirla fácilmente aquí, totalmente encapsulada sólo en el código del manejador deSUB
.De nuevo, los sockets compatibles con
asyncio
requieren la palabra claveawait
para enviar y recibir.
El resultado es el mismo que antes, así que no lo mostraré.
El uso de coroutines tiene, en mi opinión, un efecto asombrosamente positivo en la disposición del código en estos ejemplos. En un código de producción real con muchos sockets ØMQ, los manejadores de coroutina para cada uno de ellos podrían estar incluso en archivos separados, proporcionando más oportunidades para una mejor estructura del código. E incluso para programas con una única toma de lectura/escritura, es muy fácil utilizar coroutinas separadas para la lectura y la escritura, si es necesario.
El código mejorado se parece mucho al código roscado y, de hecho, para el ejemplo concreto que se muestra aquí, el mismo refactor funcionará para roscado: ejecutar las funciones do_receiver()
y do_subscriber()
bloqueantes en hilos separados. Pero, ¿realmente quieres lidiar incluso con la posibilidadde que se produzcan condiciones de carrera, especialmente a medida que tu aplicación crece en características y complejidad con el tiempo?
Hay mucho que explorar aquí, y como he dicho antes, es muy divertido jugar con estos sockets mágicos. En el próximo estudio de caso, veremos un uso más práctico de ØMQ.
Caso práctico: Monitoreo del rendimiento de las aplicaciones
Con las modernas prácticas de implementación basadas en contenedores y microservicios de hoy en día, algunas cosas que antes eran triviales, como el monitoreo del uso de CPU y memoria de tus aplicaciones, se han vuelto algo más complicadas que simplemente ejecutar top
. En los últimos años han surgido varios productos comerciales para solucionar estos problemas, pero su coste puede ser prohibitivo para los equipos de pequeñas startups y aficionados.
En este caso práctico, utilizaré ØMQ y asyncio
para construir un prototipo de juguete para el monitoreo de aplicaciones distribuidas. Nuestro diseño consta de tres partes:
- Capa de aplicación
-
Esta capa contiene todas nuestras aplicaciones. Algunos ejemplos podrían ser un microservicio de "clientes", un microservicio de "reservas", un microservicio de "envío de correos electrónicos", etc. Añadiré un socket "transmisor" ØMQ a cada una de nuestras aplicaciones. Este socket enviará métricas de rendimiento a un servidor central.
- Capa de recogida
-
El servidor central expondrá un socket ØMQ para recoger los datos de todas las instancias de la aplicación en ejecución. El servidor también servirá una página web para mostrar gráficos de rendimiento a lo largo del tiempo y transmitirá en directo los datos a medida que vayan llegando.
- Capa de visualización
-
Esta es la página web que se sirve. Mostraremos los datos recogidos en un conjunto de gráficos, que se actualizarán en tiempo real. Para simplificar los ejemplos de código, utilizaré la práctica biblioteca JavaScript Smoothie Charts, que proporciona todas las funciones necesarias del lado del cliente.
La app backend (capa de aplicación) que produce métricas se muestra en el Ejemplo 4-18.
Ejemplo 4-18. La capa de aplicación: producción de métricas
import
argparse
import
asyncio
from
random
import
randint
,
uniform
from
datetime
import
datetime
as
dt
from
datetime
import
timezone
as
tz
from
contextlib
import
suppress
import
zmq
,
zmq
.
asyncio
,
psutil
ctx
=
zmq
.
asyncio
.
Context
(
)
async
def
stats_reporter
(
color
:
str
)
:
p
=
psutil
.
Process
(
)
sock
=
ctx
.
socket
(
zmq
.
PUB
)
sock
.
setsockopt
(
zmq
.
LINGER
,
1
)
sock
.
connect
(
'
tcp://localhost:5555
'
)
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
await
sock
.
send_json
(
dict
(
color
=
color
,
timestamp
=
dt
.
now
(
tz
=
tz
.
utc
)
.
isoformat
(
)
,
cpu
=
p
.
cpu_percent
(
)
,
mem
=
p
.
memory_full_info
(
)
.
rss
/
1024
/
1024
)
)
await
asyncio
.
sleep
(
1
)
sock
.
close
(
)
async
def
main
(
args
)
:
asyncio
.
create_task
(
stats_reporter
(
args
.
color
)
)
leak
=
[
]
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
sum
(
range
(
randint
(
1
_000
,
10
_000_000
)
)
)
await
asyncio
.
sleep
(
uniform
(
0
,
1
)
)
leak
+
=
[
0
]
*
args
.
leak
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--color
'
,
type
=
str
)
parser
.
add_argument
(
'
--leak
'
,
type
=
int
,
default
=
0
)
args
=
parser
.
parse_args
(
)
try
:
asyncio
.
run
(
main
(
args
)
)
except
KeyboardInterrupt
:
(
'
Leaving...
'
)
ctx
.
term
(
)
Esta función coroutine se ejecutará como una coroutine de larga duración, enviando continuamente datos al proceso servidor.
Crea un socket ØMQ. Como sabes, hay distintos tipos de socket; éste es del tipo
PUB
, que permite enviar mensajes unidireccionales a otro socket ØMQ. Este socket tiene -como dice la guía ØMQ- superpoderes. Se encargará automáticamente de toda la lógica de reconexión y almacenamiento en búfer por nosotros.Conéctate al servidor.
Nuestra secuencia de apagado está dirigida por
KeyboardInterrupt
, más abajo. Cuando se reciba esa señal, se cancelarán todas las tareas. Aquí manejo la señalCancelledError
con el práctico gestor de contextosuppress()
del módulo de biblioteca estándarcontextlib
.Iterar eternamente, enviando datos al servidor.
Dado que ØMQ sabe cómo trabajar con mensajes completos, y no sólo con trozos de un bytestream, abre la puerta a un montón de útiles envoltorios alrededor del lenguaje habitual de
sock.send()
: aquí, utilizo uno de esos métodos de ayuda,send_json()
, que serializará automáticamente el argumento en JSON. Esto nos permite utilizar directamente undict()
.Una forma fiable de transmitir información fecha-hora es mediante el formato ISO 8601. Esto es especialmente cierto si tienes que pasar datos de fecha y hora entre programas escritos en distintos idiomas, ya que la gran mayoría de las implementaciones de idiomas podrán trabajar con esta norma.
Para terminar aquí, debemos haber recibido la excepción
CancelledError
resultante de la cancelación de la tarea. El socket ØMQ debe cerrarse para permitir el cierre del programa.La función
main()
simboliza la aplicación de microservicio real. Se produce un trabajo falso con esta suma sobre números aleatorios, sólo para darnos algunos datos distintos de cero para verlos en la capa de visualización un poco más tarde.Voy a crear varias instancias de esta aplicación, por lo que será conveniente poder distinguirlas (más adelante, en los gráficos) con un parámetro
--color
.Por último, se puede terminar el contexto ØMQ.
El principal punto de interés es la función stats_reporter()
. Es la que transmite los datos de las métricas (recogidos por la útil biblioteca psutil
). Se puede suponer que el resto del código es una aplicación de microservicio típica.
El código del servidor del Ejemplo 4-19 recoge todos los datos y los sirve a un cliente web.
Ejemplo 4-19. La capa de recogida: este servidor recoge las estadísticas de los procesos
# metric-server.py
import
asyncio
from
contextlib
import
suppress
import
zmq
import
zmq
.
asyncio
import
aiohttp
from
aiohttp
import
web
from
aiohttp_sse
import
sse_response
from
weakref
import
WeakSet
import
json
# zmq.asyncio.install()
ctx
=
zmq
.
asyncio
.
Context
(
)
connections
=
WeakSet
(
)
async
def
collector
(
)
:
sock
=
ctx
.
socket
(
zmq
.
SUB
)
sock
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
sock
.
bind
(
'
tcp://*:5555
'
)
with
suppress
(
asyncio
.
CancelledError
)
:
while
data
:
=
await
sock
.
recv_json
(
)
:
(
data
)
for
q
in
connections
:
await
q
.
put
(
data
)
sock
.
close
(
)
async
def
feed
(
request
)
:
queue
=
asyncio
.
Queue
(
)
connections
.
add
(
queue
)
with
suppress
(
asyncio
.
CancelledError
)
:
async
with
sse_response
(
request
)
as
resp
:
while
data
:
=
await
queue
.
get
(
)
:
(
'
sending data:
'
,
data
)
resp
.
send
(
json
.
dumps
(
data
)
)
return
resp
async
def
index
(
request
)
:
return
aiohttp
.
web
.
FileResponse
(
'
./charts.html
'
)
async
def
start_collector
(
app
)
:
app
[
'
collector
'
]
=
app
.
loop
.
create_task
(
collector
(
)
)
async
def
stop_collector
(
app
)
:
(
'
Stopping collector...
'
)
app
[
'
collector
'
]
.
cancel
(
)
await
app
[
'
collector
'
]
ctx
.
term
(
)
if
__name__
==
'
__main__
'
:
app
=
web
.
Application
(
)
app
.
router
.
add_route
(
'
GET
'
,
'
/
'
,
index
)
app
.
router
.
add_route
(
'
GET
'
,
'
/feed
'
,
feed
)
app
.
on_startup
.
append
(
start_collector
)
app
.
on_cleanup
.
append
(
stop_collector
)
web
.
run_app
(
app
,
host
=
'
127.0.0.1
'
,
port
=
8088
)
Una mitad de este programa recibirá datos de otras aplicaciones, y la otra mitad proporcionará datos a los clientes del navegador a través de eventos enviados por el servidor (SSE). Utilizo un identificador
WeakSet()
para llevar la cuenta de todos los clientes web conectados en ese momento. Cada cliente conectado tendrá una instanciaQueue()
asociada, por lo que este identificadorconnections
es en realidad un conjunto de colas.Recuerda que en la capa de aplicación utilicé un socket
zmq.PUB
; aquí, en la capa de recogida, utilizo su compañero, el tipo de socketzmq.SUB
. Este socket ØMQ sólo puede recibir, no enviar.Para el tipo de socket
zmq.SUB
, es necesario proporcionar un nombre de suscripción, pero para nuestros propósitos, simplemente tomaremos todo lo que entre, de ahí el nombre de tema vacío.Vinculo la toma
zmq.SUB
. Piénsalo un momento. En las configuraciones pub-sub, normalmente tienes que hacer que el extremo pub sea el servidor (bind()
) y el extremo sub el cliente (connect()
). ØMQ es diferente: cualquiera de los dos extremos puede ser el servidor. Para nuestro caso de uso, esto es importante, porque cada una de nuestras instancias de la capa de aplicación se conectará al mismo nombre de dominio del servidor de recogida, y no al revés.El soporte para
asyncio
enpyzmq
nos permiteawait
datos de nuestras aplicaciones conectadas. Y no sólo eso, sino que los datos entrantes se deserializarán automáticamente a partir de JSON (sí, esto significa quedata
es undict()
).Recuerda que nuestro conjunto
connections
contiene una cola para cada cliente web conectado. Ahora que se han recibido los datos, es el momento de enviarlos a todos los clientes: los datos se colocan en cada cola.La función coroutine
feed()
creará coroutines para cada cliente web conectado. Internamente, los eventos enviados por el servidor se utilizan para enviar datos a los clientes web.Como se ha descrito anteriormente, cada cliente web tendrá su propia instancia
queue
, para recibir datos de la coroutinacollector()
. La instanciaqueue
se añade al conjuntoconnections
, pero comoconnections
es un conjunto débil, la entrada se eliminará automáticamente deconnections
cuandoqueue
salga del ámbito, es decir, cuando un cliente web se desconecte. Las referencias débiles son estupendas para simplificar este tipo de tareas de contabilidad.El paquete
aiohttp_sse
proporciona el gestor de contextosse_response()
. Esto nos proporciona un ámbito dentro del cual enviar datos al cliente web.Permanecemos conectados al cliente web, y esperamos los datos en la cola de este cliente específico.
En cuanto entren los datos (dentro de
collector()
), se enviarán al cliente web conectado. Observa que aquí vuelvo a serializar el dictdata
. Una optimización de este código sería evitar deserializar JSON encollector()
, y en su lugar utilizarsock.recv_string()
para evitar el viaje de ida y vuelta de la serialización. Por supuesto, en un escenario real, puede que quieras deserializar en el colector, y realizar alguna validación de los datos antes de enviarlos al cliente del navegador. ¡Cuántas opciones!El punto final
index()
es la carga principal de la página, y aquí servimos un archivo estático llamado gráficos.html.La biblioteca
aiohttp
nos proporciona facilidades para enganchar coroutinas de larga duración adicionales que podamos necesitar. Con la coroutinacollector()
, tenemos exactamente esa situación, así que creo una coroutina de inicio,start_collector()
, y una coroutina de apagado. Éstas serán llamadas durante fases específicas de la secuencia de inicio y apagado deaiohttp
. Observa que añado la tarea recolectora a la propiaapp
, que implementa un protocolo de mapeo para que puedas utilizarla como un dict.Obtengo nuestra coroutina
collector()
del identificadorapp
y llamo acancel()
sobre ella.Por último, puedes ver dónde se enganchan las coroutines personalizadas de inicio y apagado: la instancia
app
proporciona ganchos a los que se pueden añadir nuestras coroutines personalizadas.
Todo lo que queda es la capa de visualización, que se muestra en el Ejemplo 4-20. Estoy utilizando labiblioteca Smoothie Charts para generar gráficos desplazables, y el HTML completo de nuestra página web principal (y única),charts.html, se proporciona en el Ejemplo B-1. Hay demasiado HTML, CSS y JavaScript para presentar en esta sección, pero sí quiero destacar algunos puntos sobre cómo se gestionan en JavaScript los eventos enviados por el servidor en el cliente del navegador.
Ejemplo 4-20. La capa de visualización, que es una forma elegante de decir "el navegador".
<
snip
>
var
evtSource
=
new
EventSource
(
"/feed"
)
;
evtSource
.
onmessage
=
function
(
e
)
{
var
obj
=
JSON
.
parse
(
e
.
data
)
;
if
(
!
(
obj
.
color
in
cpu
)
)
{
add_timeseries
(
cpu
,
cpu_chart
,
obj
.
color
)
;
}
if
(
!
(
obj
.
color
in
mem
)
)
{
add_timeseries
(
mem
,
mem_chart
,
obj
.
color
)
;
}
cpu
[
obj
.
color
]
.
append
(
Date
.
parse
(
obj
.
timestamp
)
,
obj
.
cpu
)
;
mem
[
obj
.
color
]
.
append
(
Date
.
parse
(
obj
.
timestamp
)
,
obj
.
mem
)
;
}
;
<
snip
>
Crea una nueva instancia de
EventSource()
en la URL de /feed. El navegador se conectará a /feed en nuestro servidor, (metric_server.py). Ten en cuenta que el navegador intentará reconectarse automáticamente si se pierde la conexión. Los eventos enviados por el servidor suelen pasarse por alto, pero en muchas situaciones su simplicidad los hace preferibles a los WebSockets.El evento
onmessage
se disparará cada vez que el servidor envíe datos. Aquí los datos se analizan como JSON.El identificador
cpu
es un mapeo de un color a una instancia deTimeSeries()
(para más información, consulta el Ejemplo B-1). Aquí, obtenemos esa serie temporal y le añadimos datos. También obtenemos la marca de tiempo y la analizamos para obtener el formato correcto que requiere el gráfico.
Ahora podemos ejecutar el código. Para poner en marcha todo el programa, se necesitan un montón de instrucciones de la línea de comandos, la primera de las cuales es poner en marcha el proceso del recopilador de datos:
$ metric-server.py ======== Running on http://127.0.0.1:8088 ======== (Press CTRL+C to quit)
El siguiente paso es poner en marcha todas las instancias de microservicio. Éstas enviarán sus métricas de uso de CPU y memoria al recopilador. Cada una se identificará con un color diferente, que se especifica en la línea de comandos. Observa cómo se indica a dos de los microservicios que pierdan algo de memoria:
$ backend-app.py --color red & $ backend-app.py --color blue --leak 10000 & $ backend-app.py --color green --leak 100000 &
La Figura 4-2 muestra nuestro producto final en un navegador. Tendrás que creer en mi palabra de que los gráficos realmente se animan. Observarás en las líneas de comando precedentes que añadí algunas fugas de memoria al azul, y muchas al verde. Incluso tuve que reiniciar el servicio verde unas cuantas veces para evitar que superara los 100 MB.
Lo que es especialmente interesante de este proyecto es lo siguiente: cualquiera de las instancias en ejecución en cualquier parte de esta pila puede reiniciarse, y no es necesario ningún código de gestión de reconexión. Los sockets ØMQ, junto con la instancia JavaScript EventSource()
en el navegador, se reconectan mágicamente y continúan donde lo dejaron.
En la siguiente sección, centraremos nuestra atención en las bases de datos y en cómo podría utilizarse asyncio
para diseñar un sistema de invalidación de cachés.
asyncpg y Sanic
La bibliotecaasyncpg
proporciona acceso de cliente a la base de datos PostgreSQL, pero se diferencia de otras bibliotecas de cliente Postgres compatibles conasyncio
por su énfasis en la velocidad.asyncpg
es obra de Yury Selivanov, uno de los principales desarrolladores de asyncio
Python, que también es autor del proyecto uvloop.No tiene dependencias de terceros, aunque se requiereCython si se instala desde el código fuente.
asyncpg
alcanza su velocidad trabajando directamente contra el protocolo binario de PostgreSQL, y otras ventajas de este enfoque de bajo nivel incluyen la compatibilidad consentencias preparadasy cursores desplazables.
Veremos un caso práctico en el que se utiliza asyncpg
para la invalidación de cachés, pero antes será útil tener una comprensión básica de la API que proporciona asyncpg
. Para todo el código de esta sección, necesitaremos una instancia en ejecución de PostgreSQL. Esto se hace más fácilmente con Docker, utilizando el siguiente comando:
$ docker run -d --rm -p 55432:5432 postgres
Ten en cuenta que he expuesto el puerto 55432 en lugar del predeterminado, 5432, por si ya tienes una instancia en ejecución de la base de datos en el puerto predeterminado. El Ejemplo 4-21 muestra brevemente cómo utilizar asyncpg
para hablar con PostgreSQL.
Ejemplo 4-21. Demostración básica de asyncpg
# asyncpg-basic.py
import
asyncio
import
asyncpg
import
datetime
from
util
import
Database
async
def
main
(
)
:
async
with
Database
(
'
test
'
,
owner
=
True
)
as
conn
:
await
demo
(
conn
)
async
def
demo
(
conn
:
asyncpg
.
Connection
)
:
await
conn
.
execute
(
'''
CREATE TABLE users(
id serial PRIMARY KEY,
name text,
dob date
)
'''
)
pk
=
await
conn
.
fetchval
(
'
INSERT INTO users(name, dob) VALUES($1, $2)
'
'
RETURNING id
'
,
'
Bob
'
,
datetime
.
date
(
1984
,
3
,
1
)
)
async
def
get_row
(
)
:
return
await
conn
.
fetchrow
(
'
SELECT * FROM users WHERE name = $1
'
,
'
Bob
'
)
(
'
After INSERT:
'
,
await
get_row
(
)
)
await
conn
.
execute
(
'
UPDATE users SET dob = $1 WHERE id=1
'
,
datetime
.
date
(
1985
,
3
,
1
)
)
(
'
After UPDATE:
'
,
await
get_row
(
)
)
await
conn
.
execute
(
'
DELETE FROM users WHERE id=1
'
)
(
'
After DELETE:
'
,
await
get_row
(
)
)
if
__name__
==
'
__main__
'
:
asyncio
.
run
(
main
(
)
)
He ocultado algunos textos repetitivos en un pequeño módulo de
util
para simplificar las cosas y mantener el mensaje principal.La clase
Database
nos proporciona un gestor de contexto que creará una nueva base de datos para nosotros -en este caso llamadatest
- y destruirá esa base de datos cuando el gestor de contexto salga. Esto resulta muy útil cuando experimentamos con ideas en código. Como no se arrastra ningún estado entre experimentos, cada vez se parte de una base de datos limpia. Ten en cuenta que se trata de un gestor de contextoasync with
; hablaremos más de ello más adelante, pero por ahora, el área central de esta demostración es lo que ocurre dentro de la coroutinademo()
.El gestor de contexto
Database
nos ha proporcionado una instanciaConnection
, que se utiliza inmediatamente para crear una nueva tabla,users
.Utilizo
fetchval()
para insertar un nuevo registro. Aunque podría haber utilizadoexecute()
para realizar la inserción, la ventaja de utilizarfetchval()
es que puedo obtener elid
del registro recién insertado, que almaceno en el identificadorpk
.Ten en cuenta que utilizo parámetros (
$1
y$2
) para pasar datos a la consulta SQL. Nunca utilices la interpolación o concatenación de cadenas para construir consultas, ¡ya que es un riesgo para la seguridad!En el resto de esta demostración, voy a manipular los datos de la tabla
users
, así que aquí creo una nueva función coroutine de utilidad que obtiene un registro de la tabla. Esta función se llamará varias veces.Cuando recuperes datos, es mucho más útil utilizar los métodos basados en
fetch
, porque devolverán objetosRecord
.asyncpg
convertirá automáticamente los tipos de datos a los tipos más apropiados para Python.Inmediatamente utilizo el ayudante
get_row()
para mostrar el registro recién insertado.Modifico datos utilizando el comando
UPDATE
para SQL. Es una modificación minúscula: el valor del año en la fecha de nacimiento se cambia en un año. Como antes, esto se realiza con el métodoexecute()
de la conexión. El resto del código de la demostración sigue la misma estructura que lo visto hasta ahora, y unDELETE
, seguido de otroprint()
, ocurre unas líneas más abajo.
Aquí tienes el resultado de ejecutar este script:
$ asyncpg-basic.py After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)> After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)> After DELETE: None
Observa cómo el valor de fecha recuperado en nuestro objeto Record
se ha convertido en un objeto Python date
: asyncpg
ha convertido automáticamente el tipo de dato del tipo SQL a su homólogo Python. En la documentación de asyncpg
hay una gran tabla deconversiones de tipos que describe todas las correspondencias de tipos que incorpora la biblioteca.
El código anterior es muy sencillo, quizá incluso tosco si estás acostumbrado a la comodidad de los mapeadores objeto-relacionales (ORM) como SQLAlchemy o el ORM integrado en el marco web Django. Al final de este capítulo, menciono varias bibliotecas de terceros que proporcionan acceso a los ORM o a funciones similares a los ORM para asyncpg
.
El Ejemplo 4-22 muestra mi objeto boilerplate Database
en el módulo utils
; puede que te resulte útil hacer algo parecido para tus propios experimentos.
Ejemplo 4-22. Herramientas útiles para tus experimentos con asyncpg
# util.py
import
argparse
,
asyncio
,
asyncpg
from
asyncpg
.
pool
import
Pool
DSN
=
'
postgresql://
{user}
@
{host}
:
{port}
'
DSN_DB
=
DSN
+
'
/
{name}
'
CREATE_DB
=
'
CREATE DATABASE
{name}
'
DROP_DB
=
'
DROP DATABASE
{name}
'
class
Database
:
def
__init__
(
self
,
name
,
owner
=
False
,
*
*
kwargs
)
:
self
.
params
=
dict
(
user
=
'
postgres
'
,
host
=
'
localhost
'
,
port
=
55432
,
name
=
name
)
self
.
params
.
update
(
kwargs
)
self
.
pool
:
Pool
=
None
self
.
owner
=
owner
self
.
listeners
=
[
]
async
def
connect
(
self
)
-
>
Pool
:
if
self
.
owner
:
await
self
.
server_command
(
CREATE_DB
.
format
(
*
*
self
.
params
)
)
self
.
pool
=
await
asyncpg
.
create_pool
(
DSN_DB
.
format
(
*
*
self
.
params
)
)
return
self
.
pool
async
def
disconnect
(
self
)
:
"""Destroy the database"""
if
self
.
pool
:
releases
=
[
self
.
pool
.
release
(
conn
)
for
conn
in
self
.
listeners
]
await
asyncio
.
gather
(
*
releases
)
await
self
.
pool
.
close
(
)
if
self
.
owner
:
await
self
.
server_command
(
DROP_DB
.
format
(
*
*
self
.
params
)
)
async
def
__aenter__
(
self
)
-
>
Pool
:
return
await
self
.
connect
(
)
async
def
__aexit__
(
self
,
*
exc
)
:
await
self
.
disconnect
(
)
async
def
server_command
(
self
,
cmd
)
:
conn
=
await
asyncpg
.
connect
(
DSN
.
format
(
*
*
self
.
params
)
)
await
conn
.
execute
(
cmd
)
await
conn
.
close
(
)
async
def
add_listener
(
self
,
channel
,
callback
)
:
conn
:
asyncpg
.
Connection
=
await
self
.
pool
.
acquire
(
)
await
conn
.
add_listener
(
channel
,
callback
)
self
.
listeners
.
append
(
conn
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--cmd
'
,
choices
=
[
'
create
'
,
'
drop
'
]
)
parser
.
add_argument
(
'
--name
'
,
type
=
str
)
args
=
parser
.
parse_args
(
)
d
=
Database
(
args
.
name
,
owner
=
True
)
if
args
.
cmd
==
'
create
'
:
asyncio
.
run
(
d
.
connect
(
)
)
elif
args
.
cmd
==
'
drop
'
:
asyncio
.
run
(
d
.
disconnect
(
)
)
else
:
parser
.
print_help
(
)
La clase
Database
no es más que un elegante gestor de contexto para crear y eliminar una base de datos de una instancia PostgreSQL. El nombre de la base de datos se pasa al constructor.(Nota: La secuencia de llamadas en el código es intencionadamente distinta a la de esta lista). Se trata de un gestor de contexto asíncrono. En lugar de los métodos habituales
__enter__()
y__exit__()
, utilizo sus homólogos__aenter__()
y__aexit__()
.Aquí, en la parte de entrada, crearé la nueva base de datos y devolveré una conexión a esa nueva base de datos.
server_command()
es otro método auxiliar definido unas líneas más abajo. Lo utilizo para ejecutar el comando de creación de nuestra nueva base de datos.A continuación, establezco una conexión con la base de datos recién creada. Fíjate en que he codificado varios detalles de la conexión: esto es intencionado, ya que quería que los ejemplos de código fueran pequeños. Podrías generalizarlo fácilmente creando campos para el nombre de usuario, el nombre de host y el puerto.
En la parte de salida del gestor contextual, cierro la conexión y...
...destruir la base de datos.
Para completar, éste es nuestro método de utilidad para ejecutar comandos contra el propio servidor PostgreSQL. Crea una conexión a tal efecto, ejecuta el comando dado y sale.
Esta función crea una conexión de socket de larga duración a la base de datos que escuchará los eventos. Este mecanismo aparecerá en el próximo estudio de caso.
Precaución
En el punto 8 del código anterior, he creado una conexión dedicada para cada canal en el que quiero escuchar. Esto es caro, ya que significa que un trabajador PostgreSQL estará completamente ocupado para cada canal que se escuche. Un diseño mucho mejor sería utilizar una conexión para varios canales. Cuando hayas trabajado con este ejemplo, intenta modificar el código para utilizar una única conexión para múltiples canales de escucha.
Ahora que ya conoces los componentes básicos de asyncpg
, podemos explorarlos más a fondo con un caso práctico realmente divertido: ¡utilizar el soporte integrado de PostgreSQL para enviar notificaciones de eventos para realizar la invalidación de la caché!
Caso práctico: Invalidación de caché
Hay dos cosas difíciles en informática: la invalidación de la caché, poner nombre a las cosas, y los errores fuera de lugar.
Phil Karlton
Es habitual en los servicios web y en las aplicaciones web que la capa de persistencia, es decir, la base de datos (BD) de respaldo, se convierta en el cuello de botella del rendimiento antes que cualquier otra parte de la pila. La capa de aplicación suele poder escalarse horizontalmente ejecutando más instancias, mientras que es más difícil hacerlo con una base de datos.
Por eso es una práctica habitual buscar opciones de diseño que puedan limitar la interacción excesiva con la base de datos. La opción más habitual es utilizar la memoria caché para "recordar" los resultados obtenidos previamente en la base de datos y reproducirlos cuando se soliciten, evitando así llamadas posteriores a la BD para obtener la misma información.
Sin embargo, ¿qué ocurre si una de tus instancias de aplicación escribe nuevos datos en la base de datos mientras otra instancia de aplicación sigue devolviendo los datos antiguos y obsoletos de su caché interna? Este es un problema clásico de invalidación de la caché, y puede ser muy difícil de resolver de forma sólida.
Nuestra estrategia de ataque es la siguiente
-
Cada instancia de la aplicación tiene una caché en memoria de las consultas a la BD.
-
Cuando uno escribe nuevos datos en la base de datos, ésta avisa a todas las instancias de app conectadas de los nuevos datos.
-
Cada instancia de la app actualiza entonces su caché interna en consecuencia.
Este caso práctico destacará cómo PostgreSQL, con su soporte incorporado para actualizaciones de eventos a través de la funciónLISTEN
yNOTIFY
puede indicarnos de forma sencilla cuándo han cambiado sus datos.
asyncpg
ya tiene soporte para la API LISTEN
/NOTIFY
. Esta característica de PostgreSQL permite a tu aplicación suscribirse a eventos en un canal con nombre y enviar eventos a canales con nombre. ¡PostgreSQL casi puede convertirse en una versión más ligera deRabbitMQ oActiveMQ!
Este caso práctico tiene más partes móviles de lo habitual, y eso hace que sea difícil presentarlo en el formato lineal habitual. En lugar de eso, empezaremos por el producto final, y luego iremos hacia la implementación subyacente.
Nuestra aplicación proporciona un servidor API basado en JSON para gestionar los platos favoritos de los clientes de nuestro restaurante robotizado. La base de datos de respaldo tendrá una sola tabla, patron
, con sólo dos campos: name
y fav_dish
. Nuestra API permitirá el conjunto habitual de cuatro operaciones: crear, leer,actualizar y eliminar (CRUD).
A continuación se muestra un ejemplo de interacción con nuestra API utilizando curl
, que ilustra cómo crear una nueva entrada en nuestra base de datos (aún no he mostrado cómo poner en marcha el servidor que se ejecuta en localhost:8000; eso vendrá más adelante):
$ curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \ -H "Content-Type: application/json" \ -X POST \ http://localhost:8000/patron {"msg":"ok","id":37}
El parámetro -d
es para los datos,2 -H
es para las cabeceras HTTP, -X
es para el método de solicitud HTTP (las alternativas incluyen GET
, DELETE
, PUT
, y algunas otras), y la URL es para nuestro servidor API. En breve veremos el código correspondiente.
En la salida, vemos que la creación fue ok
, y la id
que se devuelve es la clave primaria del nuevo registro en la base de datos.
En los siguientes fragmentos de shell, repasaremos las otras tres operaciones:leer, actualizar y borrar. Podemos leer el registro de patrón que acabamos de crear con este comando:
$ curl -X GET http://localhost:8000/patron/37
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}
Leer los datos es bastante sencillo. Ten en cuenta que la dirección id
del registro deseado debe indicarse en la URL.
A continuación, actualizaremos el registro y comprobaremos los resultados:
$ curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \ -H "Content-Type: application/json" \ -X PUT \ http://localhost:8000/patron/37 $ curl -X GET http://localhost:8000/patron/37 {"msg":"ok"} {"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}
Actualizar un recurso es similar a crearlo, con dos diferencias clave:
-
El método de solicitud HTTP (
-X
) esPUT
, noPOST
. -
La URL requiere ahora el campo
id
para especificar qué recurso actualizar.
Por último, podemos eliminar el registro y verificar su eliminación con los siguientes comandos:
$ curl -X DELETE http://localhost:8000/patron/37 $ curl -X GET http://localhost:8000/patron/37 {"msg":"ok"} null
Como puedes ver, se devuelve null
cuando intentasGET
un registro que no existe.
Hasta ahora todo esto parece bastante ordinario, pero nuestro objetivo no es sólo hacer una API CRUD: queremos examinar la invalidación de la caché. Así que dirijamos nuestra atención hacia la caché. Ahora que tenemos un conocimiento básico de la API de nuestra aplicación, podemos mirar los registros de la aplicación para ver los datos de tiempo de cada solicitud: esto nos dirá qué solicitudes se almacenan en caché y cuáles llegan a la BD.
Cuando se inicia el servidor por primera vez, la caché está vacía; al fin y al cabo, es una caché de memoria. Vamos a poner en marcha nuestro servidor y, a continuación, en un intérprete de comandos separado, ejecutaremos dos peticiones GET
en rápida sucesión:
$ curl -X GET http://localhost:8000/patron/29 $ curl -X GET http://localhost:8000/patron/29 {"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"} {"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
Esperamos que la primera vez que recuperemos nuestro registro, se produzca un fallo de caché, y la segunda vez, un acierto. Podemos ver pruebas de ello en el registro del propio servidor de la API (el primer servidor web de Sanic, que se ejecuta en localhost:8000):
$ sanic_demo.py 2019-09-29 16:20:33 - (sanic)[DEBUG]: ▄▄▄▄▄ ▀▀▀██████▄▄▄ _______________ ▄▄▄▄▄ █████████▄ / \ ▀▀▀▀█████▌ ▀▐▄ ▀▐█ | Gotta go fast! | ▀▀█████▄▄ ▀██████▄██ | _________________/ ▀▄▄▄▄▄ ▀▀█▄▀█════█▀ |/ ▀▀▀▄ ▀▀███ ▀ ▄▄ ▄███▀▀██▄████████▄ ▄▀▀▀▀▀▀█▌ ██▀▄▄▄██▀▄███▀ ▀▀████ ▄██ ▄▀▀▀▄██▄▀▀▌████▒▒▒▒▒▒███ ▌▄▄▀ ▌ ▐▀████▐███▒▒▒▒▒▐██▌ ▀▄▄▄▄▀ ▀▀████▒▒▒▒▄██▀ ▀▀█████████▀ ▄▄██▀██████▀█ ▄██▀ ▀▀▀ █ ▄█ ▐▌ ▄▄▄▄█▌ ▀█▄▄▄▄▀▀▄ ▌ ▐ ▀▀▄▄▄▀ ▀▀▄▄▀ 2019-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000 2019-09-29 16:20:33 (sanic): Starting worker [10366] 2019-09-29 16:25:27 (perf): id=37 Cache miss 2019-09-29 16:25:27 (perf): get Elapsed: 4.26 ms 2019-09-29 16:25:27 (perf): get Elapsed: 0.04 ms
Todo lo que hay hasta esta línea es el mensaje de registro de inicio predeterminado de
sanic
.Como se ha descrito, la primera
GET
provoca una pérdida de caché porque el servidor acaba de iniciarse.Esto es de nuestro primer
curl -X GET
. He añadido algunas funciones de temporización a los puntos finales de la API. Aquí podemos ver que el gestor de la peticiónGET
tardó ~4 ms.El segundo
GET
devuelve datos de la caché, y los datos de temporización mucho más rápidos (¡100 veces más rápidos!).
Hasta aquí, nada inusual. Muchas aplicaciones web utilizan la caché de este modo.
Ahora vamos a iniciar una segunda instancia de la app en el puerto 8001 (la primera instancia estaba en el puerto 8000):
$ sanic_demo.py --port 8001 <snip> 2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001 2017-10-02 08:09:56 - (sanic): Starting worker [385]
Ambas instancias, por supuesto, se conectan a la misma base de datos. Ahora, con las dos instancias del servidor API en funcionamiento, vamos a modificar los datos para el clienteJuan, que claramente carece de suficiente Spam en su dieta. Aquí realizamos un UPDATE
contra la primera instancia de la app en el puerto 8000:
$ curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \ -H "Content-Type: application/json" \ -X PUT \ http://localhost:8000/patron/29 {"msg":"ok"}
Inmediatamente después de este evento de actualización en una sola de las instancias de la aplicación, ambosservidores API, el 8000 y el 8001, informan del evento en sus respectivos registros:
2019-10-02 08:35:49 - (perf)[INFO]: Got DB event: { "table": "patron", "id": 29, "type": "UPDATE", "data": { "old": { "id": 29, "name": "John Cleese", "fav_dish": "Gravy on Toast" }, "new": { "id": 29, "name": "John Cleese", "fav_dish": "SPAM on toast" }, "diff": { "fav_dish": "SPAM on toast" } } }
La base de datos ha comunicado el evento de actualización a ambas instancias de la aplicación, pero aún no hemos realizado ninguna solicitud a la instancia 8001 de la aplicación.
Para comprobarlo, podemos hacer un GET
en el segundo servidor, en el puerto 8001:
$ curl -X GET http://localhost:8001/patron/29 {"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}
La información sobre el tiempo en la salida del registro muestra que, efectivamente, obtenemos los datos directamente de la caché, aunque ésta sea nuestra primera petición:
2019-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms
El resultado es que, cuando cambia la base de datos, todas las instancias de aplicación conectadas reciben una notificación, lo que les permite actualizar sus cachés.
Con esta explicación fuera del camino, ahora podemos ver la implementación del códigoasyncpg
necesaria para que nuestra invalidación de caché funcione realmente. El diseño básico del código del servidor que se muestra en el Ejemplo 4-23 es el siguiente:
-
Tenemos una API web sencilla que utiliza el nuevomarco web Sanic, compatible con
asyncio
. -
Los datos se almacenarán en una instancia PostgreSQL backend, pero la API se servirá a través de múltiples instancias de los servidores de la app API web.
-
Los servidores de la aplicación almacenarán en caché los datos de la base de datos.
-
Los servidores de aplicaciones se suscribirán a eventos a través de
asyncpg
en tablas específicas de la BD, y recibirán notificaciones de actualización cuando los datos de la tabla de la BD hayan cambiado. Esto permite a los servidores de aplicaciones actualizar sus cachés individuales en memoria.
Ejemplo 4-23. Servidor API con Sanic
# sanic_demo.py
import
argparse
from
sanic
import
Sanic
from
sanic
.
views
import
HTTPMethodView
from
sanic
.
response
import
json
from
util
import
Database
from
perf
import
aelapsed
,
aprofiler
import
model
app
=
Sanic
(
)
@aelapsed
async
def
new_patron
(
request
)
:
data
=
request
.
json
id
=
await
model
.
add_patron
(
app
.
pool
,
data
)
return
json
(
dict
(
msg
=
'
ok
'
,
id
=
id
)
)
class
PatronAPI
(
HTTPMethodView
,
metaclass
=
aprofiler
)
:
async
def
get
(
self
,
request
,
id
)
:
data
=
await
model
.
get_patron
(
app
.
pool
,
id
)
return
json
(
data
)
async
def
put
(
self
,
request
,
id
)
:
data
=
request
.
json
ok
=
await
model
.
update_patron
(
app
.
pool
,
id
,
data
)
return
json
(
dict
(
msg
=
'
ok
'
if
ok
else
'
bad
'
)
)
async
def
delete
(
self
,
request
,
id
)
:
ok
=
await
model
.
delete_patron
(
app
.
pool
,
id
)
return
json
(
dict
(
msg
=
'
ok
'
if
ok
else
'
bad
'
)
)
@app
.
listener
(
'
before_server_start
'
)
async
def
db_connect
(
app
,
loop
)
:
app
.
db
=
Database
(
'
restaurant
'
,
owner
=
False
)
app
.
pool
=
await
app
.
db
.
connect
(
)
await
model
.
create_table_if_missing
(
app
.
pool
)
await
app
.
db
.
add_listener
(
'
chan_patron
'
,
model
.
db_event
)
@app
.
listener
(
'
after_server_stop
'
)
async
def
db_disconnect
(
app
,
loop
)
:
await
app
.
db
.
disconnect
(
)
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--port
'
,
type
=
int
,
default
=
8000
)
args
=
parser
.
parse_args
(
)
app
.
add_route
(
new_patron
,
'
/patron
'
,
methods
=
[
'
POST
'
]
)
app
.
add_route
(
PatronAPI
.
as_view
(
)
,
'
/patron/<id:int>
'
)
app
.
run
(
host
=
"
0.0.0.0
"
,
port
=
args
.
port
)
El ayudante de la utilidad
Database
, como se ha descrito anteriormente. Proporcionará los métodos necesarios para conectar con la base de datos.He improvisado dos herramientas más para registrar el tiempo transcurrido de cada punto final de la API. Lo utilicé en la discusión anterior para detectar cuándo se devolvía un
GET
desde la caché. Las implementaciones deaelapsed()
yaprofiler()
no son importantes para este caso práctico, pero puedes obtenerlas en el Ejemplo B-1.Creamos la instancia principal de la app Sanic.
Esta función coroutine sirve para crear nuevas entradas de patrón. En una llamada a
add_route()
hacia la parte inferior del código,new_patron()
se asocia con el endpoint/patron
, sólo para el método HTTPPOST
. El decorador@aelapsed
no forma parte de la API de Sanic: es una invención mía, simplemente para registrar los tiempos de cada llamada.Sanic proporciona deserialización inmediata de los datos JSON recibidos utilizando el atributo
.json
del objetorequest
.El módulo
model
, que he importado, es el modelo de nuestra tablapatron
en la base de datos. Lo explicaré con más detalle en el siguiente listado de código; por ahora, entiende que todas las consultas a la base de datos y el SQL están en este módulomodel
. Aquí estoy pasando el pool de conexiones para la base de datos, y se utiliza el mismo patrón para toda la interacción con el modelo de la base de datos en esta función y en la clasePatronAPI
más adelante.Se creará una nueva clave primaria,
id
, que se devolverá a la persona que llama como JSON.Mientras que la creación se gestiona en la función
new_patron()
, el resto de interacciones se gestionan en esta vista basada en la clase, que es una comodidad proporcionada por Sanic. Todos los métodos de esta clase están asociados a la misma URL,/patron/<id:int>
, que puedes ver en la funciónadd_route()
cerca de la parte inferior. Ten en cuenta que el parámetro URLid
se pasará a cada uno de los métodos, y este parámetro es necesario para los tres puntos finales.Puedes ignorar sin problemas el argumento
metaclass
: lo único que hace es envolver cada método con el decorador@aelapsed
para que se impriman los tiempos en los registros. De nuevo, esto no forma parte de la API de Sanic; es una invención mía para registrar los datos de temporización.Como antes, la interacción del modelo se realiza dentro del módulo
model
.Si el modelo informa de un fallo al realizar la actualización, modifico los datos de respuesta. He incluido esto para los lectores que aún no hayan visto la versión de Python del operador ternario.
Los decoradores
@app.listener
son ganchos proporcionados por Sanic para darte un lugar donde añadir acciones adicionales durante la secuencia de arranque y apagado. Éste,before_server_start
, se invoca antes de que se inicie el servidor API. Parece un buen lugar para inicializar nuestra conexión a la base de datos.Utiliza el ayudante
Database
para crear una conexión a nuestra instancia PostgreSQL. La base de datos a la que nos conectamos esrestaurant
.Obtener un pool de conexión a nuestra base de datos.
Utiliza nuestro modelo (para la tabla
patron
) para crear la tabla si falta.Utiliza nuestro modelo para crear un escucha_dedicado para los eventos de la base de datos, escuchando en el canal
chan_patron
. La función de llamada de retorno para estos eventos esmodel.db_event()
, que repasaré en el siguiente listado. Se llamará a la llamada de retorno cada vez que la base de datos actualice el canal.after_server_stop
es el gancho para las tareas que deben ocurrir durante la desconexión. Aquí nos desconectamos de la base de datos.Esta llamada a
add_route()
envíaPOST
peticiones de la URL/patron
a la función coroutinenew_patron()
.Esta llamada a
add_route()
envía todas las peticiones de la URL/patron/<id:int>
a la vista basada en la clasePatronAPI
. Los nombres de los métodos de esa clase determinan a cuál se llama: una solicitud HTTPGET
llamará al métodoPatronAPI.get()
, y así sucesivamente.
El código anterior contiene toda la gestión HTTP de nuestro servidor, así como las tareas de arranque y apagado, como la configuración de un grupo de conexiones a la base de datos y, lo que es más importante, la configuración de un receptor db-event
en el canal chan_patron
del servidor de base de datos.
El Ejemplo 4-24 presenta el modelo de la tabla patron
de la base de datos.
Ejemplo 4-24. Modelo de BD para la tabla "mecenas
# model.py
import
logging
from
json
import
loads
,
dumps
from
triggers
import
(
create_notify_trigger
,
add_table_triggers
)
from
boltons
.
cacheutils
import
LRU
logger
=
logging
.
getLogger
(
'
perf
'
)
CREATE_TABLE
=
(
'
CREATE TABLE IF NOT EXISTS patron(
'
'
id serial PRIMARY KEY, name text,
'
'
fav_dish text)
'
)
INSERT
=
(
'
INSERT INTO patron(name, fav_dish)
'
'
VALUES ($1, $2) RETURNING id
'
)
SELECT
=
'
SELECT * FROM patron WHERE id = $1
'
UPDATE
=
'
UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3
'
DELETE
=
'
DELETE FROM patron WHERE id=$1
'
EXISTS
=
"
SELECT to_regclass(
'
patron
'
)
"
CACHE
=
LRU
(
max_size
=
65536
)
async
def
add_patron
(
conn
,
data
:
dict
)
-
>
int
:
return
await
conn
.
fetchval
(
INSERT
,
data
[
'
name
'
]
,
data
[
'
fav_dish
'
]
)
async
def
update_patron
(
conn
,
id
:
int
,
data
:
dict
)
-
>
bool
:
result
=
await
conn
.
execute
(
UPDATE
,
data
[
'
name
'
]
,
data
[
'
fav_dish
'
]
,
id
)
return
result
==
'
UPDATE 1
'
async
def
delete_patron
(
conn
,
id
:
int
)
:
result
=
await
conn
.
execute
(
DELETE
,
id
)
return
result
==
'
DELETE 1
'
async
def
get_patron
(
conn
,
id
:
int
)
-
>
dict
:
if
id
not
in
CACHE
:
logger
.
info
(
f
'
id=
{id}
Cache miss
'
)
record
=
await
conn
.
fetchrow
(
SELECT
,
id
)
CACHE
[
id
]
=
record
and
dict
(
record
.
items
(
)
)
return
CACHE
[
id
]
def
db_event
(
conn
,
pid
,
channel
,
payload
)
:
event
=
loads
(
payload
)
logger
.
info
(
'
Got DB event:
\n
'
+
dumps
(
event
,
indent
=
4
)
)
id
=
event
[
'
id
'
]
if
event
[
'
type
'
]
==
'
INSERT
'
:
CACHE
[
id
]
=
event
[
'
data
'
]
elif
event
[
'
type
'
]
==
'
UPDATE
'
:
CACHE
[
id
]
=
event
[
'
data
'
]
[
'
new
'
]
elif
event
[
'
type
'
]
==
'
DELETE
'
:
CACHE
[
id
]
=
None
async
def
create_table_if_missing
(
conn
)
:
if
not
await
conn
.
fetchval
(
EXISTS
)
:
await
conn
.
fetchval
(
CREATE_TABLE
)
await
create_notify_trigger
(
conn
,
channel
=
'
chan_patron
'
)
await
add_table_triggers
(
conn
,
table
=
'
patron
'
)
Tienes que añadir disparadores a la base de datos para recibir notificaciones cuando cambien los datos. He creado estos prácticos ayudantes para crear la propia función desencadenante (con
create_notify_trigger
) y para añadir el desencadenante a una tabla concreta (conadd_table_triggers
). El SQL necesario para hacerlo está un poco fuera del alcance de este libro, pero sigue siendo crucial para entender cómo funciona este caso práctico. He incluido el código anotado de estos activadores enel Apéndice B.El paquete de terceros
boltons
proporciona un montón de herramientas útiles, entre las que destaca la cachéLRU
, una opción más versátil que el decorador@lru_cache
del módulo de la biblioteca estándarfunctools
.3Este bloque de texto contiene todo el SQL para las operaciones CRUD estándar. Ten en cuenta que estoy utilizando la sintaxis nativa de PostgreSQL para los parámetros:
$1
,$2
, etc. No hay nada novedoso aquí, y no se discutirá más.Crea la caché para esta instancia de aplicación.
He llamado a esta función desde el módulo Sanic dentro del punto final
new_patron()
para añadir nuevos clientes. Dentro de la función, utilizo el métodofetchval()
para insertar nuevos datos. ¿Por quéfetchval()
y noexecute()
? Porquefetchval()
devuelve la clave primaria del nuevo registro insertado.4Actualiza un registro existente. Cuando esto tenga éxito, PostgreSQL devolverá
UPDATE 1
, así que lo utilizo como comprobación para verificar que la actualización ha tenido éxito.La eliminación es muy similar a la actualización.
Esta es la operación de lectura. Es la única parte de nuestra interfaz CRUD que se preocupa por la caché. Piénsalo un segundo: no actualizamos la caché cuando hacemos una inserción, actualización o eliminación. Esto se debe a que confiamos en la notificación asíncrona de la base de datos (a través de los activadores instalados) para actualizar la caché si se modifica algún dato.
Por supuesto, seguimos queriendo utilizar la caché después de la primera
GET
.La función
db_event()
es la llamada de retorno que haráasyncpg
cuando se produzcan eventos en nuestro canal de notificación de la BD,chan_patron
. Esta lista específica de parámetros es necesaria paraasyncpg
.conn
es la conexión en la que se envió el evento,pid
es el ID del proceso de la instancia de PostgreSQL que envió el evento,channel
es el nombre del canal (que en este caso seráchan_patron
), y la carga útil son los datos que se envían por el canal.Deserializa los datos JSON a un dict.
En general, la población de la caché es bastante sencilla, pero ten en cuenta que los eventos de actualización contienen datos nuevos y antiguos, por lo que tenemos que asegurarnos de almacenar en caché sólo los datos nuevos.
Ésta es una pequeña función utilitaria que he creado para volver a crear fácilmente una tabla si falta. Es muy útil si tienes que hacerlo con frecuencia, como cuando escribes los ejemplos de código de este libro.
Aquí es también donde se crean los disparadores de notificación de la base de datos y se añaden a nuestra tabla
patron
. Consulta el Ejemplo B-1para ver una lista anotada de estas funciones.
Esto nos lleva al final de este caso práctico. Hemos visto cómo Sanic hace que sea muy sencillo crear un servidor API, y hemos visto cómo utilizar asyncpg
para realizar consultas a través de un grupo de conexiones, y cómo utilizar las funciones de notificación asíncrona de PostgreSQL para recibir llamadas de retorno a través de una conexión de base de datos dedicada y de larga duración.
Mucha gente de prefiere utilizar mapeadores objeto-relacionales para trabajar con bases de datos, y en este campo, SQLAlchemyes el líder. Cada vez hay más apoyo al uso de SQLAlchemy junto con asyncpg
en bibliotecas de terceros como asyncpgsa
y GINO. Otro ORM popular,Peewee, tiene soporte paraasyncio
a través del paquete aiopeewee
paquete.
Otras bibliotecas y recursos
Existen muchas otras bibliotecas para asyncio
que no se tratan en este libro. Para saber más, puedes consultar elproyectoaio-libs
, que gestiona cerca de 40 bibliotecas, y elproyecto Awesome asyncio
, que marca muchos otros proyectos compatibles con el módulo asyncio
.
Una biblioteca que merece una mención especial esaiofiles
. Como recordarás de nuestras discusiones anteriores, dije que para lograr una alta concurrencia en Asyncio, es de vital importancia que el bucle nunca se bloquee. En este contexto, nos hemos centrado exclusivamente en las operaciones bloqueantes de E/S basadas en la red, pero resulta que el acceso al disco también es una operación bloqueante que afectará a tu rendimiento a niveles de concurrencia muy altos. La solución a esto es aiofiles
, que proporciona una cómoda envoltura para realizar el acceso al disco en un hilo. Esto funciona porque Python libera el GIL durante las operaciones de archivo, de modo que tu hilo principal (que ejecuta el bucle asyncio
) no se ve afectado.
El dominio más importante para Asyncio va a ser la programación de redes. Por esta razón, no es mala idea aprender un poco sobre programación de sockets, e incluso después de todos estos años, el"Socket Programming HOWTO" de Gordon McMillan, incluido con la documentación estándar de Python, es una de las mejores introducciones que encontrarás.
Aprendí Asyncio de una gran variedad de fuentes, muchas de las cuales ya se han mencionado en secciones anteriores. Cada persona aprende de forma diferente, por lo que merece la pena explorar distintos tipos de materiales de aprendizaje. Aquí tienes algunos otros que me resultaron útiles:
-
Charla de Robert Smallshire "Get to Grips with Asyncio in Python 3", presentada en el NDC de Londres en enero de 2017. Es, con diferencia, el mejor vídeo de YouTube sobre Asyncio que he encontrado. Puede que la charla sea algo avanzada para un principiante, pero realmente ofrece una descripción clara de cómo está diseñado Asyncio.
-
Diapositivas "Construir aplicaciones con Asyncio" de Nikolay Novik, presentadas en la PyCon UA 2016. La información es densa, pero en estas diapositivas se recoge mucha experiencia práctica.
-
Interminables sesiones en el REPL de Python, probando cosas y "viendo qué pasa".
Te animo a que sigas aprendiendo, y si un concepto no se te queda claro, sigue buscando nuevas fuentes hasta que encuentres una explicación que te sirva.
1 En realidad, puedes hacerlo siempre que los sockets que se utilicen en hilos diferentes se creen, utilicen y destruyan totalmente en sus propios hilos. Es posible, pero difícil de hacer, y mucha gente tiene dificultades para conseguirlo. Por eso es tan fuerte la recomendación de utilizar un único hilo y un mecanismo de sondeo.
2 La receta de este plato, y las recetas de otros platos a base de Spam, se pueden encontrar en el sitio web de UKTV.
3 Obtener boltons
con pip install boltons
.
4 ¡Pero también necesitas la parte RETURNING id
del SQL!
Get Utilizar Asyncio en Python now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.