Kapitel 4. 20 Asyncio-Bibliotheken, die du nicht verwendest (aber... Ach, egal)
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
In diesem Kapitel sehen wir uns Fallstudien an, in denen wir die neuen Python-Funktionen für die asynchrone Programmierung nutzen. Wir werden verschiedene Bibliotheken von Drittanbietern verwenden, wie du es auch in deinen eigenen Projekten tun wirst.
Der Titel dieses Kapitels ist eine Anspielung auf den Titel meines früheren Buches20 Python Libraries You Aren't Using (But Should) (O'Reilly). Viele dieser Bibliotheken werden auch in deinen asyncio
-basierten Anwendungen nützlich sein, aber dieses Kapitel konzentriert sich auf Bibliotheken, die speziell für die neuen asynchronen Funktionen in Python entwickelt wurden.
Es ist schwierig, asyncio
-basierten Code in kurzen Schnipseln zu präsentieren. Wie du in den bisherigen Codebeispielen in diesem Buch gesehen hast, habe ich versucht, aus jedem Beispiel ein vollständiges, lauffähiges Programm zu machen, denn die Verwaltung der Anwendungslebensdauer ist ein zentraler Aspekt für die richtige Anwendung der asynchronen Programmierung.
Aus diesem Grund sind die meisten Fallstudien in diesem Kapitel etwas umfangreicher, was die Anzahl der Codezeilen angeht, als es für ein solches Buch üblich ist. Mit diesem Ansatz möchte ich die Fallstudien nützlicher machen, indem ich dir einen "Gesamtüberblick" über ein asynchrones Programm vermittle, anstatt dich damit zu überlassen, herauszufinden, wie einzelne Fragmente zusammenpassen könnten.
Hinweis
Einige der Codebeispiele in diesem Kapitel gehen Kompromisse beim Stil ein, um Platz zu sparen. Ich mag PEP8 genauso sehr wie jeder andere Pythonista, aber Praktikabilität schlägt Reinheit!
Streams (Standardbibliothek)
Bevor wir uns die Bibliotheken von Drittanbietern ansehen, beginnen wir mit der Standardbibliothek. Die Streams-APIist die High-Level-Schnittstelle, die für die asynchrone Socket-Programmierung angeboten wird, und wie die folgende Fallstudie zeigen wird, ist sie ziemlich einfach zu benutzen. Das Anwendungsdesign bleibt jedoch komplex, weil es sich um eine Domäne handelt.
Die folgende Fallstudie zeigt die Implementierung eines Nachrichtenmaklers mit einem anfänglich naiven Design, gefolgt von einem durchdachteren Design. Keine der beiden Implementierungen sollte als produktionsreif angesehen werden. Mein Ziel ist es, dir zu helfen, über die verschiedenen Aspekte der gleichzeitigen Netzwerkprogrammierung nachzudenken, die beim Entwurf solcher Anwendungen berücksichtigt werden müssen.
Fallstudie: Eine Nachrichten-Warteschlange
Ein Message-Queue-Dienst ist eine Backend-Anwendung, die Verbindungen von anderen Anwendungen empfängt und Nachrichten zwischen diesen verbundenen Diensten weiterleitet, die oft als Publisher und Subscriber bezeichnet werden. Abonnenten hören in der Regel bestimmte Kanäle ab, um Nachrichten zu erhalten, und in der Regel ist es möglich, die Nachrichtenverteilung in verschiedenen Kanälen auf zwei Arten zu konfigurieren: Nachrichten können an alle Abonnenten eines Kanals verteilt werden(pub-sub), oder eine andere Nachricht kann an jeden Abonnenten einzeln gehen(point-to-point).
Vor kurzem habe ich an einem Projekt gearbeitet, bei demActiveMQ als Message Broker für die Kommunikation zwischen Microservices eingesetzt wurde. Auf einer grundlegenden Ebene ist ein solcher Broker (Server):
-
Unterhält dauerhafte Socket-Verbindungen zu mehreren Clients
-
Empfängt Nachrichten von Clients mit einem Zielkanalnamen
-
Leitet diese Nachrichten an alle anderen Clients weiter, die denselben Kanalnamen abonniert haben
Ich erinnere mich, dass ich mich gefragt habe, wie schwer es sein könnte, eine solche Anwendung zu erstellen. Außerdem kann ActiveMQ beide Modelle der Nachrichtenverteilung durchführen, und die beiden Modelle werden in der Regel durch den Kanalnamen unterschieden:
-
Kanalnamen mit dem Präfix
/topic
(z. B./topic/customer/registration
) werden nach dem Muster pub-subverwaltet, wobei alle Kanalabonnenten alle Nachrichten erhalten. -
Kanalnamen mit dem Präfix
/queue
werden mit demPunkt-zu-Punkt-Modellgehandhabt, bei dem Nachrichten auf einem Kanal zwischen den Kanalteilnehmern in einem Rundlaufverfahren verteilt werden: Jeder Teilnehmer erhält eine eindeutige Nachricht.
In unserer Fallstudie werden wir einen kleinen Message Broker mit diesen grundlegenden Funktionen bauen. Das erste Problem, das wir angehen müssen ist, dass TCP kein nachrichtenbasiertes Protokoll ist: Wir bekommen nur Byteströme auf dem Draht. Wir müssen ein eigenes Protokoll für die Struktur der Nachrichten erstellen. Das einfachste Protokoll besteht darin, jeder Nachricht einen Header mit einer bestimmten Größe voranzustellen, gefolgt von einer Nutzlast in dieser Größe. Die Hilfsbibliothek in Beispiel 4-1 bietet Lese- und Schreibfunktionen für solche Nachrichten.
Beispiel 4-1. Nachrichtenprotokoll: Lesen und Schreiben
# msgproto.py
from
asyncio
import
StreamReader
,
StreamWriter
async
def
read_msg
(
stream
:
StreamReader
)
-
>
bytes
:
size_bytes
=
await
stream
.
readexactly
(
4
)
size
=
int
.
from_bytes
(
size_bytes
,
byteorder
=
'
big
'
)
data
=
await
stream
.
readexactly
(
size
)
return
data
async
def
send_msg
(
stream
:
StreamWriter
,
data
:
bytes
)
:
size_bytes
=
len
(
data
)
.
to_bytes
(
4
,
byteorder
=
'
big
'
)
stream
.
writelines
(
[
size_bytes
,
data
]
)
await
stream
.
drain
(
)
Erhalte die ersten 4 Bytes. Dies ist das Größenpräfix.
Diese 4 Bytes müssen in eine Ganzzahl umgewandelt werden.
Jetzt kennen wir die Größe der Nutzlast, also lesen wir sie aus dem Stream.
Schreiben ist die Umkehrung von Lesen: Zuerst senden wir die Länge der Daten, kodiert als 4 Bytes, und danach die Daten.
Da wir nun ein rudimentäres Nachrichtenprotokoll haben, können wir uns auf die Message Broker Anwendung in Beispiel 4-2 konzentrieren.
Beispiel 4-2. Ein 40-Zeilen-Prototyp-Server
# mq_server.py
import
asyncio
from
asyncio
import
StreamReader
,
StreamWriter
,
gather
from
collections
import
deque
,
defaultdict
from
typing
import
Deque
,
DefaultDict
from
msgproto
import
read_msg
,
send_msg
SUBSCRIBERS
:
DefaultDict
[
bytes
,
Deque
]
=
defaultdict
(
deque
)
async
def
client
(
reader
:
StreamReader
,
writer
:
StreamWriter
)
:
peername
=
writer
.
get_extra_info
(
'
peername
'
)
subscribe_chan
=
await
read_msg
(
reader
)
SUBSCRIBERS
[
subscribe_chan
]
.
append
(
writer
)
(
f
'
Remote
{peername}
subscribed to
{subscribe_chan}
'
)
try
:
while
channel_name
:
=
await
read_msg
(
reader
)
:
data
=
await
read_msg
(
reader
)
(
f
'
Sending to
{channel_name}
:
{data[:19]}
...
'
)
conns
=
SUBSCRIBERS
[
channel_name
]
if
conns
and
channel_name
.
startswith
(
b
'
/queue
'
)
:
conns
.
rotate
(
)
conns
=
[
conns
[
0
]
]
await
gather
(
*
[
send_msg
(
c
,
data
)
for
c
in
conns
]
)
except
asyncio
.
CancelledError
:
(
f
'
Remote
{peername}
closing connection.
'
)
writer
.
close
(
)
await
writer
.
wait_closed
(
)
except
asyncio
.
IncompleteReadError
:
(
f
'
Remote
{peername}
disconnected
'
)
finally
:
(
f
'
Remote
{peername}
closed
'
)
SUBSCRIBERS
[
subscribe_chan
]
.
remove
(
writer
)
async
def
main
(
*
args
,
*
*
kwargs
)
:
server
=
await
asyncio
.
start_server
(
*
args
,
*
*
kwargs
)
async
with
server
:
await
server
.
serve_forever
(
)
try
:
asyncio
.
run
(
main
(
client
,
host
=
'
127.0.0.1
'
,
port
=
25000
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
Importiert von unserem Modul msgproto.py.
Eine globale Sammlung von derzeit aktiven Abonnenten. Jedes Mal, wenn sich ein Kunde verbindet, muss er zuerst den Namen des Kanals senden, den er abonniert. Eine Deque enthält alle Abonnenten für einen bestimmten Kanal.
Die Coroutine-Funktion
client()
erzeugt für jede neue Verbindung eine langlebige Coroutine. Sie ist sozusagen ein Callback für den TCP-Server, der inmain()
gestartet wurde. In dieser Zeile habe ich gezeigt, wie der Host und der Port der Gegenstelle ermittelt werden können, zum Beispiel für die Protokollierung.Unser Protokoll für Kunden sieht wie folgt aus:
-
Beim ersten Verbindungsaufbau muss ein Client eine Nachricht mit dem Kanal senden, den er abonnieren möchte (hier
subscribe_chan
). -
Danach sendet ein Kunde während der gesamten Dauer der Verbindung eine Nachricht an einen Kanal, indem er zunächst eine Nachricht mit dem Namen des Zielkanals und anschließend eine Nachricht mit den Daten sendet. Unser Broker sendet solche Datennachrichten an alle Kunden, die diesen Kanalnamen abonniert haben.
-
Füge die Instanz
StreamWriter
zu der globalen Sammlung von Abonnenten hinzu.Eine Endlosschleife, die auf Daten von diesem Kunden wartet. Die erste Nachricht von einem Client muss der Name des Zielkanals sein.
Als Nächstes kommen die eigentlichen Daten, die an den Kanal verteilt werden sollen.
Ermittelt die Anzahl der Abonnenten des Zielkanals.
Eine Sonderbehandlung, wenn der Kanalname mit dem magischen Wort
/queue
beginnt: In diesem Fall senden wir die Daten nur an einen der Abonnenten, nicht an alle. Dies kann für die Arbeitsteilung zwischen mehreren Arbeitern genutzt werden, anstatt des üblichen Pub-Sub-Benachrichtigungsschemas, bei dem alle Abonnenten eines Kanals alle Nachrichten erhalten.Das ist der Grund, warum wir eine Deque und keine Liste verwenden: Durch die Rotation der Deque behalten wir den Überblick, welcher Kunde als nächster an der Reihe ist, um
/queue
zu verteilen. Das scheint teuer zu sein, bis du merkst, dass eine einzelne Deque-Rotation eine O(1)-Operation ist.Nimm nur den Kunden ins Visier, der an erster Stelle steht; dies ändert sich nach jeder Rotation.
Erstelle eine Liste von Coroutines für das Senden der Nachricht an jeden Writer und packe diese dann in
gather()
aus, damit wir warten können, bis alle Sendungen abgeschlossen sind.Diese Zeile ist ein schlimmer Fehler in unserem Programm, aber es ist vielleicht nicht klar, warum: Es mag zwar stimmen, dass alle Sendungen an jeden Abonnenten gleichzeitig erfolgen, aber was passiert, wenn wir einen sehr langsamen Client haben? In diesem Fall wird die
gather()
erst dann beendet, wenn der langsamste Teilnehmer seine Daten erhalten hat. Wir können keine weiteren Daten von dem sendenden Client empfangen, bis allesend_msg()
Coroutines beendet sind. Dadurch wird die gesamte Nachrichtenverteilung auf die Geschwindigkeit des langsamsten Teilnehmers verlangsamt.Wenn wir die Coroutine
client()
verlassen, müssen wir uns aus der globalenSUBSCRIBERS
Sammlung entfernen. Leider ist dies eine O(n)-Operation, die bei einem sehr großen n etwas teuer werden kann. Eine andere Datenstruktur würde dies beheben, aber im Moment trösten wir uns mit dem Wissen, dass die Verbindungen langlebig sein sollen - es sollte also nur wenige Verbindungsabbrüche geben - und dass n wahrscheinlich nicht sehr groß ist (sagen wir ~10.000 als grobe Schätzung) und dieser Code zumindest leicht zu verstehen ist.
Das ist also unser Server; jetzt brauchen wir noch Clients, damit wir eine Ausgabe zeigen können. Zu Demonstrationszwecken werde ich zwei Arten von Clients erstellen: einenSender und einen Hörer. Der Server macht keinen Unterschied; alle Clients sind gleich. Die Unterscheidung zwischen Absender- und Zuhörerverhalten dient nur zu Lehrzwecken. Beispiel 4-3 zeigt den Code für die Listener-Anwendung.
Beispiel 4-3. Listener: ein Toolkit zum Abhören von Nachrichten auf unserem Message Broker
# mq_client_listen.py
import
asyncio
import
argparse
,
uuid
from
msgproto
import
read_msg
,
send_msg
async
def
main
(
args
)
:
me
=
uuid
.
uuid4
(
)
.
hex
[
:
8
]
(
f
'
Starting up
{me}
'
)
reader
,
writer
=
await
asyncio
.
open_connection
(
args
.
host
,
args
.
port
)
(
f
'
I am
{
writer.get_extra_info(
"
sockname
"
)}
'
)
channel
=
args
.
listen
.
encode
(
)
await
send_msg
(
writer
,
channel
)
try
:
while
data
:
=
await
read_msg
(
reader
)
:
(
f
'
Received by
{me}
:
{data[:20]}
'
)
(
'
Connection ended.
'
)
except
asyncio
.
IncompleteReadError
:
(
'
Server closed.
'
)
finally
:
writer
.
close
(
)
await
writer
.
wait_closed
(
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--host
'
,
default
=
'
localhost
'
)
parser
.
add_argument
(
'
--port
'
,
default
=
25000
)
parser
.
add_argument
(
'
--listen
'
,
default
=
'
/topic/foo
'
)
try
:
asyncio
.
run
(
main
(
parser
.
parse_args
(
)
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
Mit dem Modul
uuid
aus der Standardbibliothek kannst du bequem eine "Identität" für diesen Listener erstellen. Wenn du mehrere Instanzen startest, hat jede ihre eigene Identität und du kannst in den Protokollen nachvollziehen, was passiert.Öffne eine Verbindung zum Server.
Der Kanal, der abonniert werden soll, ist ein Eingabeparameter, der in
args.listen
erfasst wird. Kodiere ihn vor dem Senden in Bytes.Nach unseren Protokollregeln (wie in der Broker-Code-Analyse zuvor besprochen) ist das erste, was nach dem Verbindungsaufbau zu tun ist, den Namen des Kanals zu senden, den du abonnieren willst.
Diese Schleife tut nichts anderes, als darauf zu warten, dass Daten auf dem Socket erscheinen.
Mit den Befehlszeilenargumenten für dieses Programm kannst du ganz einfach einen Host, einen Port und einen Kanalnamen angeben, auf den du hören willst.
Der Code für den anderen Client, das in Beispiel 4-4 gezeigte Absenderprogramm, ist ähnlich aufgebaut wie das Hörermodul.
Beispiel 4-4. Sender: ein Toolkit zum Senden von Daten an unseren Message Broker
# mq_client_sender.py
import
asyncio
import
argparse
,
uuid
from
itertools
import
count
from
msgproto
import
send_msg
async
def
main
(
args
)
:
me
=
uuid
.
uuid4
(
)
.
hex
[
:
8
]
(
f
'
Starting up
{me}
'
)
reader
,
writer
=
await
asyncio
.
open_connection
(
host
=
args
.
host
,
port
=
args
.
port
)
(
f
'
I am
{
writer.get_extra_info(
"
sockname
"
)}
'
)
channel
=
b
'
/null
'
await
send_msg
(
writer
,
channel
)
chan
=
args
.
channel
.
encode
(
)
try
:
for
i
in
count
(
)
:
await
asyncio
.
sleep
(
args
.
interval
)
data
=
b
'
X
'
*
args
.
size
or
f
'
Msg
{i}
from
{me}
'
.
encode
(
)
try
:
await
send_msg
(
writer
,
chan
)
await
send_msg
(
writer
,
data
)
except
OSError
:
(
'
Connection ended.
'
)
break
except
asyncio
.
CancelledError
:
writer
.
close
(
)
await
writer
.
wait_closed
(
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--host
'
,
default
=
'
localhost
'
)
parser
.
add_argument
(
'
--port
'
,
default
=
25000
,
type
=
int
)
parser
.
add_argument
(
'
--channel
'
,
default
=
'
/topic/foo
'
)
parser
.
add_argument
(
'
--interval
'
,
default
=
1
,
type
=
float
)
parser
.
add_argument
(
'
--size
'
,
default
=
0
,
type
=
int
)
try
:
asyncio
.
run
(
main
(
parser
.
parse_args
(
)
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
Wie bei der Zuhörerin oder dem Zuhörer solltest du eine Identität einfordern.
Reiche die Hand und stelle eine Verbindung her.
Laut unseren Protokollregeln ist das erste, was nach der Verbindung mit dem Server zu tun ist, den Namen des Kanals anzugeben, den wir abonnieren wollen. Da wir aber ein Sender sind, ist es uns eigentlich egal, ob wir einen Kanal abonnieren wollen. Trotzdem verlangt das Protokoll dies, also geben wir einfach einen Null-Kanal an, den wir abonnieren wollen (wir werden nicht wirklich auf etwas hören).
Sende den Kanal, den du abonnieren willst.
Der Kommandozeilenparameter
args.channel
gibt den Kanal an, an denwir Nachrichten senden wollen. Er muss vor dem Senden zunächst in Bytes umgewandelt werden.itertools.count()
ist wie einewhile True
Schleife, mit dem Unterschied, dass wir eine Iterationsvariable erhalten, die wir verwenden können. Wir verwenden diese in den Debugging-Meldungen, da es so etwas einfacher ist, nachzuvollziehen, welche Meldung von wo aus gesendet wurde.Die Verzögerung zwischen den gesendeten Nachrichten ist ein Eingabeparameter,
args.interval
. Die nächste Zeile erzeugt die Nutzlast der Nachricht. Es handelt sich entweder um einen Bytestring der angegebenen Größe (args.size
) oder um eine beschreibende Nachricht. Diese Flexibilität ist nur zum Testen gedacht.Beachte, dass hier zwei Nachrichten gesendet werden: Die erste ist der Name des Zielkanals und die zweite ist die Nutzlast.
Wie beim Hörer gibt es auch für den Sender eine Reihe von Kommandozeilenoptionen, mit denen er eingestellt werden kann:
channel
legt den Zielkanal fest, an den gesendet werden soll, währendinterval
die Verzögerung zwischen den Sendungen bestimmt. Der Parametersize
bestimmt die Größe der Nutzlast jeder Nachricht.
Jetzt haben wir einen Broker, einen Listener und einen Sender; es ist Zeit, etwas zu sehen. Um die folgenden Codeschnipsel zu erzeugen, habe ich den Server gestartet, dann zwei Listener und dann einen Sender. Nachdem ein paar Nachrichten verschickt worden waren, habe ich den Server mit Strg-C gestoppt. Die Ausgabe des Servers ist in Beispiel 4-5 zu sehen, die des Senders in Beispiel 4-6 und die des Listeners in Beispiel 4-7 und 4-8.
Beispiel 4-5. Ausgabe des Nachrichtenbrokers (Server)
$ mq_server.py Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah' Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah' Remote ('127.0.0.1', 55390) subscribed to b'/null' Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'... Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'... ^CBye! Remote ('127.0.0.1', 55382) closing connection. Remote ('127.0.0.1', 55382) closed Remote ('127.0.0.1', 55390) closing connection. Remote ('127.0.0.1', 55390) closed Remote ('127.0.0.1', 55386) closing connection. Remote ('127.0.0.1', 55386) closed
Beispiel 4-6. Absender (Client) Ausgabe
$ mq_client_sender.py --channel /queue/blah Starting up 6b5a8e1d I am ('127.0.0.1', 55390) Connection ended.
Beispiel 4-7. Ausgabe von Listener 1 (Client)
$ mq_client_listen.py --listen /queue/blah Starting up 9ae04690 I am ('127.0.0.1', 55382) Received by 9ae04690: b'Msg 1 from 6b5a8e1d' Received by 9ae04690: b'Msg 3 from 6b5a8e1d' Received by 9ae04690: b'Msg 5 from 6b5a8e1d' Server closed.
Beispiel 4-8. Ausgabe von Listener 2 (Client)
$ mq_client_listen.py --listen /queue/blah Starting up bd4e3baa I am ('127.0.0.1', 55386) Received by bd4e3baa: b'Msg 0 from 6b5a8e1d' Received by bd4e3baa: b'Msg 2 from 6b5a8e1d' Received by bd4e3baa: b'Msg 4 from 6b5a8e1d' Server closed.
Unser Spielzeug-Messenger funktioniert. Der Code ist auch ziemlich einfach zu verstehen, wenn man bedenkt, dass es sich um eine so komplexe Problemdomäne handelt, aber leider ist das Design des Broker-Codes selbst problematisch.
Das Problem ist, dass wir für einen bestimmten Client Nachrichten an die Abonnenten in der gleichen Coroutine senden, in der auch neue Nachrichten empfangen werden. Das bedeutet, dass es sehr lange dauern kann, bis die Zeile await gather(...)
in Beispiel 4-2 abgeschlossen ist, wenn ein Abonnent die gesendeten Nachrichten nur langsam konsumiert, und wir in dieser Zeit keine weiteren Nachrichten empfangen und verarbeiten können.
Stattdessen müssen wir den Empfang von Nachrichten vom Senden von Nachrichten entkoppeln. In der nächsten Fallstudie überarbeiten wir unseren Code, um genau das zu tun.
Fallstudie: Verbesserung der Nachrichtenwarteschlange
In dieser Fallstudie verbessern wir das Design unseres Spielzeug-Nachrichtenbrokers. Die Programme für die Zuhörer und Absender bleiben unverändert. Die besondere Verbesserung des neuen Broker-Designs besteht darin, das Senden und Empfangen von Nachrichten zu entkoppeln. Damit wird das Problem gelöst, dass ein langsamer Abonnent auch den Empfang neuer Nachrichten verlangsamt, wie im vorherigen Abschnitt beschrieben. Der neue Code, der in Beispiel 4-9 gezeigt wird, ist zwar etwas länger, aber nicht sehr viel länger.
Beispiel 4-9. Nachrichtenbroker: verbessertes Design
# mq_server_plus.py
import
asyncio
from
asyncio
import
StreamReader
,
StreamWriter
,
Queue
from
collections
import
deque
,
defaultdict
from
contextlib
import
suppress
from
typing
import
Deque
,
DefaultDict
,
Dict
from
msgproto
import
read_msg
,
send_msg
SUBSCRIBERS
:
DefaultDict
[
bytes
,
Deque
]
=
defaultdict
(
deque
)
SEND_QUEUES
:
DefaultDict
[
StreamWriter
,
Queue
]
=
defaultdict
(
Queue
)
CHAN_QUEUES
:
Dict
[
bytes
,
Queue
]
=
{
}
async
def
client
(
reader
:
StreamReader
,
writer
:
StreamWriter
)
:
peername
=
writer
.
get_extra_info
(
'
peername
'
)
subscribe_chan
=
await
read_msg
(
reader
)
SUBSCRIBERS
[
subscribe_chan
]
.
append
(
writer
)
send_task
=
asyncio
.
create_task
(
send_client
(
writer
,
SEND_QUEUES
[
writer
]
)
)
(
f
'
Remote
{peername}
subscribed to
{subscribe_chan}
'
)
try
:
while
channel_name
:
=
await
read_msg
(
reader
)
:
data
=
await
read_msg
(
reader
)
if
channel_name
not
in
CHAN_QUEUES
:
CHAN_QUEUES
[
channel_name
]
=
Queue
(
maxsize
=
10
)
asyncio
.
create_task
(
chan_sender
(
channel_name
)
)
await
CHAN_QUEUES
[
channel_name
]
.
put
(
data
)
except
asyncio
.
CancelledError
:
(
f
'
Remote
{peername}
connection cancelled.
'
)
except
asyncio
.
IncompleteReadError
:
(
f
'
Remote
{peername}
disconnected
'
)
finally
:
(
f
'
Remote
{peername}
closed
'
)
await
SEND_QUEUES
[
writer
]
.
put
(
None
)
await
send_task
del
SEND_QUEUES
[
writer
]
SUBSCRIBERS
[
subscribe_chan
]
.
remove
(
writer
)
async
def
send_client
(
writer
:
StreamWriter
,
queue
:
Queue
)
:
while
True
:
try
:
data
=
await
queue
.
get
(
)
except
asyncio
.
CancelledError
:
continue
if
not
data
:
break
try
:
await
send_msg
(
writer
,
data
)
except
asyncio
.
CancelledError
:
await
send_msg
(
writer
,
data
)
writer
.
close
(
)
await
writer
.
wait_closed
(
)
async
def
chan_sender
(
name
:
bytes
)
:
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
writers
=
SUBSCRIBERS
[
name
]
if
not
writers
:
await
asyncio
.
sleep
(
1
)
continue
if
name
.
startswith
(
b
'
/queue
'
)
:
writers
.
rotate
(
)
writers
=
[
writers
[
0
]
]
if
not
(
msg
:
=
await
CHAN_QUEUES
[
name
]
.
get
(
)
)
:
break
for
writer
in
writers
:
if
not
SEND_QUEUES
[
writer
]
.
full
(
)
:
(
f
'
Sending to
{name}
:
{msg[:19]}
...
'
)
await
SEND_QUEUES
[
writer
]
.
put
(
msg
)
async
def
main
(
*
args
,
*
*
kwargs
)
:
server
=
await
asyncio
.
start_server
(
*
args
,
*
*
kwargs
)
async
with
server
:
await
server
.
serve_forever
(
)
try
:
asyncio
.
run
(
main
(
client
,
host
=
'
127.0.0.1
'
,
port
=
25000
)
)
except
KeyboardInterrupt
:
(
'
Bye!
'
)
In der vorherigen Implementierung gab es nur
SUBSCRIBERS
; jetzt gibt esSEND_QUEUES
undCHAN_QUEUES
als globale Sammlungen. Dies ist eine Folge der vollständigen Entkopplung von Datenempfangund -versand.SEND_QUEUES
hat einen Warteschlangeneintrag für jede Client-Verbindung: Alle Daten, die an diesen Client gesendet werden müssen, müssen in diese Warteschlange gestellt werden. (Wenn du einen Blick in die Zukunft wirfst, holt die Coroutinesend_client()
die Daten vonSEND_QUEUES
und sendet sie.)Bis zu diesem Punkt in der
client()
Coroutine-Funktion ist der Code derselbe wie beim einfachen Server: Der Name des abonnierten Kanals wird empfangen, und wir fügen dieStreamWriter
Instanz für den neuen Client zur globalenSUBSCRIBERS
Sammlung hinzu.Das ist neu: Wir erstellen eine langlebige Aufgabe, die alle Daten an diesen Client sendet. Die Aufgabe wird unabhängig als eigene Coroutine ausgeführt und holt sich die Nachrichten aus der bereitgestellten Warteschlange
SEND_QUEUES[writer]
, um sie zu versenden.Jetzt befinden wir uns in der Schleife, in der wir die Daten empfangen. Erinnere dich daran, dass wir immer zwei Nachrichten erhalten: eine für den Namen des Zielkanals und eine für die Daten. Wir erstellen eine neue, dedizierte
Queue
für jeden Zielkanal. Dafür istCHAN_QUEUES
da: Wenn ein Client Daten in einen Kanal pushen will, stellen wir diese Daten in die entsprechende Warteschlange und hören dann sofort wieder auf weitere Daten. Dieser Ansatz entkoppelt das Verteilen von Nachrichten vom Empfangen von Nachrichten von diesem Client.Wenn es noch keine Warteschlange für den Zielkanal gibt, erstelle eine.
Erstelle eine spezielle und langlebige Aufgabe für diesen Kanal. Die Coroutine
chan_sender()
ist dafür zuständig, Daten aus der Warteschlange des Kanals zu nehmen und sie an die Abonnenten zu verteilen.Platziere die neu empfangenen Daten in der Warteschlange des jeweiligen Kanals. Wenn sich die Warteschlange füllt, warten wir hier, bis Platz für die neuen Daten ist. Hier zu warten bedeutet, dass wir keine neuen Daten aus dem Socket lesen, was bedeutet, dass der Client darauf warten muss, neue Daten in den Socket zu senden. Das ist nicht unbedingt etwas Schlechtes, da es dem Client den so genannten Gegendruck vermittelt (alternativ kannst du hier auch Nachrichten absetzen, wenn der Anwendungsfall das zulässt).
Wenn die Verbindung geschlossen ist, ist es Zeit, aufzuräumen. Die langlebige Aufgabe, die wir für das Senden von Daten an diesen Kunden erstellt haben,
send_task
, kann beendet werden, indemNone
in die WarteschlangeSEND_QUEUES[writer]
gestellt wird (schau dir den Code fürsend_client()
an). Es ist wichtig, einen Wert für die Warteschlange zu verwenden, anstatt sie einfach abzubrechen, denn es kann sein, dass sich bereits Daten in der Warteschlange befinden und wir wollen, dass diese Daten versendet werden, bevorsend_client()
beendet wird.Warte, bis die Absenderaufgabe beendet ist...
...dann entfernen wir den Eintrag in der Sammlung
SEND_QUEUES
(und in der nächsten Zeile entfernen wir auchsock
aus der SammlungSUBSCRIBERS
wie zuvor).Die Coroutine-Funktion
send_client()
ist fast schon ein Lehrbuchbeispiel dafür, wie man Arbeit aus einer Warteschlange abzieht. Beachte, dass die Coroutine nur beendet wird, wennNone
in die Warteschlange gestellt wird. Beachte auch, wie wirCancelledError
innerhalbder Schleife unterdrücken: Das liegt daran, dass wir diese Aufgabe nur dann beenden wollen, wenn wir einNone
in der Warteschlange erhalten. Auf diese Weise können alle anstehenden Daten in der Warteschlange vor dem Beenden gesendet werden.chan_sender()
ist die Verteilungslogik für einen Kanal: Sie sendet Daten von einer speziellen Instanz des KanalsQueue
an alle Abonnenten dieses Kanals. Aber was passiert, wenn es noch keine Abonnenten für diesen Kanal gibt? Wir warten einfach ein bisschen und versuchen es dann erneut. (Beachte aber, dass sich die Warteschlange für diesen Kanal,CHAN_QUEUES[name]
, weiter füllt).Wie in unserer vorherigen Broker-Implementierung machen wir etwas Besonderes für Kanäle, deren Name mit
/queue
beginnt: Wir rotieren die Warteschlange und senden nur an den ersten Eintrag. Dies wirkt wie ein grobes Lastausgleichssystem, da jeder Abonnent unterschiedliche Nachrichten aus derselben Warteschlange erhält. Bei allen anderen Kanälen erhalten alle Abonnenten alle Nachrichten.Wir warten hier auf Daten in der Warteschlange und beenden den Vorgang, wenn
None
empfangen wird. Zurzeit wird dies nirgendwo ausgelöst (so dass diesechan_sender()
Koroutinen ewig leben), aber wenn eine Logik hinzugefügt würde, um diese Channel-Tasks nach einer gewissen Zeit der Inaktivität aufzuräumen, würde es so gemacht werden.Die Daten wurden empfangen, also ist es an der Zeit, sie an die Abonnenten zu senden. Hier wird nicht gesendet, sondern die Daten werden in die eigene Sendewarteschlange eines jeden Teilnehmers gestellt. Diese Entkopplung ist notwendig, um sicherzustellen, dass ein langsamer Abonnent nicht alle anderen, die Daten empfangen, ausbremst. Und wenn der Abonnent so langsam ist, dass sich seine Warteschlange füllt, werden die Daten nicht in seine Warteschlange gestellt, d. h. sie gehen verloren.
Das vorangegangene Design erzeugt die gleiche Ausgabe wie die frühere, vereinfachte Implementierung, aber jetzt können wir sicher sein, dass ein langsamer Listener die Nachrichtenverteilung an andere Listener nicht stört.
Diese beiden Fallstudien zeigen, wie sich die Überlegungen zum Entwurf eines Nachrichtenverteilungssystems weiterentwickelt haben. Ein wichtiger Aspekt war die Erkenntnis, dass das Senden und Empfangen von Daten je nach Anwendungsfall am besten in separaten Coroutines abgewickelt werden sollte. In solchen Fällen können Warteschlangen sehr nützlich sein, um Daten zwischen den verschiedenen Coroutines zu verschieben und sie durch Pufferung zu entkoppeln.
Das wichtigere Ziel dieser Fallstudien war es, zu zeigen, wie die Streams-API in asyncio
es sehr einfach macht, Socket-basierte Anwendungen zu erstellen.
Verdreht
Das Twisted-Projekt ist ein Vorläufer derasyncio
Standardbibliothek und steht seit etwa 14 Jahren für die asynchrone Programmierung in Python. Das Projekt stellt nicht nur die grundlegenden Bausteine wie eine Ereignisschleife zur Verfügung, sondern auch Primitive wie Deferreds, die ein bisschen wie die Futures in asyncio
sind. Das Design von asyncio
wurde stark von Twisted und der umfangreichen Erfahrung seiner Leiter und Betreuer beeinflusst.
Beachte, dass asyncio
Twisted nicht ersetzt. Twisted enthält hochwertige Implementierungen einer großen Anzahl von Internetprotokollen, darunter nicht nur das übliche HTTP, sondern auch XMPP, NNTP, IMAP, SSH, IRC und FTP (sowohl Server als auch Clients). Und die Liste geht noch weiter: DNS? Check. SMTP? Richtig. POP3? Erledigt. Die Verfügbarkeit dieser exzellenten Internetprotokoll-Implementierungen macht Twisted so überzeugend.
Auf der Code-Ebene ist der wichtigste Unterschied zwischen Twisted undasyncio
, abgesehen von der Geschichte und dem historischen Kontext, dass Python lange Zeit keine Sprachunterstützung für Coroutines hatte, was bedeutete, dass Twisted und ähnliche Projekte Wege finden mussten, mit Asynchronität umzugehen, die mit der Standard-Python-Syntax funktionieren.
Die meiste Zeit in der Geschichte von Twisted waren Callbacks das Mittel, mit dem die asynchrone Programmierung durchgeführt wurde, mit all der nichtlinearen Komplexität, die das mit sich bringt. Als es jedoch möglich wurde, Generatoren als behelfsmäßige Coroutines zu verwenden, wurde es plötzlich möglich, den Code in Twisted auf lineare Weise zu gestalten, indem man seinen @defer.inlineCallbacks
Dekorator verwendet, wie in Beispiel 4-10 gezeigt.
Beispiel 4-10. Noch mehr Twisted mit inlined callbacks
@defer
.
inlineCallbacks
def
f
(
)
:
yield
defer
.
returnValue
(
123
)
@defer
.
inlineCallbacks
def
my_coro_func
(
)
:
value
=
yield
f
(
)
assert
value
==
123
Normalerweise müssen in Twisted Instanzen von
Deferred
erstellt und diesen Instanzen Rückrufe hinzugefügt werden, um asynchrone Programme zu erstellen. Vor ein paar Jahren wurde der@inlineCallbacks
Dekorator hinzugefügt, der Generatoren als Coroutines umfunktioniert.Während
@inlineCallbacks
es dir ermöglichte, linearen Code zu schreiben (im Gegensatz zu Callbacks), waren einige Hacks erforderlich, wie zum Beispiel dieser Aufruf vondefer.returnValue()
, der ist, wie du Werte aus@inlineCallbacks
Coroutines zurückgibt.Hier können wir die
yield
sehen, die diese Funktion zu einem Generator macht. Damit@inlineCallbacks
funktioniert, muss mindestens einyield
in der Funktion vorhanden sein, die dekoriert wird.
Seit dem Erscheinen der nativen Coroutines in Python 3.5 hat das Twisted-Team (und insbesondere Amber Brown ) daran gearbeitet, die Unterstützung für die Ausführung von Twisted in der asyncio
Ereignisschleife hinzuzufügen.
Ich möchte dich in diesem Abschnitt nicht davon überzeugen, alle deine Anwendungen als Twisted-asyncio
-Hybride zu erstellen, sondern dich darauf aufmerksam machen, dass derzeit an einer umfassenden Interoperabilität zwischen den beiden Systemen gearbeitet wird.
Für diejenigen unter euch, die Erfahrung mit Twisted haben, könnte Beispiel 4-11verwirrend sein.
Beispiel 4-11. Unterstützung für asyncio in Twisted
# twisted_asyncio.py
from
time
import
ctime
from
twisted
.
internet
import
asyncioreactor
asyncioreactor
.
install
(
)
from
twisted
.
internet
import
reactor
,
defer
,
task
async
def
main
(
)
:
for
i
in
range
(
5
)
:
(
f
'
{
ctime()} Hello
{i}
'
)
await
task
.
deferLater
(
reactor
,
1
,
lambda
:
None
)
defer
.
ensureDeferred
(
main
(
)
)
reactor
.
run
(
)
So sagst du Twisted, dass es die
asyncio
Ereignisschleife als Hauptreactor
verwenden soll. Beachte, dass diese Zeile vor demreactor
kommen muss, das vontwisted.internet
in der folgenden Zeile.Jeder, der mit der Twisted-Programmierung vertraut ist, wird diese Importe wiedererkennen. Wir haben hier nicht genug Platz, um sie ausführlich zu behandeln, aber kurz gesagt ist
reactor
dieTwisted
Version derasyncio
Schleife, unddefer
undtask
sind Namensräume für Werkzeuge, die mit Zeitplannungsprogrammen arbeiten.async def
hier in einem Twisted-Programm zu sehen, sieht seltsam aus, aber das ist tatsächlich das, was uns die neue Unterstützung fürasync/await
gibt: die Möglichkeit, native Coroutines direkt in Twisted-Programmen zu verwenden.In der älteren
@inlineCallbacks
Welt hättest du hieryield from
verwendet, aber jetzt können wirawait
verwenden, genau wie imasyncio
Code. Der andere Teil dieser Zeile,deferLater()
, ist eine alternative Möglichkeit, das Gleiche zu tun wieasyncio.sleep(1)
. Wirawait
eine Zukunft, in der nach einer Sekunde ein "do-nothing"-Callback ausgelöst wird.ensureDeferred()
ist eine Twisted-Version der Zeitplanung einer Coroutine. Dies wäre vergleichbar mitloop.create_task()
oderasyncio.ensure_future()
.Das Ausführen der
reactor
ist dasselbe wieloop.run_forever()
inasyncio
.
Wenn du dieses Skript ausführst, erhältst du die folgende Ausgabe:
$ twisted_asyncio.py Mon Oct 16 16:19:49 2019 Hello 0 Mon Oct 16 16:19:50 2019 Hello 1 Mon Oct 16 16:19:51 2019 Hello 2 Mon Oct 16 16:19:52 2019 Hello 3 Mon Oct 16 16:19:53 2019 Hello 4
Es gibt noch viel mehr über Twisted zu lernen. Vor allem lohnt es sich, die Liste der implementierten Netzwerkprotokolle durchzugehen. Es gibt noch einiges zu tun, aber die Zukunft sieht für die Zusammenarbeit zwischen Twisted und asyncio
sehr rosig aus.
asyncio
wurde so konzipiert, dass wir uns auf eine Zukunft freuen können, in der es möglich sein wird, Code aus vielen asynchronen Frameworks wie Twisted und Tornado in eine einzige Anwendung einzubinden, wobei der gesamte Code in derselben Ereignisschleife läuft.
Die Janus-Warteschlange
Die Janus-Warteschlange (installiert mit pip install janus
) bietet eine Lösung für die Kommunikation zwischen Threads und Coroutines. In der Python-Standardbibliothek gibt es zwei Arten von Warteschlangen:
queue.Queue
-
Eine blockierende Warteschlange, die üblicherweise für die Kommunikation und Pufferung zwischen Threads verwendet wird
asyncio.Queue
-
Eine
async
-kompatible Warteschlange, die üblicherweise für die Kommunikation und Pufferung zwischen Coroutines verwendet wird
Leider ist beides nicht für die Kommunikation zwischen Threads und Coroutines geeignet! Hier kommt Janus ins Spiel: Es handelt sich um eine einzige Warteschlange, die beide APIs zur Verfügung stellt, eine blockierende und eine asynchrone.Beispiel 4-12 generiert Daten innerhalb eines Threads, legt diese Daten in einer Warteschlange ab und konsumiert diese Daten dann von einer Coroutine.
Beispiel 4-12. Verbinden von Coroutines und Threads mit einer Janus-Warteschlange
# janus_demo.py
import
asyncio
import
random
import
time
import
janus
async
def
main
(
)
:
loop
=
asyncio
.
get_running_loop
(
)
queue
=
janus
.
Queue
(
loop
=
loop
)
future
=
loop
.
run_in_executor
(
None
,
data_source
,
queue
)
while
(
data
:
=
await
queue
.
async_q
.
get
(
)
)
is
not
None
:
(
f
'
Got
{data}
off queue
'
)
(
'
Done.
'
)
def
data_source
(
queue
)
:
for
i
in
range
(
10
)
:
r
=
random
.
randint
(
0
,
4
)
time
.
sleep
(
r
)
queue
.
sync_q
.
put
(
r
)
queue
.
sync_q
.
put
(
None
)
asyncio
.
run
(
main
(
)
)
Erstelle eine Janus-Warteschlange. Beachte, dass die Janus-Warteschlange genau wie eine
asyncio.Queue
mit einer bestimmten Ereignisschleife verknüpft wird. Wenn du den Parameterloop
nicht angibst, wird wie üblich der Standardaufrufget_event_loop()
intern verwendet.Unsere
main()
Coroutine-Funktion wartet einfach auf Daten in einer Warteschlange. Diese Zeile hält an, bis Daten vorhanden sind, genau wie der Aufruf vonget()
auf einerasyncio.Queue
Instanz. Das Warteschlangenobjekt hat zwei Gesichter: Dieses heißtasync_q
und bietet die async-kompatible Warteschlangen-API.Drucke eine Nachricht.
In der Funktion
data_source()
wird eine zufälligeint
generiert, die sowohl als Schlafdauer als auch als Datenwert verwendet wird. Beachte, dass der Aufruf vontime.sleep()
blockierend ist, daher muss diese Funktion in einem Thread ausgeführt werden.Lege die Daten in die Janus-Warteschlange. Dies zeigt die andereSeite der Janus-Warteschlange:
sync_q
Die Janus-Warteschlange, die die standardmäßige, blockierendeQueue
API bereitstellt.
Hier ist die Ausgabe:
$ <name> Got 2 off queue Got 4 off queue Got 4 off queue Got 2 off queue Got 3 off queue Got 4 off queue Got 1 off queue Got 1 off queue Got 0 off queue Got 4 off queue Done.
Wenn es möglich ist, ist es besser, kurze Aufträge zu haben, und in diesen Fällen ist eine Warteschlange (für die Kommunikation) nicht notwendig. Das ist jedoch nicht immer möglich, und in solchen Situationen kann die Janus-Warteschlange die bequemste Lösung sein, um Daten zwischen Threads und Coroutines zu puffern und zu verteilen.
aiohttp
aiohttp
bringt alles, was mit HTTP zu tun hat, auf asyncio
, einschließlich Unterstützung für HTTP-Clients und -Server sowie WebSocket-Unterstützung. Kommen wir gleich zu den Codebeispielen, angefangen mit der Einfachheit selbst: "Hello World".
Fallstudie: Hallo Welt
Beispiel 4-13 zeigt einen minimalen Webserver mit aiohttp
.
Beispiel 4-13. Minimales aiohttp-Beispiel
from
aiohttp
import
web
async
def
hello
(
request
)
:
return
web
.
Response
(
text
=
"
Hello, world
"
)
app
=
web
.
Application
(
)
app
.
router
.
add_get
(
'
/
'
,
hello
)
web
.
run_app
(
app
,
port
=
8080
)
Beachte, dass in diesem Code keine Schleifen, Tasks oder Futures erwähnt werden: Die Entwickler des Frameworks aiohttp
haben all das vor uns verborgen und eine sehr saubere API hinterlassen. Dies wird bei den meisten Frameworks, die auf asyncio
aufbauen, der Fall sein. Das Framework wurde so konzipiert, dass die Entwickler nur die Teile auswählen können, die sie benötigen, und diese in ihrer bevorzugten API kapseln.
Fallstudie: Scraping der Nachrichten
aiohttp
kann sowohl als Server als auch als Client-Bibliothek verwendet werden, wie die sehr beliebte (aber blockierende!)requests
Bibliothek. Ich wollte aiohttp
anhand eines Beispiels vorstellen, das beide Funktionen beinhaltet.
In dieser Fallstudie implementieren wir eine Website, die hinter den Kulissen Web Scraping betreibt. Die Anwendung wird zwei Nachrichten-Websites scrapen und die Schlagzeilen auf einer Ergebnisseite zusammenfassen. Hier ist die Strategie:
-
Ein Browser-Client stellt eine Web-Anfrage an http://localhost:8080/news.
-
Unser Webserver empfängt die Anfrage und holt dann im Backend die HTML-Daten von mehreren Nachrichten-Websites ab.
-
Die Daten jeder Seite werden nach Schlagzeilen durchsucht.
-
Die Überschriften werden sortiert und in der HTML-Antwort formatiert, die wir an den Browser-Client zurückschicken.
Abbildung 4-1 zeigt die Ausgabe.
Web Scraping ist heutzutage ziemlich schwierig geworden. Wenn du zum Beispielrequests.get('http://edition.cnn.com')
ausprobierst, wirst du feststellen, dass die Antwort nur sehr wenige verwertbare Daten enthält! Es wird immer wichtiger, JavaScript lokal ausführen zu können, um Daten zu erhalten, denn viele Websites verwenden JavaScript, um ihre Inhalte zu laden. Der Prozess, bei dem dieses JavaScript ausgeführt wird, um die endgültige, vollständige HTML-Ausgabe zu erzeugen, wird Rendering genannt.
Für das Rendering von verwenden wir ein nettes Projekt namensSplash, das sich selbst als "JavaScript Rendering Service" bezeichnet. Es kann in einemDocker-Container laufen und bietet eine API für das Rendering anderer Websites. Intern verwendet es eine (JavaScript-fähige) WebKit-Engine, um eine Website vollständig zu laden und zu rendern. Das werden wir nutzen, um Websitedaten zu erhalten. Unser aiohttp
Server, der inBeispiel 4-14 gezeigt wird, ruft diese Splash API auf, um die Seitendaten zu erhalten.
Tipp
Um den Splash-Container zu erhalten und zu starten, führe diese Befehle in deiner Shell aus:
$ docker pull scrapinghub/splash $ docker run --rm -p 8050:8050 scrapinghub/splash
Unser Server-Backend wird die Splash-API unter http://localhost:8050 aufrufen .
Beispiel 4-14. Code für den News Scraper
from
asyncio
import
gather
,
create_task
from
string
import
Template
from
aiohttp
import
web
,
ClientSession
from
bs4
import
BeautifulSoup
async
def
news
(
request
)
:
sites
=
[
(
'
http://edition.cnn.com
'
,
cnn_articles
)
,
(
'
http://www.aljazeera.com
'
,
aljazeera_articles
)
,
]
tasks
=
[
create_task
(
news_fetch
(
*
s
)
)
for
s
in
sites
]
await
gather
(
*
tasks
)
items
=
{
text
:
(
f
'
<div class=
"
box
{kind}
"
>
'
f
'
<span>
'
f
'
<a href=
"
{href}
"
>
{text}
</a>
'
f
'
</span>
'
f
'
</div>
'
)
for
task
in
tasks
for
href
,
text
,
kind
in
task
.
result
(
)
}
content
=
'
'
.
join
(
items
[
x
]
for
x
in
sorted
(
items
)
)
page
=
Template
(
open
(
'
index.html
'
)
.
read
(
)
)
return
web
.
Response
(
body
=
page
.
safe_substitute
(
body
=
content
)
,
content_type
=
'
text/html
'
,
)
async
def
news_fetch
(
url
,
postprocess
)
:
proxy_url
=
(
f
'
http://localhost:8050/render.html?
'
f
'
url=
{url}
&timeout=60&wait=1
'
)
async
with
ClientSession
(
)
as
session
:
async
with
session
.
get
(
proxy_url
)
as
resp
:
data
=
await
resp
.
read
(
)
data
=
data
.
decode
(
'
utf-8
'
)
return
postprocess
(
url
,
data
)
def
cnn_articles
(
url
,
page_data
)
:
soup
=
BeautifulSoup
(
page_data
,
'
lxml
'
)
def
match
(
tag
)
:
return
(
tag
.
text
and
tag
.
has_attr
(
'
href
'
)
and
tag
[
'
href
'
]
.
startswith
(
'
/
'
)
and
tag
[
'
href
'
]
.
endswith
(
'
.html
'
)
and
tag
.
find
(
class_
=
'
cd__headline-text
'
)
)
headlines
=
soup
.
find_all
(
match
)
return
[
(
url
+
hl
[
'
href
'
]
,
hl
.
text
,
'
cnn
'
)
for
hl
in
headlines
]
def
aljazeera_articles
(
url
,
page_data
)
:
soup
=
BeautifulSoup
(
page_data
,
'
lxml
'
)
def
match
(
tag
)
:
return
(
tag
.
text
and
tag
.
has_attr
(
'
href
'
)
and
tag
[
'
href
'
]
.
startswith
(
'
/news
'
)
and
tag
[
'
href
'
]
.
endswith
(
'
.html
'
)
)
headlines
=
soup
.
find_all
(
match
)
return
[
(
url
+
hl
[
'
href
'
]
,
hl
.
text
,
'
aljazeera
'
)
for
hl
in
headlines
]
app
=
web
.
Application
(
)
app
.
router
.
add_get
(
'
/news
'
,
news
)
web
.
run_app
(
app
,
port
=
8080
)
Die Funktion
news()
ist der Handler für die URL /news auf unserem Server. Sie gibt die HTML-Seite mit allen Schlagzeilen zurück.Hier haben wir nur zwei Nachrichten-Websites, die wir auslesen können: CNN und Al Jazeera. Es könnten leicht weitere hinzugefügt werden, aber dann müssten auch zusätzliche Postprozessoren hinzugefügt werden, genau wie die Funktionen
cnn_articles()
undaljazeera_articles()
, die für die Extraktion von Schlagzeilendaten angepasst werden.Für jede Nachrichtenseite erstellen wir eine Aufgabe, um die HTML-Seitendaten für ihre Startseite zu holen und zu verarbeiten. Beachte, dass wir das Tupel auspacken (
(*s)
), da die Coroutine-Funktionnews_fetch()
sowohl die URL als auch die Nachbearbeitungsfunktion als Parameter benötigt. Jeder Aufruf vonnews_fetch()
gibt eine Liste von Tupelnals Überschriftenergebnisse zurück, in der Form<article URL>
,<article title>
.Alle Aufgaben werden in einem einzigen
Future
gesammelt (gather()
gibt einen Future zurück, der den Zustand aller gesammelten Aufgaben repräsentiert), und dann werden wir sofortawait
den Abschluss dieses Futures. Diese Zeile hält an, bis der Future abgeschlossen ist.Da alle
news_fetch()
Aufgaben nun abgeschlossen sind, sammeln wir alle Ergebnisse in einem Wörterbuch. Beachte, wie verschachtelte Comprehensions verwendet werden, um über die Aufgaben zu iterieren und dann über die Liste der Tupel, die von jeder Aufgabe zurückgegeben werden. Wir verwenden f-strings auch, um Daten direkt zu ersetzen, einschließlich der Art der Seite, die in CSS verwendet wird, um den Hintergrund vondiv
zu färben.In diesem Wörterbuch ist der Schlüssel der Titel der Überschrift und der Wert ein HTML-String für
div
, der auf unserer Ergebnisseite angezeigt wird.Unser Webserver wird HTML zurückgeben. Wir laden die HTML-Daten aus einer lokalen Datei namens index.html. Diese Datei findest du inBeispiel B-1, falls du die Fallstudie selbst nachbauen möchtest.
Wir ersetzen die gesammelte Überschrift
div
in der Vorlage und geben die Seite an den Browser-Client zurück. So entsteht die in Abbildung 4-1 gezeigte Seite.Hier, in der
news_fetch()
Coroutine-Funktion, haben wir eine kleine Vorlage für den Zugriff auf die Splash-API (die bei mir in einem lokalen Docker-Container auf Port 8050 läuft). Dies zeigt, wieaiohttp
als HTTP-Client verwendet werden kann.Der Standardweg ist, eine
ClientSession()
Instanz zu erstellen und dann dieget()
Methode auf der Sitzungsinstanz zu verwenden, um den REST-Aufruf durchzuführen. In der nächsten Zeile werden die Antwortdaten abgefragt. Beachte, dass diese Coroutine nie blockiert, weil wir immer mit Coroutines arbeiten, also mitasync with
undawait
: Wir können viele tausend dieser Anfragen bearbeiten, auch wenn dieser Vorgang (news_fetch()
) relativ langsam ist, weil wir intern Webaufrufe durchführen.Nachdem wir die Daten erhalten haben, rufen wir die Nachbearbeitungsfunktion auf. Für CNN ist das
cnn_articles()
und für Al Jazeeraaljazeera_articles()
.Wir haben nur Platz für einen kurzen Blick auf die Nachbearbeitung. Nachdem wir die Seitendaten erhalten haben, verwenden wir die Beautiful Soup 4-Bibliothek, um Überschriften zu extrahieren.
Die Funktion
match()
gibt alle übereinstimmenden Tags zurück (ich habe den HTML-Quelltext dieser Nachrichten-Websites manuell überprüft, um herauszufinden, welche Kombination von Filtern die besten Tags extrahiert), und dann geben wir eine Liste von Tupeln zurück, die dem Format<article URL>
,<article title>
.Dies ist der analoge Postprozessor für Al Jazeera. Die
match()
Bedingung ist etwas anders, aber ansonsten ist sie die gleiche wie die von CNN.
Im Allgemeinen wirst du feststellen, dass aiohttp
eine einfache API hat und dir bei der Entwicklung deiner Anwendungen "nicht im Weg steht".
Im nächsten Abschnitt sehen wir uns die Verwendung von ZeroMQ mit asyncio
an, was die Socket-Programmierung auf seltsame Weise sehr angenehm macht.
ØMQ (ZeroMQ)
Programmieren ist eine Wissenschaft, die sich als Kunst verkleidet, denn die meisten von uns verstehen die Physik der Software nicht und sie wird selten, wenn überhaupt, gelehrt. Die Physik der Software besteht nicht aus Algorithmen, Datenstrukturen, Sprachen und Abstraktionen. Das sind nur Werkzeuge, die wir herstellen, benutzen und wegwerfen. Die wahre Physik der Software ist die Physik der Menschen. Genauer gesagt geht es um unsere Grenzen, wenn es um Komplexität geht, und um unseren Wunsch, zusammenzuarbeiten, um große Probleme in Teilen zu lösen. Das ist die Wissenschaft des Programmierens: Wenn du Bausteine entwickelst, die die Menschen verstehen und einfach benutzen können, werden sie zusammenarbeiten, um auch die größten Probleme zu lösen.
Pieter Hintjens, ZeroMQ: Messaging für viele Anwendungen
ØMQ (oder ZeroMQ) ist eine beliebte sprachunabhängige Bibliothek für Netzwerkanwendungen: Sie bietet "intelligente" Sockets. Wenn du ØMQ-Sockets in deinem Code erstellst, ähneln sie normalen Sockets, mit erkennbaren Methodennamen wie recv()
undsend()
und so weiter - aber intern erledigen diese Sockets einige der lästigen und mühsamen Aufgaben, die für die Arbeit mit herkömmlichen Sockets erforderlich sind.
So musst du nicht dein eigenes Protokoll erfinden und die Bytes auf der Leitung zählen, um herauszufinden, wann alle Bytes für eine bestimmte Nachricht angekommen sind - du sendest einfach das, was du als "Nachricht" betrachtest, und das Ganze kommt am anderen Ende intakt an.
Eine weitere großartige Funktion ist die automatische Wiederverbindungslogik. Wenn der Server ausfällt und später wieder hochkommt, wird der ØMQ-Socket des Clientsautomatisch wieder verbunden. Und was noch besser ist: Nachrichten, die dein Code an den Socket sendet, werden während der Unterbrechung gepuffert, so dass sie auch dann noch gesendet werden, wenn der Server zurückkehrt. Dies sind einige der Gründe, warum ØMQ manchmal alsbrokerloses Messaging bezeichnet wird: Es bietet einige der Funktionen von Message Broker Software direkt in den Socket-Objekten selbst.
ØMQ-Sockets sind intern bereits asynchron implementiert (sie können also viele Tausend gleichzeitige Verbindungen aufrechterhalten, selbst wenn sie in Threaded Code verwendet werden), aber dies ist hinter der ØMQ-API verborgen. DiePyZMQ-Python-Bindings für die ØMQ-Bibliothek unterstützen Asyncio, und in diesem Abschnitt werden wir uns einige Beispiele dafür ansehen, wie du diese intelligenten Sockets in deine Python-Anwendungen einbauen kannst.
Fallstudie: Mehrere Steckdosen
Wenn ØMQ bereits asynchrone Sockets zur Verfügung stellt, die mit Threading genutzt werden können, was bringt es dann, ØMQ mit asyncio
zu nutzen? Die Antwort ist sauberer Code.
Zur Veranschaulichung schauen wir uns eine kleine Fallstudie an, in der du mehrere ØMQ-Sockets in der gleichen Anwendung verwendest. Beispiel 4-15zeigt zunächst die blockierende Version (dieses Beispiel stammt aus dem zguide, dem offiziellen Leitfaden für ØMQ).
Beispiel 4-15. Der traditionelle ØMQ-Ansatz
# poller.py
import
zmq
context
=
zmq
.
Context
(
)
receiver
=
context
.
socket
(
zmq
.
PULL
)
receiver
.
connect
(
"
tcp://localhost:5557
"
)
subscriber
=
context
.
socket
(
zmq
.
SUB
)
subscriber
.
connect
(
"
tcp://localhost:5556
"
)
subscriber
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
poller
=
zmq
.
Poller
(
)
poller
.
register
(
receiver
,
zmq
.
POLLIN
)
poller
.
register
(
subscriber
,
zmq
.
POLLIN
)
while
True
:
try
:
socks
=
dict
(
poller
.
poll
(
)
)
except
KeyboardInterrupt
:
break
if
receiver
in
socks
:
message
=
receiver
.
recv_json
(
)
(
f
'
Via PULL:
{message}
'
)
if
subscriber
in
socks
:
message
=
subscriber
.
recv_json
(
)
(
f
'
Via SUB:
{message}
'
)
ØMQ-Sockets haben Typen. Dies ist ein
PULL
Socket. Du kannst ihn dir als eine Art Socket vorstellen, der nur empfängt und von einem anderen Socket gefüttert wird,der nur sendet und der vom TypPUSH
ist.Der
SUB
Socket ist eine andere Art von Socket, der nur empfängt, und er wird mit einemPUB
Socket gefüttert, der nur sendet.Wenn du Daten zwischen mehreren Sockets in einer ØMQ-Anwendung mit Threads austauschen musst, brauchst du einen Poller. Das liegt daran, dass diese Sockets nicht thread-sicher sind. Du kannst also nicht
recv()
auf verschiedenen Sockets in verschiedenen Threads verwenden.1Er funktioniert ähnlich wie der Systemaufruf
select()
. Der Poller gibt die Blockade auf, wenn auf einem der registrierten Sockets Daten zum Empfang bereitstehen, und dann liegt es an dir, die Daten abzuziehen und etwas damit zu tun. Der großeif
Block ist, wie du den richtigen Socket erkennst.
Die Verwendung einer Poller-Schleife und eines expliziten Socket-Auswahlblocks lässt den Code etwas klobig aussehen, aber dieser Ansatz vermeidet Probleme mit der Thread-Sicherheit, da er garantiert, dass derselbe Socket nicht von verschiedenen Threads verwendet wird.
Beispiel 4-16 zeigt den Servercode.
Beispiel 4-16. Server-Code
# poller_srv.py
import
zmq
,
itertools
,
time
context
=
zmq
.
Context
()
pusher
=
context
.
socket
(
zmq
.
PUSH
)
pusher
.
bind
(
"tcp://*:5557"
)
publisher
=
context
.
socket
(
zmq
.
PUB
)
publisher
.
bind
(
"tcp://*:5556"
)
for
i
in
itertools
.
count
():
time
.
sleep
(
1
)
pusher
.
send_json
(
i
)
publisher
.
send_json
(
i
)
Dieser Code ist für die Diskussion nicht wichtig, aber kurz: Es gibt einen PUSH
Socket und einen PUB
Socket, wie ich bereits sagte, und eine Schleife darin, die jede Sekunde Daten an beide Sockets sendet. Hier ist ein Beispiel für die Ausgabe von poller.py (Achtung: beide Programme müssen laufen):
$ poller.py Via PULL: 0 Via SUB: 0 Via PULL: 1 Via SUB: 1 Via PULL: 2 Via SUB: 2 Via PULL: 3 Via SUB: 3
Der Code funktioniert, aber unser Interesse gilt hier nicht der Frage, ob der Code läuft, sondern ob asyncio
etwas für die Struktur vonpoller.py zu bieten hat. Das Wichtigste ist, dass unserasyncio
Code in einem einzigen Thread ausgeführt wird, was bedeutet, dass es in Ordnung ist, verschiedene Sockets in verschiedenen Coroutineszu behandeln - undgenau das werden wir auch tun.
Natürlichmusste sich jemand die Mühe machen, die Unterstützung für Coroutines in pyzmq
(die Python-Client-Bibliothek für ØMQ) selbst einzubauen, damit das funktioniert, es war also nicht kostenlos. Aber wir können diese harte Arbeit nutzen, um die "traditionelle" Codestruktur zu verbessern, wie in Beispiel 4-17 gezeigt.
Beispiel 4-17. Saubere Trennung mit asyncio
# poller_aio.py
import
asyncio
import
zmq
from
zmq
.
asyncio
import
Context
context
=
Context
(
)
async
def
do_receiver
(
)
:
receiver
=
context
.
socket
(
zmq
.
PULL
)
receiver
.
connect
(
"
tcp://localhost:5557
"
)
while
message
:
=
await
receiver
.
recv_json
(
)
:
(
f
'
Via PULL:
{message}
'
)
async
def
do_subscriber
(
)
:
subscriber
=
context
.
socket
(
zmq
.
SUB
)
subscriber
.
connect
(
"
tcp://localhost:5556
"
)
subscriber
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
while
message
:
=
await
subscriber
.
recv_json
(
)
:
(
f
'
Via SUB:
{message}
'
)
async
def
main
(
)
:
await
asyncio
.
gather
(
do_receiver
(
)
,
do_subscriber
(
)
,
)
asyncio
.
run
(
main
(
)
)
Dieses Codebeispiel macht dasselbe wie Beispiel 4-15, nur dass wir jetzt die Vorteile von Coroutines nutzen, um alles neu zu strukturieren. Jetzt können wir jeden Socket einzeln behandeln. Ich habe zwei Coroutine-Funktionen erstellt, eine für jeden Socket; diese hier ist für den Socket
PULL
.Ich verwende die
asyncio
Unterstützung inpyzmq
, was bedeutet, dass allesend()
undrecv()
Aufrufe dasawait
Schlüsselwort verwenden müssen. DasPoller
erscheint nirgendwo mehr, weil es in dieasyncio
Ereignisschleife selbst integriert wurde.Dies ist der Handler für den Socket
SUB
. Die Struktur ist der desPULL
Socket-Handlers sehr ähnlich, aber das hätte nicht sein müssen. Wäre eine komplexere Logik erforderlich gewesen, hätte ich sie hier einfach hinzufügen können, und zwar vollständig gekapselt imSUB
-Handler-Code.Auch die
asyncio
-kompatiblen Sockets benötigen das Schlüsselwortawait
zum Senden und Empfangen.
Die Ausgabe ist dieselbe wie vorher, deshalb zeige ich sie nicht.
Die Verwendung von Coroutines hat meiner Meinung nach einen erstaunlich positiven Effekt auf den Codeaufbau in diesen Beispielen. In echtem Produktionscode mit vielen ØMQ-Sockets könnten die Coroutine-Handler für jeden einzelnen sogar in separaten Dateien stehen, was mehr Möglichkeiten für eine bessere Codestruktur bietet. Und selbst bei Programmen mit einem einzigen Lese-/Schreib-Socket ist es sehr einfach, getrennte Coroutines für das Lesen und Schreiben zu verwenden, falls nötig.
Der verbesserte Code sieht dem Code mit Threads sehr ähnlich, und für das hier gezeigte Beispiel funktioniert derselbe Refactor auch für Threads: Die blockierenden Funktionen do_receiver()
und do_subscriber()
werden in separaten Threads ausgeführt. Aber willst du dich wirklich mit dem Potenzialfür Race Conditions auseinandersetzen, vor allem, wenn deine Anwendung mit der Zeit an Funktionen und Komplexität zunimmt?
Hier gibt es eine Menge zu entdecken, und wie ich schon sagte, macht es Spaß, mit diesen magischen Sockets zu spielen. In der nächsten Fallstudie werden wir uns eine praktischere Anwendung von ØMQ ansehen.
Fallstudie: Überwachung der Anwendungsleistung
Mit den modernen, containerisierten, Microservice-basierten Bereitstellungspraktiken von heute sind einige Dinge, die früher trivial waren, wie z.B. die Überwachung der CPU- und Speichernutzung deiner Apps, etwas komplizierter geworden, als wenn du einfach top
laufen lässt. In den letzten Jahren sind mehrere kommerzielle Produkte auf den Markt gekommen, die sich mit diesen Problemen befassen, aber ihre Kosten können für kleine Startup-Teams und Hobbyisten unerschwinglich sein.
In dieser Fallstudie werde ich ØMQ und asyncio
nutzen, um einen Prototyp für die Überwachung verteilter Anwendungen zu entwickeln. Unser Entwurf besteht aus drei Teilen:
- Anwendungsschicht
-
Diese Schicht enthält alle unsere Anwendungen. Beispiele dafür sind ein Microservice "Kunden", ein Microservice "Buchungen", ein Microservice "E-Mailer" usw. Ich werde jeder unserer Anwendungen einen ØMQ "sendenden" Socket hinzufügen. Dieser Socket sendet Leistungskennzahlen an einen zentralen Server.
- Ebene der Sammlung
-
Der zentrale Server stellt einen ØMQ-Socket zur Verfügung, um die Daten von allen laufenden Anwendungsinstanzen zu sammeln. Der Server stellt außerdem eine Webseite zur Verfügung, auf der Leistungsdiagramme im Zeitverlauf angezeigt werden und die Daten live gestreamt werden, sobald sie eingehen.
- Visualisierungsebene
-
Dies ist die Webseite, die aufgerufen wird. Wir zeigen die gesammelten Daten in einer Reihe von Diagrammen an, die sich in Echtzeit aktualisieren. Um die Codebeispiele zu vereinfachen, verwende ich die praktische Smoothie ChartsJavaScript-Bibliothek, die alle notwendigen clientseitigen Funktionen bietet.
Die Backend-Applikation (Anwendungsschicht), die Metriken erzeugt, wird in Beispiel 4-18 gezeigt.
Beispiel 4-18. Die Anwendungsschicht: Erzeugung von Metriken
import
argparse
import
asyncio
from
random
import
randint
,
uniform
from
datetime
import
datetime
as
dt
from
datetime
import
timezone
as
tz
from
contextlib
import
suppress
import
zmq
,
zmq
.
asyncio
,
psutil
ctx
=
zmq
.
asyncio
.
Context
(
)
async
def
stats_reporter
(
color
:
str
)
:
p
=
psutil
.
Process
(
)
sock
=
ctx
.
socket
(
zmq
.
PUB
)
sock
.
setsockopt
(
zmq
.
LINGER
,
1
)
sock
.
connect
(
'
tcp://localhost:5555
'
)
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
await
sock
.
send_json
(
dict
(
color
=
color
,
timestamp
=
dt
.
now
(
tz
=
tz
.
utc
)
.
isoformat
(
)
,
cpu
=
p
.
cpu_percent
(
)
,
mem
=
p
.
memory_full_info
(
)
.
rss
/
1024
/
1024
)
)
await
asyncio
.
sleep
(
1
)
sock
.
close
(
)
async
def
main
(
args
)
:
asyncio
.
create_task
(
stats_reporter
(
args
.
color
)
)
leak
=
[
]
with
suppress
(
asyncio
.
CancelledError
)
:
while
True
:
sum
(
range
(
randint
(
1
_000
,
10
_000_000
)
)
)
await
asyncio
.
sleep
(
uniform
(
0
,
1
)
)
leak
+
=
[
0
]
*
args
.
leak
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--color
'
,
type
=
str
)
parser
.
add_argument
(
'
--leak
'
,
type
=
int
,
default
=
0
)
args
=
parser
.
parse_args
(
)
try
:
asyncio
.
run
(
main
(
args
)
)
except
KeyboardInterrupt
:
(
'
Leaving...
'
)
ctx
.
term
(
)
Diese Coroutine-Funktion wird als langlebige Coroutine ausgeführt und sendet kontinuierlich Daten an den Serverprozess.
Erstelle einen ØMQ-Socket. Wie du weißt, gibt es verschiedene Arten von Sockets; dieser ist ein
PUB
Typ, der es ermöglicht, Einwegnachrichten an einen anderen ØMQ-Socket zu senden. Dieser Socket hat - wie der ØMQ-Leitfaden sagt - Superkräfte. Er kümmert sich automatisch um die Wiederverbindung und die Pufferung der Nachrichten für uns.Verbinde dich mit dem Server.
Unsere Shutdown-Sequenz wird von
KeyboardInterrupt
gesteuert, weiter unten. Wenn dieses Signal empfangen wird, werden alle Aufgaben abgebrochen. Hier behandle ich das ausgelösteCancelledError
mit dem praktischensuppress()
Kontextmanager aus demcontextlib
Standardbibliotheksmodul.Iteriert ewig und sendet Daten an den Server.
Da ØMQ in der Lage ist, mit kompletten Nachrichten zu arbeiten und nicht nur mit Teilen eines Bytestroms, öffnet es die Tür zu einer Reihe nützlicher Wrapper um das übliche
sock.send()
Idiom: Hier verwende ich eine dieser Hilfsmethoden,send_json()
, die das Argument automatisch in JSON serialisiert. So können wirdict()
direkt verwenden.Eine zuverlässige Methode zur Übertragung von Datumsangaben ist das ISO 8601-Format. Das gilt vor allem, wenn du Datumsdaten zwischen Software in verschiedenen Sprachen übertragen musst, da die meisten Sprachimplementierungen mit diesem Standard arbeiten können.
Um hier zu landen, müssen wir die
CancelledError
Ausnahme erhalten haben, die aus dem Abbruch der Aufgabe resultiert. Der ØMQ-Socket muss geschlossen werden, damit das Programm heruntergefahren werden kann.Die Funktion
main()
symbolisiert die eigentliche Microservice-Anwendung. Mit dieser Summe über Zufallszahlen wird Scheinarbeit produziert, nur um uns einige Daten ungleich Null zu liefern, die wir etwas später in der Visualisierungsschicht betrachten können.Ich werde mehrere Instanzen dieser Anwendung erstellen, so dass es praktisch ist, wenn ich sie (später in den Diagrammen) mit einem
--color
Parameter unterscheiden kann.Schließlich kann der ØMQ-Kontext beendet werden.
Das Hauptaugenmerk liegt auf der Funktion stats_reporter()
. Über sie werden die (von der nützlichen psutil
Bibliothek gesammelten) Metrikdaten ausgegeben. Der Rest des Codes kann als typische Microservice-Anwendung angesehen werden.
Der Servercode in Beispiel 4-19 sammelt alle Daten und liefert sie an einen Webclient.
Beispiel 4-19. Die Sammelschicht: Dieser Server sammelt Prozessstatistiken
# metric-server.py
import
asyncio
from
contextlib
import
suppress
import
zmq
import
zmq
.
asyncio
import
aiohttp
from
aiohttp
import
web
from
aiohttp_sse
import
sse_response
from
weakref
import
WeakSet
import
json
# zmq.asyncio.install()
ctx
=
zmq
.
asyncio
.
Context
(
)
connections
=
WeakSet
(
)
async
def
collector
(
)
:
sock
=
ctx
.
socket
(
zmq
.
SUB
)
sock
.
setsockopt_string
(
zmq
.
SUBSCRIBE
,
'
'
)
sock
.
bind
(
'
tcp://*:5555
'
)
with
suppress
(
asyncio
.
CancelledError
)
:
while
data
:
=
await
sock
.
recv_json
(
)
:
(
data
)
for
q
in
connections
:
await
q
.
put
(
data
)
sock
.
close
(
)
async
def
feed
(
request
)
:
queue
=
asyncio
.
Queue
(
)
connections
.
add
(
queue
)
with
suppress
(
asyncio
.
CancelledError
)
:
async
with
sse_response
(
request
)
as
resp
:
while
data
:
=
await
queue
.
get
(
)
:
(
'
sending data:
'
,
data
)
resp
.
send
(
json
.
dumps
(
data
)
)
return
resp
async
def
index
(
request
)
:
return
aiohttp
.
web
.
FileResponse
(
'
./charts.html
'
)
async
def
start_collector
(
app
)
:
app
[
'
collector
'
]
=
app
.
loop
.
create_task
(
collector
(
)
)
async
def
stop_collector
(
app
)
:
(
'
Stopping collector...
'
)
app
[
'
collector
'
]
.
cancel
(
)
await
app
[
'
collector
'
]
ctx
.
term
(
)
if
__name__
==
'
__main__
'
:
app
=
web
.
Application
(
)
app
.
router
.
add_route
(
'
GET
'
,
'
/
'
,
index
)
app
.
router
.
add_route
(
'
GET
'
,
'
/feed
'
,
feed
)
app
.
on_startup
.
append
(
start_collector
)
app
.
on_cleanup
.
append
(
stop_collector
)
web
.
run_app
(
app
,
host
=
'
127.0.0.1
'
,
port
=
8088
)
Die eine Hälfte dieses Programms empfängt Daten von anderen Anwendungen, die andere Hälfte liefert Daten an die Browser-Clients über Server-sent events (SSEs). Ich verwende eine
WeakSet()
Jeder verbundene Client hat eine zugehörigeQueue()
Instanz, sodass dieseconnections
Kennung eigentlich eine Reihe von Warteschlangen ist.Erinnere dich daran, dass ich in der Anwendungsschicht einen
zmq.PUB
Socket verwendet habe; hier in der Sammlungsschicht verwende ich seinen Partner, denzmq.SUB
Socket-Typ. Dieser ØMQ-Socket kann nur empfangen, aber nicht senden.Für den Socket-Typ
zmq.SUB
ist die Angabe eines Abonnementnamens erforderlich, aber für unsere Zwecke nehmen wir einfach alles, was reinkommt - daher der leere Themenname.Ich binde die
zmq.SUB
Buchse. Denk mal kurz darüber nach. In Pub-Sub-Konfigurationen musst du normalerweise das Pub-Ende zum Server machen (bind()
) und das Sub-Ende zum Client (connect()
). Bei ØMQ ist das anders: Jedes Ende kann der Server sein. Für unseren Anwendungsfall ist das wichtig, denn jede unserer Anwendungsschicht-Instanzen wird sich mit demselben Domänennamen des Sammlungsservers verbinden, und nicht andersherum.Die Unterstützung für
asyncio
inpyzmq
ermöglicht es uns,await
Daten von unseren verbundenen Apps zu erhalten. Und nicht nur das, die eingehenden Daten werden automatisch aus JSON deserialisiert (ja, das bedeutet, dassdata
eindict()
ist).Erinnere dich daran, dass unser
connections
Set eine Warteschlange für jeden verbundenen Webclient enthält. Jetzt, da die Daten empfangen wurden, ist es an der Zeit, sie an alle Clients zu senden: Die Daten werden in jede Warteschlange gestellt.Die Funktion
feed()
coroutine erstellt Coroutines für jeden verbundenen Webclient. Intern werden die vom Server gesendeten Ereignisse verwendet, um Daten an die Webclients zu senden.Wie bereits beschrieben, hat jeder Webclient seine eigene
queue
Instanz, um Daten von dercollector()
Coroutine zu empfangen. Die Instanzqueue
wird demconnections
Set hinzugefügt, aber daconnections
ein Weakset ist, wird der Eintrag automatisch ausconnections
entfernt, wennqueue
nicht mehr verfügbar ist, d.h. wenn ein Webclient die Verbindung trennt. Weakrefs sind großartig, um diese Art von Buchhaltungsaufgaben zu vereinfachen.Das Paket
aiohttp_sse
stellt den Kontextmanagersse_response()
zur Verfügung. Dieser gibt uns einen Rahmen, in dem wir Daten an den Webclient weitergeben können.Wir bleiben mit dem Webclient verbunden und warten auf Daten in der Warteschlange dieses speziellen Clients.
Sobald die Daten eintreffen (innerhalb von
collector()
), werden sie an den verbundenen Webclient gesendet. Beachte, dass ich hier dasdata
Diktat reserialisiere. Eine Optimierung dieses Codes wäre es, die Deserialisierung von JSON incollector()
zu vermeiden und stattdessensock.recv_string()
zu verwenden, um den Serialisierungsvorgang zu umgehen. In einem realen Szenario könntest du natürlich auch im Collector deserialisieren und eine Validierung der Daten durchführen, bevor du sie an den Browser-Client sendest. So viele Möglichkeiten!Der Endpunkt
index()
ist der primäre Seitenladepunkt, und hier wird eine statische Datei namens charts.html bereitgestellt.Die Bibliothek
aiohttp
bietet uns die Möglichkeit, zusätzliche langlebige Coroutines einzubinden, die wir eventuell benötigen. Mit der Coroutinecollector()
haben wir genau diese Situation, also erstelle ich eine Startup-Coroutine,start_collector()
, und eine Shutdown-Coroutine. Diese werden in bestimmten Phasen deraiohttp
Startup- und Shutdown-Sequenz aufgerufen. Beachte, dass ich die Collector-Aufgabe in dieapp
selbst einfüge, die ein Mapping-Protokoll implementiert, damit du sie wie ein Diktat verwenden kannst.Ich beziehe unsere
collector()
Coroutine über denapp
Bezeichner und rufecancel()
auf.Schließlich kannst du sehen, wo die benutzerdefinierten Startup- und Shutdown-Coroutinen eingehängt werden: Die Instanz
app
bietet Hooks, an die unsere benutzerdefinierten Coroutinen angehängt werden können.
Jetzt fehlt nur noch die Visualisierungsebene, wie in Beispiel 4-20 gezeigt. Ich verwende dieSmoothie Charts-Bibliothek, um scrollende Diagramme zu erstellen. Das komplette HTML für unsere wichtigste (und einzige) Webseite,charts.html, findest du in Beispiel B-1. Es gibt zu viel HTML, CSS und JavaScript, um es in diesem Abschnitt darzustellen, aber ich möchte ein paar Punkte hervorheben, wie die vom Server gesendeten Ereignisse in JavaScript im Browser-Client behandelt werden.
Beispiel 4-20. Die Visualisierungsebene, was eine schicke Umschreibung für "der Browser" ist
<
snip
>
var
evtSource
=
new
EventSource
(
"/feed"
)
;
evtSource
.
onmessage
=
function
(
e
)
{
var
obj
=
JSON
.
parse
(
e
.
data
)
;
if
(
!
(
obj
.
color
in
cpu
)
)
{
add_timeseries
(
cpu
,
cpu_chart
,
obj
.
color
)
;
}
if
(
!
(
obj
.
color
in
mem
)
)
{
add_timeseries
(
mem
,
mem_chart
,
obj
.
color
)
;
}
cpu
[
obj
.
color
]
.
append
(
Date
.
parse
(
obj
.
timestamp
)
,
obj
.
cpu
)
;
mem
[
obj
.
color
]
.
append
(
Date
.
parse
(
obj
.
timestamp
)
,
obj
.
mem
)
;
}
;
<
snip
>
Erstelle eine neue
EventSource()
Instanz mit der URL /feed. Der Browser wird sich mit /feed auf unserem Server verbinden (metric_server.py). Beachte, dass der Browser automatisch versucht, die Verbindung wiederherzustellen, wenn sie unterbrochen wird. Vom Server gesendete Ereignisse werden oft übersehen, aber in vielen Situationen sind sie aufgrund ihrer Einfachheit den WebSockets vorzuziehen.Das Ereignis
onmessage
wird jedes Mal ausgelöst, wenn der Server Daten sendet. Hier werden die Daten als JSON geparst.Der
cpu
Bezeichner ist eine Zuordnung einer Farbe zu einerTimeSeries()
Instanz (mehr dazu in Beispiel B-1). Hier erhalten wir diese Zeitreihe und fügen ihr Daten hinzu. Außerdem holen wir uns den Zeitstempel und parsen ihn, um das richtige Format für das Diagramm zu erhalten.
Jetzt können wir den Code ausführen. Um die ganze Show in Gang zu bringen, sind eine Reihe von Befehlszeilenanweisungen erforderlich, von denen die erste darin besteht, den Datensammelprozess zu starten:
$ metric-server.py ======== Running on http://127.0.0.1:8088 ======== (Press CTRL+C to quit)
Der nächste Schritt besteht darin, alle Microservice-Instanzen zu starten. Diese werden ihre CPU- und Speichernutzungsdaten an den Collector senden. Jeder wird durch eine andere Farbe gekennzeichnet, die in der Befehlszeile angegeben wird. Beachte, wie zwei der Microservices angewiesen werden, Speicher zu lecken:
$ backend-app.py --color red & $ backend-app.py --color blue --leak 10000 & $ backend-app.py --color green --leak 100000 &
Abbildung 4-2 zeigt unser Endprodukt in einem Browser. Du wirst mir glauben müssen, dass die Graphen wirklich animiert sind. Du wirst in den vorangegangenen Befehlszeilen sehen, dass ich bei Blau etwas und bei Grün viel Speicherplatz verloren habe. Ich musste den grünen Dienst sogar ein paar Mal neu starten, um zu verhindern, dass er über 100 MB ansteigt.
Was an diesem Projekt besonders interessant ist, ist Folgendes: Jede der laufenden Instanzen in jedem Teil dieses Stacks kann neu gestartet werden, und es ist kein Code zur Wiederherstellung der Verbindung erforderlich. Die ØMQ-Sockets und die EventSource()
JavaScript-Instanz im Browser verbinden sich auf magische Weise wieder und machen da weiter, wo sie aufgehört haben.
Im nächsten Abschnitt widmen wir uns den Datenbanken und der Frage, wie asyncio
genutzt werden kann, um ein System zur Cache-Invalidierung zu entwickeln.
asyncpg und Sanic
Die Bibliothekasyncpg
ermöglicht den Client-Zugriff auf die PostgreSQL-Datenbank, unterscheidet sich aber von anderenasyncio
-kompatiblen Postgres-Client-Bibliotheken, indem sie den Schwerpunkt auf Geschwindigkeit legt.asyncpg
wurde von Yury Selivanov, einem der wichtigsten asyncio
Python-Entwickler, verfasst, der auch Autor des uvloop-Projekts ist.Es gibt keine Abhängigkeiten von Drittanbietern, obwohlCython erforderlich ist, wenn du vom Quellcode installierst.
asyncpg
erreicht seine Geschwindigkeit, indem es direkt gegen das PostgreSQL-Binärprotokoll arbeitet. Weitere Vorteile dieses Low-Level-Ansatzes sind die Unterstützung vonPrepared Statementsund scrollbaren Cursors.
Wir werden uns eine Fallstudie ansehen, in der asyncpg
für die Cache-Ungültigkeitserklärung verwendet wird. Zuvor ist es jedoch nützlich, ein grundlegendes Verständnis für die API asyncpg
zu bekommen. Für den gesamten Code in diesem Abschnitt benötigen wir eine laufende Instanz von PostgreSQL. Das geht am einfachsten mit Docker, indem du den folgenden Befehl verwendest:
$ docker run -d --rm -p 55432:5432 postgres
Beachte, dass ich den Port 55432 und nicht den Standardport 5432 verwendet habe, falls du bereits eine Instanz der Datenbank auf dem Standardport laufen hast. Beispiel 4-21 zeigt kurz, wie man asyncpg
verwendet, um mit PostgreSQL zu kommunizieren.
Beispiel 4-21. Grundlegende Demo von asyncpg
# asyncpg-basic.py
import
asyncio
import
asyncpg
import
datetime
from
util
import
Database
async
def
main
(
)
:
async
with
Database
(
'
test
'
,
owner
=
True
)
as
conn
:
await
demo
(
conn
)
async
def
demo
(
conn
:
asyncpg
.
Connection
)
:
await
conn
.
execute
(
'''
CREATE TABLE users(
id serial PRIMARY KEY,
name text,
dob date
)
'''
)
pk
=
await
conn
.
fetchval
(
'
INSERT INTO users(name, dob) VALUES($1, $2)
'
'
RETURNING id
'
,
'
Bob
'
,
datetime
.
date
(
1984
,
3
,
1
)
)
async
def
get_row
(
)
:
return
await
conn
.
fetchrow
(
'
SELECT * FROM users WHERE name = $1
'
,
'
Bob
'
)
(
'
After INSERT:
'
,
await
get_row
(
)
)
await
conn
.
execute
(
'
UPDATE users SET dob = $1 WHERE id=1
'
,
datetime
.
date
(
1985
,
3
,
1
)
)
(
'
After UPDATE:
'
,
await
get_row
(
)
)
await
conn
.
execute
(
'
DELETE FROM users WHERE id=1
'
)
(
'
After DELETE:
'
,
await
get_row
(
)
)
if
__name__
==
'
__main__
'
:
asyncio
.
run
(
main
(
)
)
Ich habe einige Textbausteine in einem kleinen
util
Modul versteckt, um die Dinge zu vereinfachen und die Kernaussage beizubehalten.Die Klasse
Database
stellt uns einen Kontextmanager zur Verfügung, der eine neue Datenbank für uns erstellt - in diesem Fall mit dem Namentest
- und diese Datenbank zerstört, wenn der Kontextmanager beendet wird. Das ist sehr nützlich, wenn wir mit Ideen im Code experimentieren. Da kein Zustand zwischen den Experimenten übertragen wird, kannst du jedes Mal mit einer sauberen Datenbank beginnen. Beachte, dass es sich um einenasync with
Kontextmanager handelt; wir werden später mehr darüber sprechen, aber im Moment liegt der Schwerpunkt dieser Demo auf den Vorgängen innerhalb der Coroutinedemo()
.Der
Database
Kontextmanager hat uns eineConnection
Instanz zur Verfügung gestellt, die sofort verwendet wird, um eine neue Tabelle,users
, zu erstellen.Ich benutze
fetchval()
, um einen neuen Datensatz einzufügen. Ich hätte auchexecute()
zum Einfügen verwenden können, aber der Vorteil der Verwendung vonfetchval()
ist, dass ich denid
des neu eingefügten Datensatzes erhalten kann, den ich im Bezeichnerpk
speichere.Beachte, dass ich Parameter (
$1
und$2
) verwende, um Daten an die SQL-Abfrage zu übergeben. Verwende niemals String-Interpolation oder Verkettung, um Abfragen zu erstellen, da dies ein Sicherheitsrisiko darstellt!Im weiteren Verlauf der Demo werde ich die Daten in der Tabelle
users
bearbeiten, also erstelle ich eine neue Coroutine-Funktion, die einen Datensatz in der Tabelle abruft. Diese Funktion wird mehrere Male aufgerufen werden.Beim Abrufen von Daten ist es viel nützlicher, die
fetch
-basierten Methoden zu verwenden, da dieseRecord
Objekte zurückgeben.asyncpg
wandelt die Datentypen automatisch in die für Python am besten geeigneten Typen um.Ich verwende sofort die
get_row()
Hilfe, um den neu eingefügten Datensatz anzuzeigen.Ich ändere Daten, indem ich den Befehl
UPDATE
für SQL verwende. Es ist eine kleine Änderung: Der Jahreswert im Geburtsdatum wird um ein Jahr geändert. Wie zuvor wird dies mit der Methodeexecute()
der Verbindung durchgeführt. Der Rest der Code-Demo folgt der gleichen Struktur wie bisher, und ein paar Zeilen weiter folgt einDELETE
, gefolgt von einem weiterenprint()
.
Hier ist die Ausgabe, wenn du das Skript ausführst:
$ asyncpg-basic.py After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)> After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)> After DELETE: None
Beachte, wie der Datumswert in unserem Record
Objekt in ein Python date
Objekt umgewandelt wurde: asyncpg
hat den Datentyp automatisch vom SQL-Typ in sein Python-Gegenstück umgewandelt. Eine große Tabelle mitTypkonvertierungen in der Dokumentation von asyncpg
beschreibt alle Typzuordnungen, die in die Bibliothek eingebaut sind.
Der vorangegangene Code ist sehr einfach, vielleicht sogar grob, wenn du an die Annehmlichkeiten von objektrelationalen Mappern (ORMs) wie SQLAlchemy oder dem eingebauten ORM des Django Web Frameworks gewöhnt bist. Am Ende dieses Kapitels erwähne ich einige Bibliotheken von Drittanbietern, die Zugang zu ORMs oder ORM-ähnlichen Funktionen für asyncpg
bieten.
Beispiel 4-22 zeigt mein Boilerplate Database
Objekt im utils
Modul; vielleicht findest du es nützlich, etwas Ähnliches für deine eigenen Experimente zu erstellen.
Beispiel 4-22. Nützliche Hilfsmittel für deine asyncpg-Experimente
# util.py
import
argparse
,
asyncio
,
asyncpg
from
asyncpg
.
pool
import
Pool
DSN
=
'
postgresql://
{user}
@
{host}
:
{port}
'
DSN_DB
=
DSN
+
'
/
{name}
'
CREATE_DB
=
'
CREATE DATABASE
{name}
'
DROP_DB
=
'
DROP DATABASE
{name}
'
class
Database
:
def
__init__
(
self
,
name
,
owner
=
False
,
*
*
kwargs
)
:
self
.
params
=
dict
(
user
=
'
postgres
'
,
host
=
'
localhost
'
,
port
=
55432
,
name
=
name
)
self
.
params
.
update
(
kwargs
)
self
.
pool
:
Pool
=
None
self
.
owner
=
owner
self
.
listeners
=
[
]
async
def
connect
(
self
)
-
>
Pool
:
if
self
.
owner
:
await
self
.
server_command
(
CREATE_DB
.
format
(
*
*
self
.
params
)
)
self
.
pool
=
await
asyncpg
.
create_pool
(
DSN_DB
.
format
(
*
*
self
.
params
)
)
return
self
.
pool
async
def
disconnect
(
self
)
:
"""Destroy the database"""
if
self
.
pool
:
releases
=
[
self
.
pool
.
release
(
conn
)
for
conn
in
self
.
listeners
]
await
asyncio
.
gather
(
*
releases
)
await
self
.
pool
.
close
(
)
if
self
.
owner
:
await
self
.
server_command
(
DROP_DB
.
format
(
*
*
self
.
params
)
)
async
def
__aenter__
(
self
)
-
>
Pool
:
return
await
self
.
connect
(
)
async
def
__aexit__
(
self
,
*
exc
)
:
await
self
.
disconnect
(
)
async
def
server_command
(
self
,
cmd
)
:
conn
=
await
asyncpg
.
connect
(
DSN
.
format
(
*
*
self
.
params
)
)
await
conn
.
execute
(
cmd
)
await
conn
.
close
(
)
async
def
add_listener
(
self
,
channel
,
callback
)
:
conn
:
asyncpg
.
Connection
=
await
self
.
pool
.
acquire
(
)
await
conn
.
add_listener
(
channel
,
callback
)
self
.
listeners
.
append
(
conn
)
if
__name__
==
'
__main__
'
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--cmd
'
,
choices
=
[
'
create
'
,
'
drop
'
]
)
parser
.
add_argument
(
'
--name
'
,
type
=
str
)
args
=
parser
.
parse_args
(
)
d
=
Database
(
args
.
name
,
owner
=
True
)
if
args
.
cmd
==
'
create
'
:
asyncio
.
run
(
d
.
connect
(
)
)
elif
args
.
cmd
==
'
drop
'
:
asyncio
.
run
(
d
.
disconnect
(
)
)
else
:
parser
.
print_help
(
)
Die Klasse
Database
ist nur ein ausgefallener Kontextmanager zum Erstellen und Löschen einer Datenbank in einer PostgreSQL-Instanz. Der Datenbankname wird dem Konstruktor übergeben.(Hinweis: Die Reihenfolge der Aufrufe im Code weicht absichtlich von dieser Liste ab). Dies ist ein asynchroner Kontextmanager. Anstelle der üblichen Methoden
__enter__()
und__exit__()
verwende ich ihre Gegenstücke__aenter__()
und__aexit__()
.Hier, auf der Eingabeseite, erstelle ich die neue Datenbank und gebe eine Verbindung zu dieser neuen Datenbank zurück.
server_command()
ist eine weitere Hilfsmethode, die ein paar Zeilen weiter unten definiert ist. Ich verwende sie, um den Befehl zum Erstellen unserer neuen Datenbank auszuführen.Dann stelle ich eine Verbindung zu der neu erstellten Datenbank her. Beachte, dass ich einige Details über die Verbindung fest einprogrammiert habe: Das ist Absicht, denn ich wollte die Codebeispiele klein halten. Du könntest dies leicht verallgemeinern, indem du Felder für den Benutzernamen, den Hostnamen und den Port erstellst.
Auf der Beendigungsseite des Kontextmanagers schließe ich die Verbindung und...
...die Datenbank zerstören.
Der Vollständigkeit halber: Dies ist unsere Utility-Methode zur Ausführung von Befehlen gegen den PostgreSQL-Server selbst. Sie erstellt zu diesem Zweck eine Verbindung, führt den angegebenen Befehl aus und beendet sich.
Diese Funktion erstellt eine dauerhafte Socket-Verbindung zur Datenbank, die auf Ereignisse wartet. Dieser Mechanismus wird in der nächsten Fallstudie vorgestellt.
Vorsicht
In Punkt 8 des vorangegangenen Codes habe ich für jeden Kanal, den ich abhören will, eine eigene Verbindung erstellt. Das ist teuer, denn es bedeutet, dass ein PostgreSQL-Worker für jeden Kanal, der abgehört wird, komplett ausgelastet ist. Viel besser wäre es, eine Verbindung für mehrere Kanäle zu verwenden. Wenn du dieses Beispiel durchgearbeitet hast, kannst du versuchen, den Code so zu ändern, dass eine einzige Verbindung für mehrere Channel Listener verwendet wird.
Jetzt, wo du die grundlegenden Bausteine von asyncpg
kennst, können wir sie mit einer wirklich unterhaltsamen Fallstudie weiter erforschen: die Verwendung der in PostgreSQL eingebauten Unterstützung für das Senden von Ereignisbenachrichtigungen, um eine Cache-Ungültigkeitserklärung durchzuführen!
Fallstudie: Cache-Invalidierung
Es gibt zwei schwierige Dinge in der Informatik: Cache-Invalidierung, das Benennen von Dingen, und Off-by-One-Fehler.
Phil Karlton
Bei Webservices und Webanwendungen kommt es häufig vor, dass die Persistenzschicht, d. h. die Datenbank (DB), eher zum Leistungsengpass wird als jeder andere Teil des Stacks. Die Anwendungsschicht kann in der Regel horizontal skaliert werden, indem mehr Instanzen betrieben werden, während dies bei einer Datenbank schwieriger ist.
Deshalb ist es gängige Praxis, nach Designoptionen zu suchen, die eine übermäßige Interaktion mit der Datenbank begrenzen können. Die gebräuchlichste Option ist die Verwendung von Caching, um sich an zuvor abgefragte Datenbankergebnisse zu "erinnern" und sie bei Bedarf wieder abzurufen, so dass ein erneuter Aufruf der Datenbank für dieselben Informationen vermieden wird.
Was passiert aber, wenn eine deiner App-Instanzen neue Daten in die Datenbank schreibt, während eine andere App-Instanz noch die alten, veralteten Daten aus ihrem internen Cache zurückgibt? Das ist ein klassisches Cache-Invalidierungsproblem, das sich nur sehr schwer lösen lässt.
Unsere Angriffsstrategie sieht folgendermaßen aus:
-
Jede App-Instanz hat einen In-Memory-Cache für DB-Abfragen.
-
Wenn man neue Daten in die Datenbank schreibt, informiert die Datenbank alle verbundenen App-Instanzen über die neuen Daten.
-
Jede App-Instanz aktualisiert dann ihren internen Cache entsprechend.
In dieser Fallstudie wird gezeigt, wie PostgreSQL mit seiner eingebauten Unterstützung für Ereignisaktualisierungen über dieLISTEN
undNOTIFY
Befehle, einfach mitteilen kann, wenn sich die Daten geändert haben.
asyncpg
unterstützt bereits die LISTEN
/NOTIFY
API. Diese Funktion von PostgreSQL ermöglicht es deiner Anwendung, Ereignisse in einem benannten Kanal zu abonnieren und Ereignisse an benannte Kanäle zu senden. PostgreSQL kann fast zu einer leichteren Version vonRabbitMQ oderActiveMQ werden!
In dieser Fallstudie gibt es mehr bewegliche Teile als sonst, und das macht es schwierig, sie in der üblichen linearen Form zu präsentieren. Stattdessen fangen wir mit dem Endprodukt an und arbeiten uns rückwärts zur zugrunde liegenden Implementierung vor.
Unsere App bietet einen JSON-basierten API-Server für die Verwaltung der Lieblingsgerichte der Gäste in unserem Roboterrestaurant. Die Datenbank enthält nur eine Tabelle, patron
, mit nur zwei Feldern: name
und fav_dish
. Unsere API ermöglicht die üblichen vier Operationen: Erstellen, Lesen,Aktualisieren und Löschen (CRUD).
Im Folgenden siehst du ein Beispiel für die Interaktion mit unserer API unter curl
. Es zeigt, wie du einen neuen Eintrag in unserer Datenbank erstellst (ich habe noch nicht gezeigt, wie du den Server auf localhost:8000 startest; das kommt später):
$ curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \ -H "Content-Type: application/json" \ -X POST \ http://localhost:8000/patron {"msg":"ok","id":37}
Der Parameter -d
ist für Daten,2 -H
ist für die HTTP-Header, -X
ist für die HTTP-Anforderungsmethode (Alternativen sind GET
, DELETE
, PUT
und einige andere) und die URL ist für unseren API-Server. Zu dem Code dafür kommen wir gleich.
In der Ausgabe sehen wir, dass die Erstellung ok
war, und dass id
der Primärschlüssel des neuen Datensatzes in der Datenbank ist.
In den nächsten paar Shell-Schnipseln werden wir die anderen drei Operationen durchgehen:lesen, aktualisieren und löschen. Mit diesem Befehl können wir den soeben erstellten Datensatz eines Gönners lesen:
$ curl -X GET http://localhost:8000/patron/37
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}
Das Lesen der Daten ist ziemlich einfach. Beachte, dass die id
des gewünschten Datensatzes in der URL angegeben werden muss.
Als Nächstes aktualisieren wir den Datensatz und überprüfen die Ergebnisse:
$ curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \ -H "Content-Type: application/json" \ -X PUT \ http://localhost:8000/patron/37 $ curl -X GET http://localhost:8000/patron/37 {"msg":"ok"} {"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}
Das Aktualisieren einer Ressource ist ähnlich wie das Erstellen einer Ressource, mit zwei wichtigen Unterschieden:
-
Die HTTP-Anforderungsmethode (
-X
) lautetPUT
, nichtPOST
. -
Die URL benötigt jetzt das Feld
id
, um anzugeben, welche Ressource aktualisiert werden soll.
Schließlich können wir den Datensatz löschen und seine Löschung mit den folgenden Befehlen überprüfen:
$ curl -X DELETE http://localhost:8000/patron/37 $ curl -X GET http://localhost:8000/patron/37 {"msg":"ok"} null
Wie du sehen kannst, wird null
zurückgegeben, wenn du versuchst,GET
einen Datensatz zu finden, der nicht existiert.
Bis jetzt sieht das alles recht gewöhnlich aus, aber unser Ziel ist nicht nur eine CRUD-API, sondern wir wollen uns auch die Cache-Invalidierung ansehen. Richten wir also unsere Aufmerksamkeit auf den Cache. Da wir nun ein grundlegendes Verständnis der API unserer App haben, können wir uns die Anwendungsprotokolle ansehen, um die Zeitdaten für jede Anfrage zu ermitteln: Daraus können wir ersehen, welche Anfragen im Cache gespeichert sind und welche auf die DB treffen.
Wenn der Server zum ersten Mal gestartet wird, ist der Cache leer; es handelt sich schließlich um einen Speicher-Cache. Wir werden unseren Server starten und dann in einer separaten Shell zwei GET
Anfragen kurz hintereinander ausführen:
$ curl -X GET http://localhost:8000/patron/29 $ curl -X GET http://localhost:8000/patron/29 {"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"} {"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
Wir gehen davon aus, dass wir beim ersten Mal, wenn wir unseren Datensatz abrufen, einen Fehler im Cache haben werden und beim zweiten Mal einen Treffer. Das können wir im Protokoll des API-Servers selbst sehen (der erste Sanic-Webserver, der auf localhost:8000 läuft):
$ sanic_demo.py 2019-09-29 16:20:33 - (sanic)[DEBUG]: ▄▄▄▄▄ ▀▀▀██████▄▄▄ _______________ ▄▄▄▄▄ █████████▄ / \ ▀▀▀▀█████▌ ▀▐▄ ▀▐█ | Gotta go fast! | ▀▀█████▄▄ ▀██████▄██ | _________________/ ▀▄▄▄▄▄ ▀▀█▄▀█════█▀ |/ ▀▀▀▄ ▀▀███ ▀ ▄▄ ▄███▀▀██▄████████▄ ▄▀▀▀▀▀▀█▌ ██▀▄▄▄██▀▄███▀ ▀▀████ ▄██ ▄▀▀▀▄██▄▀▀▌████▒▒▒▒▒▒███ ▌▄▄▀ ▌ ▐▀████▐███▒▒▒▒▒▐██▌ ▀▄▄▄▄▀ ▀▀████▒▒▒▒▄██▀ ▀▀█████████▀ ▄▄██▀██████▀█ ▄██▀ ▀▀▀ █ ▄█ ▐▌ ▄▄▄▄█▌ ▀█▄▄▄▄▀▀▄ ▌ ▐ ▀▀▄▄▄▀ ▀▀▄▄▀ 2019-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000 2019-09-29 16:20:33 (sanic): Starting worker [10366] 2019-09-29 16:25:27 (perf): id=37 Cache miss 2019-09-29 16:25:27 (perf): get Elapsed: 4.26 ms 2019-09-29 16:25:27 (perf): get Elapsed: 0.04 ms
Alles bis zu dieser Zeile ist die standardmäßige
sanic
Startup-Logmeldung.Wie beschrieben, führt die erste
GET
zu einem Cache-Miss, weil der Server gerade erst gestartet ist.Das ist von unserem ersten
curl -X GET
. Ich habe den API-Endpunkten einige Zeitfunktionen hinzugefügt. Hier können wir sehen, dass der Handler für dieGET
Anfrage ~4 ms brauchte.Die zweite
GET
liefert Daten aus dem Cache und die viel schnelleren (100x schneller!) Zeitdaten.
So weit nichts Ungewöhnliches. Viele Webanwendungen nutzen das Caching auf diese Weise.
Starten wir nun eine zweite App-Instanz auf Port 8001 (die erste Instanz war auf Port 8000):
$ sanic_demo.py --port 8001 <snip> 2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001 2017-10-02 08:09:56 - (sanic): Starting worker [385]
Beide Instanzen verbinden sich natürlich mit der gleichen Datenbank. Jetzt, wo beide API-Server-Instanzen laufen, ändern wir die Daten für den KundenJohn, der offensichtlich nicht genug Spam auf seinem Speiseplan hat. Hier führen wir eine UPDATE
gegen die erste App-Instanz an Port 8000 aus:
$ curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \ -H "Content-Type: application/json" \ -X PUT \ http://localhost:8000/patron/29 {"msg":"ok"}
Unmittelbar nach diesem Aktualisierungsereignis auf nur einer der App-Instanzen melden beideAPI-Server, 8000 und 8001, das Ereignis in ihren jeweiligen Protokollen:
2019-10-02 08:35:49 - (perf)[INFO]: Got DB event: { "table": "patron", "id": 29, "type": "UPDATE", "data": { "old": { "id": 29, "name": "John Cleese", "fav_dish": "Gravy on Toast" }, "new": { "id": 29, "name": "John Cleese", "fav_dish": "SPAM on toast" }, "diff": { "fav_dish": "SPAM on toast" } } }
Die Datenbank hat das Update-Ereignis an beide App-Instanzen zurückgemeldet. Wir haben aber noch keine Anfragen an die App-Instanz 8001 gestellt - bedeutet das, dass die neuen Daten dort bereits zwischengespeichert sind?
Um das zu überprüfen, können wir eine GET
auf dem zweiten Server auf Port 8001 ausführen:
$ curl -X GET http://localhost:8001/patron/29 {"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}
Die Timing-Informationen in der Log-Ausgabe zeigen, dass wir die Daten tatsächlich direkt aus dem Cache beziehen, auch wenn dies unsere erste Anfrage ist:
2019-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms
Das Ergebnis ist, dass alle verbundenen App-Instanzen benachrichtigt werden, wenn sich die Datenbank ändert, damit sie ihre Caches aktualisieren können.
Nach dieser Erklärung können wir uns nun dieasyncpg
Code-Implementierung ansehen, die erforderlich ist, damit unsere Cache-Ungültigkeitserklärung tatsächlich funktioniert. Der grundlegende Entwurf für den in Beispiel 4-23 gezeigten Servercode sieht wie folgt aus:
-
Wir haben eine einfache Web-API, die das neue,
asyncio
-kompatibleSanic Web-Framework nutzt. -
Die Daten werden in einer PostgreSQL-Instanz im Backend gespeichert, aber die API wird über mehrere Instanzen der Web-API-App-Server bedient.
-
Die App-Server werden die Daten aus der Datenbank zwischenspeichern.
-
Die App-Server abonnieren Ereignisse über
asyncpg
in bestimmten Tabellen der DB und erhalten Aktualisierungsbenachrichtigungen, wenn die Daten in der DB-Tabelle geändert wurden. So können die App-Server ihre individuellen In-Memory-Caches aktualisieren.
Beispiel 4-23. API-Server mit Sanic
# sanic_demo.py
import
argparse
from
sanic
import
Sanic
from
sanic
.
views
import
HTTPMethodView
from
sanic
.
response
import
json
from
util
import
Database
from
perf
import
aelapsed
,
aprofiler
import
model
app
=
Sanic
(
)
@aelapsed
async
def
new_patron
(
request
)
:
data
=
request
.
json
id
=
await
model
.
add_patron
(
app
.
pool
,
data
)
return
json
(
dict
(
msg
=
'
ok
'
,
id
=
id
)
)
class
PatronAPI
(
HTTPMethodView
,
metaclass
=
aprofiler
)
:
async
def
get
(
self
,
request
,
id
)
:
data
=
await
model
.
get_patron
(
app
.
pool
,
id
)
return
json
(
data
)
async
def
put
(
self
,
request
,
id
)
:
data
=
request
.
json
ok
=
await
model
.
update_patron
(
app
.
pool
,
id
,
data
)
return
json
(
dict
(
msg
=
'
ok
'
if
ok
else
'
bad
'
)
)
async
def
delete
(
self
,
request
,
id
)
:
ok
=
await
model
.
delete_patron
(
app
.
pool
,
id
)
return
json
(
dict
(
msg
=
'
ok
'
if
ok
else
'
bad
'
)
)
@app
.
listener
(
'
before_server_start
'
)
async
def
db_connect
(
app
,
loop
)
:
app
.
db
=
Database
(
'
restaurant
'
,
owner
=
False
)
app
.
pool
=
await
app
.
db
.
connect
(
)
await
model
.
create_table_if_missing
(
app
.
pool
)
await
app
.
db
.
add_listener
(
'
chan_patron
'
,
model
.
db_event
)
@app
.
listener
(
'
after_server_stop
'
)
async
def
db_disconnect
(
app
,
loop
)
:
await
app
.
db
.
disconnect
(
)
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
(
)
parser
.
add_argument
(
'
--port
'
,
type
=
int
,
default
=
8000
)
args
=
parser
.
parse_args
(
)
app
.
add_route
(
new_patron
,
'
/patron
'
,
methods
=
[
'
POST
'
]
)
app
.
add_route
(
PatronAPI
.
as_view
(
)
,
'
/patron/<id:int>
'
)
app
.
run
(
host
=
"
0.0.0.0
"
,
port
=
args
.
port
)
Das Hilfsprogramm
Database
, wie oben beschrieben. Er stellt die Methoden bereit, die für die Verbindung mit der Datenbank erforderlich sind.Zwei weitere Tools, die ich zusammengeschustert habe, um die verstrichene Zeit für jeden API-Endpunkt zu protokollieren. Ich habe dies in der vorherigen Diskussion verwendet, um zu erkennen, wann
GET
aus dem Cache zurückgegeben wurde. Die Implementierungen füraelapsed()
undaprofiler()
sind für diese Fallstudie nicht wichtig, aber du kannst sie in Beispiel B-1 erhalten.Wir erstellen die Hauptinstanz der Sanic App.
Diese Coroutine-Funktion ist für die Erstellung neuer Gönnereinträge zuständig. In einem
add_route()
-Aufruf am Ende des Codes istnew_patron()
mit dem Endpunkt/patron
verbunden, nur für die HTTP-MethodePOST
. Der@aelapsed
Dekorator ist nicht Teil der Sanic API: Er ist meine eigene Erfindung und dient lediglich dazu, die Zeiten für jeden Aufruf zu protokollieren.Sanic bietet eine sofortige Deserialisierung der empfangenen JSON-Daten, indem es das
.json
Attribut desrequest
Objekts verwendet.Das Modul
model
, das ich importiert habe, ist das Modell für unsere Tabellepatron
in der Datenbank. Im nächsten Code-Listing gehe ich näher darauf ein; im Moment musst du nur wissen, dass alle Datenbankabfragen und SQL in diesemmodel
Modul enthalten sind. Hier übergebe ich den Verbindungspool für die Datenbank, und das gleiche Muster wird für die gesamte Interaktion mit dem Datenbankmodell in dieser Funktion und in der KlassePatronAPI
weiter unten verwendet.Es wird ein neuer Primärschlüssel,
id
, erstellt, der als JSON an den Aufrufer zurückgegeben wird.Während die Erstellung in der Funktion
new_patron()
abgewickelt wird, werden alle anderen Interaktionen in dieser klassenbasierten Ansicht abgewickelt, die von Sanic zur Verfügung gestellt wird. Alle Methoden dieser Klasse sind mit derselben URL verbunden,/patron/<id:int>
, die du in der Funktionadd_route()
ganz unten sehen kannst. Beachte, dass der URL-Parameterid
an jede der Methoden übergeben wird, und dass dieser Parameter für alle drei Endpunkte erforderlich ist.Das Argument
metaclass
kannst du getrost ignorieren: Es dient lediglich dazu, jede Methode mit dem@aelapsed
Dekorator zu umhüllen, damit die Zeitangaben in den Protokollen ausgegeben werden. Auch dies ist nicht Teil der Sanic API, sondern meine eigene Erfindung, um Zeitdaten zu protokollieren.Die Interaktion mit dem Modell wird wie bisher im Modul
model
durchgeführt.Wenn das Modell einen Fehler bei der Aktualisierung meldet, ändere ich die Antwortdaten. Ich habe dies für Leser eingefügt, die Pythons Version des ternären Operators noch nicht kennen.
Die
@app.listener
Dekoratoren sind Hooks, die Sanic zur Verfügung stellt, um dir die Möglichkeit zu geben, zusätzliche Aktionen während des Starts und des Herunterfahrens hinzuzufügen. Dieser Dekorator,before_server_start
, wird vor dem Start des API-Servers aufgerufen. Dies scheint ein guter Ort zu sein, um unsere Datenbankverbindung zu initialisieren.Verwende den
Database
Helfer, um eine Verbindung zu unserer PostgreSQL-Instanz herzustellen. Die DB, mit der wir uns verbinden, istrestaurant
.Besorge dir einen Verbindungspool zu unserer Datenbank.
Verwende unser Modell (für die Tabelle
patron
), um die Tabelle zu erstellen, wenn sie fehlt.Verwende unser Modell, um einen dedicated_listener für Datenbankereignisse zu erstellen, der auf den Kanal
chan_patron
hört. Die Callback-Funktion für diese Ereignisse istmodel.db_event()
, die ich im nächsten Listing erläutern werde. Der Callback wird jedes Mal aufgerufen, wenn die Datenbank den Kanal aktualisiert.after_server_stop
ist der Haken für Aufgaben, die beim Herunterfahren passieren müssen. Hier trennen wir die Verbindung mit der Datenbank.Dieser
add_route()
Aufruf sendetPOST
Anfragen für die/patron
URL an dienew_patron()
Coroutine-Funktion.Dieser
add_route()
Aufruf sendet alle Anfragen für die/patron/<id:int>
URL an diePatronAPI
klassenbasierte Ansicht. Die Methodennamen in dieser Klasse bestimmen, welche Methode aufgerufen wird: EineGET
HTTP-Anfrage ruft diePatronAPI.get()
Methode auf, und so weiter.
Der vorangehende Code enthält die gesamte HTTP-Behandlung für unseren Server sowie Aufgaben zum Starten und Herunterfahren, wie das Einrichten eines Verbindungspools zur Datenbank und - ganz wichtig - das Einrichten eines db-event
Listeners auf dem chan_patron
Kanal auf dem DB-Server.
Beispiel 4-24 zeigt das Modell für die Tabelle patron
in der Datenbank.
Beispiel 4-24. DB-Modell für die Tabelle "patron
# model.py
import
logging
from
json
import
loads
,
dumps
from
triggers
import
(
create_notify_trigger
,
add_table_triggers
)
from
boltons
.
cacheutils
import
LRU
logger
=
logging
.
getLogger
(
'
perf
'
)
CREATE_TABLE
=
(
'
CREATE TABLE IF NOT EXISTS patron(
'
'
id serial PRIMARY KEY, name text,
'
'
fav_dish text)
'
)
INSERT
=
(
'
INSERT INTO patron(name, fav_dish)
'
'
VALUES ($1, $2) RETURNING id
'
)
SELECT
=
'
SELECT * FROM patron WHERE id = $1
'
UPDATE
=
'
UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3
'
DELETE
=
'
DELETE FROM patron WHERE id=$1
'
EXISTS
=
"
SELECT to_regclass(
'
patron
'
)
"
CACHE
=
LRU
(
max_size
=
65536
)
async
def
add_patron
(
conn
,
data
:
dict
)
-
>
int
:
return
await
conn
.
fetchval
(
INSERT
,
data
[
'
name
'
]
,
data
[
'
fav_dish
'
]
)
async
def
update_patron
(
conn
,
id
:
int
,
data
:
dict
)
-
>
bool
:
result
=
await
conn
.
execute
(
UPDATE
,
data
[
'
name
'
]
,
data
[
'
fav_dish
'
]
,
id
)
return
result
==
'
UPDATE 1
'
async
def
delete_patron
(
conn
,
id
:
int
)
:
result
=
await
conn
.
execute
(
DELETE
,
id
)
return
result
==
'
DELETE 1
'
async
def
get_patron
(
conn
,
id
:
int
)
-
>
dict
:
if
id
not
in
CACHE
:
logger
.
info
(
f
'
id=
{id}
Cache miss
'
)
record
=
await
conn
.
fetchrow
(
SELECT
,
id
)
CACHE
[
id
]
=
record
and
dict
(
record
.
items
(
)
)
return
CACHE
[
id
]
def
db_event
(
conn
,
pid
,
channel
,
payload
)
:
event
=
loads
(
payload
)
logger
.
info
(
'
Got DB event:
\n
'
+
dumps
(
event
,
indent
=
4
)
)
id
=
event
[
'
id
'
]
if
event
[
'
type
'
]
==
'
INSERT
'
:
CACHE
[
id
]
=
event
[
'
data
'
]
elif
event
[
'
type
'
]
==
'
UPDATE
'
:
CACHE
[
id
]
=
event
[
'
data
'
]
[
'
new
'
]
elif
event
[
'
type
'
]
==
'
DELETE
'
:
CACHE
[
id
]
=
None
async
def
create_table_if_missing
(
conn
)
:
if
not
await
conn
.
fetchval
(
EXISTS
)
:
await
conn
.
fetchval
(
CREATE_TABLE
)
await
create_notify_trigger
(
conn
,
channel
=
'
chan_patron
'
)
await
add_table_triggers
(
conn
,
table
=
'
patron
'
)
Du musst Trigger zur Datenbank hinzufügen, um Benachrichtigungen zu erhalten, wenn sich Daten ändern. Ich habe diese praktischen Helfer erstellt, um die Triggerfunktion selbst zu erstellen (mit
create_notify_trigger
) und um den Trigger zu einer bestimmten Tabelle hinzuzufügen (mitadd_table_triggers
). Die dafür erforderliche SQL-Anweisung würde den Rahmen dieses Buches sprengen, ist aber für das Verständnis der Funktionsweise dieses Fallbeispiels wichtig. Ich habe den kommentierten Code für diese Trigger inAnhang B aufgenommen.Das Drittanbieter-Paket
boltons
bietet eine Reihe nützlicher Tools, nicht zuletzt denLRU
Cache, der vielseitiger ist als der@lru_cache
Dekorator imfunctools
Standardbibliotheksmodul.3Dieser Textblock enthält das gesamte SQL für die Standard-CRUD-Operationen. Beachte, dass ich die native PostgreSQL-Syntax für die Parameter verwende:
$1
,$2
, und so weiter. Das ist nichts Neues und wird hier nicht weiter erläutert.Erstelle den Cache für diese App-Instanz.
Ich habe diese Funktion aus dem Sanic-Modul innerhalb des Endpunkts
new_patron()
aufgerufen, um neue Kunden hinzuzufügen. Innerhalb der Funktion verwende ich die Methodefetchval()
, um neue Daten einzufügen. Warumfetchval()
und nichtexecute()
? Weilfetchval()
den Primärschlüssel des neu eingefügten Datensatzes zurückgibt!4Aktualisiere einen bestehenden Datensatz. Wenn dies erfolgreich ist, gibt PostgreSQL
UPDATE 1
zurück. Ich verwende dies als Überprüfung, um zu sehen, ob die Aktualisierung erfolgreich war.Das Löschen ist dem Aktualisieren sehr ähnlich.
Dies ist die Leseoperation. Dies ist der einzige Teil unserer CRUD-Schnittstelle, der sich um den Cache kümmert. Denk mal kurz darüber nach: Wir aktualisieren den Cache nicht, wenn wir etwas einfügen, aktualisieren oder löschen. Das liegt daran, dass wir uns auf die asynchrone Benachrichtigung aus der Datenbank (über die installierten Trigger) verlassen, um den Cache zu aktualisieren, wenn Daten geändert werden.
Natürlich wollen wir den Cache auch nach dem ersten
GET
nutzen.Die Funktion
db_event()
ist der Callback, denasyncpg
ausführt, wenn es Ereignisse auf unserem DB-Benachrichtigungskanalchan_patron
gibt. Diese spezifische Parameterliste wird vonasyncpg
benötigt.conn
ist die Verbindung, auf der das Ereignis gesendet wurde,pid
ist die Prozess-ID der PostgreSQL-Instanz, die das Ereignis gesendet hat,channel
ist der Name des Kanals (in diesem Fallchan_patron
) und die Nutzlast sind die Daten, die über den Kanal gesendet werden.Deserialisiere die JSON-Daten in ein Dict.
Die Cache-Population ist im Allgemeinen recht einfach, aber beachte, dass Aktualisierungsereignisse sowohl neue als auch alte Daten enthalten, sodass wir sicherstellen müssen, dass nur die neuen Daten gecacht werden.
Dies ist eine kleine Hilfsfunktion, mit der du eine fehlende Tabelle ganz einfach wiederherstellen kannst. Das ist sehr nützlich, wenn du das häufig tun musst - zum Beispiel, wenn du die Codebeispiele für dieses Buch schreibst!
Hier werden auch die Datenbankbenachrichtigungs-Trigger erstellt und zu unserer Tabelle
patron
hinzugefügt. Siehe Beispiel B-1für eine kommentierte Auflistung dieser Funktionen.
Damit sind wir am Ende dieser Fallstudie angelangt. Wir haben gesehen, wie einfach es mit Sanic ist, einen API-Server zu erstellen. Wir haben gesehen, wie man asyncpg
für die Durchführung von Abfragen über einen Verbindungspool verwendet und wie man die asynchronen Benachrichtigungsfunktionen von PostgreSQL nutzt, um Rückrufe über eine dedizierte, langlebige Datenbankverbindung zu empfangen.
Viele Mitarbeiter von bevorzugen objektrelationale Mapper für die Arbeit mit Datenbanken, und in diesem Bereich ist SQLAlchemyführend. Es gibt immer mehr Unterstützung für die Verwendung von SQLAlchemy zusammen mit asyncpg
in Bibliotheken von Drittanbietern wie asyncpgsa
und GINO. Ein weiteres beliebtes ORM,Peewee, erhält Unterstützung fürasyncio
durch das aiopeewee
Paket.
Andere Bibliotheken und Ressourcen
Es gibt viele weitere Bibliotheken für asyncio
, die in diesem Buch nicht behandelt werden. Um mehr zu erfahren, kannst du dir dasProjektaio-libs
ansehen, das fast 40 Bibliotheken verwaltet, und dasProjekt Awesome asyncio
, das viele andere Projekte bucht, die mit dem Modul asyncio
kompatibel sind.
Eine Bibliothek, die besonders zu erwähnen ist, istaiofiles
. Wie du dich vielleicht aus unseren früheren Diskussionen erinnerst, habe ich gesagt, dass es für eine hohe Gleichzeitigkeit in Asyncio von entscheidender Bedeutung ist, dass die Schleife niemals blockiert. In diesem Zusammenhang haben wir uns ausschließlich auf blockierende Operationen im Netzwerk konzentriert, aber es hat sich herausgestellt, dass auch der Festplattenzugriff eine blockierende Operation ist, die sich bei sehr hoher Gleichzeitigkeit auf die Leistung auswirkt. Die Lösung für dieses Problem ist aiofiles
, das einen praktischen Wrapper für den Festplattenzugriff in einem Thread bietet. Das funktioniert, weil Python die GIL während der Datei-Operationen freigibt, so dass dein Haupt-Thread (der die asyncio
Schleife ausführt) davon nicht betroffen ist.
Die wichtigste Domäne für Asyncio wird die Netzwerkprogrammierung sein. Deshalb ist es keine schlechte Idee, ein wenig über Socket-Programmierung zu lernen, und auch nach all den Jahren ist Gordon McMillans"Socket Programming HOWTO", das in der Standard-Python-Dokumentation enthalten ist, eine der besten Einführungen, die du finden kannst.
Ich habe Asyncio aus einer Vielzahl von Quellen gelernt, von denen viele bereits in früheren Abschnitten erwähnt wurden. Jeder lernt anders, deshalb lohnt es sich, verschiedene Arten von Lernmaterialien zu erkunden. Hier sind ein paar andere, die ich nützlich fand:
-
Robert Smallshires Vortrag "Get to Grips with Asyncio in Python 3", gehalten beim NDC London im Januar 2017. Das ist bei weitem das beste YouTube-Video über Asyncio, das ich kenne. Der Vortrag mag für Anfänger/innen etwas fortgeschritten sein, aber er beschreibt sehr anschaulich, wie Asyncio aufgebaut ist.
-
Nikolay Noviks Folien zu "Building Apps with Asyncio", präsentiert auf der PyCon UA 2016. Die Informationen sind dicht, aber in diesen Folien sind viele praktische Erfahrungen festgehalten.
-
Endlose Sitzungen in der Python REPL, Dinge ausprobieren und "sehen, was passiert".
Ich ermutige dich, weiter zu lernen, und wenn du ein Konzept nicht verstehst, suche weiter nach neuen Quellen, bis du eine Erklärung findest, die für dich stimmt.
1 Eigentlich schon, solange die Sockets, die in verschiedenen Threads verwendet werden, vollständig in ihren eigenen Threads erstellt, verwendet und zerstört werden. Das ist zwar möglich, aber schwer zu bewerkstelligen, und viele Leute tun sich schwer, das richtig hinzubekommen. Deshalb ist die Empfehlung, einen einzigen Thread und einen Polling-Mechanismus zu verwenden, so wichtig.
2 Das Rezept für dieses Gericht und für andere leckere Gerichte aus Spam findest du auf der UKTV-Website.
3 Bekomme boltons
mit pip install boltons
.
4 Du brauchst aber auch den RETURNING id
Teil des SQLs!
Get Asyncio in Python verwenden now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.