Kapitel 4. 20 Asyncio-Bibliotheken, die du nicht verwendest (aber... Ach, egal)

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

In diesem Kapitel sehen wir uns Fallstudien an, in denen wir die neuen Python-Funktionen für die asynchrone Programmierung nutzen. Wir werden verschiedene Bibliotheken von Drittanbietern verwenden, wie du es auch in deinen eigenen Projekten tun wirst.

Der Titel dieses Kapitels ist eine Anspielung auf den Titel meines früheren Buches20 Python Libraries You Aren't Using (But Should) (O'Reilly). Viele dieser Bibliotheken werden auch in deinen asyncio-basierten Anwendungen nützlich sein, aber dieses Kapitel konzentriert sich auf Bibliotheken, die speziell für die neuen asynchronen Funktionen in Python entwickelt wurden.

Es ist schwierig, asyncio-basierten Code in kurzen Schnipseln zu präsentieren. Wie du in den bisherigen Codebeispielen in diesem Buch gesehen hast, habe ich versucht, aus jedem Beispiel ein vollständiges, lauffähiges Programm zu machen, denn die Verwaltung der Anwendungslebensdauer ist ein zentraler Aspekt für die richtige Anwendung der asynchronen Programmierung.

Aus diesem Grund sind die meisten Fallstudien in diesem Kapitel etwas umfangreicher, was die Anzahl der Codezeilen angeht, als es für ein solches Buch üblich ist. Mit diesem Ansatz möchte ich die Fallstudien nützlicher machen, indem ich dir einen "Gesamtüberblick" über ein asynchrones Programm vermittle, anstatt dich damit zu überlassen, herauszufinden, wie einzelne Fragmente zusammenpassen könnten.

Hinweis

Einige der Codebeispiele in diesem Kapitel gehen Kompromisse beim Stil ein, um Platz zu sparen. Ich mag PEP8 genauso sehr wie jeder andere Pythonista, aber Praktikabilität schlägt Reinheit!

Streams (Standardbibliothek)

Bevor wir uns die Bibliotheken von Drittanbietern ansehen, beginnen wir mit der Standardbibliothek. Die Streams-APIist die High-Level-Schnittstelle, die für die asynchrone Socket-Programmierung angeboten wird, und wie die folgende Fallstudie zeigen wird, ist sie ziemlich einfach zu benutzen. Das Anwendungsdesign bleibt jedoch komplex, weil es sich um eine Domäne handelt.

Die folgende Fallstudie zeigt die Implementierung eines Nachrichtenmaklers mit einem anfänglich naiven Design, gefolgt von einem durchdachteren Design. Keine der beiden Implementierungen sollte als produktionsreif angesehen werden. Mein Ziel ist es, dir zu helfen, über die verschiedenen Aspekte der gleichzeitigen Netzwerkprogrammierung nachzudenken, die beim Entwurf solcher Anwendungen berücksichtigt werden müssen.

Fallstudie: Eine Nachrichten-Warteschlange

Ein Message-Queue-Dienst ist eine Backend-Anwendung, die Verbindungen von anderen Anwendungen empfängt und Nachrichten zwischen diesen verbundenen Diensten weiterleitet, die oft als Publisher und Subscriber bezeichnet werden. Abonnenten hören in der Regel bestimmte Kanäle ab, um Nachrichten zu erhalten, und in der Regel ist es möglich, die Nachrichtenverteilung in verschiedenen Kanälen auf zwei Arten zu konfigurieren: Nachrichten können an alle Abonnenten eines Kanals verteilt werden(pub-sub), oder eine andere Nachricht kann an jeden Abonnenten einzeln gehen(point-to-point).

Vor kurzem habe ich an einem Projekt gearbeitet, bei demActiveMQ als Message Broker für die Kommunikation zwischen Microservices eingesetzt wurde. Auf einer grundlegenden Ebene ist ein solcher Broker (Server):

  • Unterhält dauerhafte Socket-Verbindungen zu mehreren Clients

  • Empfängt Nachrichten von Clients mit einem Zielkanalnamen

  • Leitet diese Nachrichten an alle anderen Clients weiter, die denselben Kanalnamen abonniert haben

Ich erinnere mich, dass ich mich gefragt habe, wie schwer es sein könnte, eine solche Anwendung zu erstellen. Außerdem kann ActiveMQ beide Modelle der Nachrichtenverteilung durchführen, und die beiden Modelle werden in der Regel durch den Kanalnamen unterschieden:

  • Kanalnamen mit dem Präfix /topic (z. B. /topic/customer/registration) werden nach dem Muster pub-subverwaltet, wobei alle Kanalabonnenten alle Nachrichten erhalten.

  • Kanalnamen mit dem Präfix /queue werden mit demPunkt-zu-Punkt-Modellgehandhabt, bei dem Nachrichten auf einem Kanal zwischen den Kanalteilnehmern in einem Rundlaufverfahren verteilt werden: Jeder Teilnehmer erhält eine eindeutige Nachricht.

In unserer Fallstudie werden wir einen kleinen Message Broker mit diesen grundlegenden Funktionen bauen. Das erste Problem, das wir angehen müssen ist, dass TCP kein nachrichtenbasiertes Protokoll ist: Wir bekommen nur Byteströme auf dem Draht. Wir müssen ein eigenes Protokoll für die Struktur der Nachrichten erstellen. Das einfachste Protokoll besteht darin, jeder Nachricht einen Header mit einer bestimmten Größe voranzustellen, gefolgt von einer Nutzlast in dieser Größe. Die Hilfsbibliothek in Beispiel 4-1 bietet Lese- und Schreibfunktionen für solche Nachrichten.

Beispiel 4-1. Nachrichtenprotokoll: Lesen und Schreiben
# msgproto.py
from asyncio import StreamReader, StreamWriter

async def read_msg(stream: StreamReader) -> bytes:
    size_bytes = await stream.readexactly(4)  1
    size = int.from_bytes(size_bytes, byteorder='big')  2
    data = await stream.readexactly(size)  3
    return data

async def send_msg(stream: StreamWriter, data: bytes):
    size_bytes = len(data).to_bytes(4, byteorder='big')
    stream.writelines([size_bytes, data])  4
    await stream.drain()
1

Erhalte die ersten 4 Bytes. Dies ist das Größenpräfix.

2

Diese 4 Bytes müssen in eine Ganzzahl umgewandelt werden.

3

Jetzt kennen wir die Größe der Nutzlast, also lesen wir sie aus dem Stream.

4

Schreiben ist die Umkehrung von Lesen: Zuerst senden wir die Länge der Daten, kodiert als 4 Bytes, und danach die Daten.

Da wir nun ein rudimentäres Nachrichtenprotokoll haben, können wir uns auf die Message Broker Anwendung in Beispiel 4-2 konzentrieren.

Beispiel 4-2. Ein 40-Zeilen-Prototyp-Server
# mq_server.py
import asyncio
from asyncio import StreamReader, StreamWriter, gather
from collections import deque, defaultdict
from typing import Deque, DefaultDict
from msgproto import read_msg, send_msg  1

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque) 2

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')  3
  subscribe_chan = await read_msg(reader)  4
  SUBSCRIBERS[subscribe_chan].append(writer)  5
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):  6
      data = await read_msg(reader)  7
      print(f'Sending to {channel_name}: {data[:19]}...')
      conns = SUBSCRIBERS[channel_name]  8
      if conns and channel_name.startswith(b'/queue'):  9
          conns.rotate()  10
          conns = [conns[0]]  11
      await gather(*[send_msg(c, data) for c in conns]) 12
  except asyncio.CancelledError:
    print(f'Remote {peername} closing connection.')
    writer.close()
    await writer.wait_closed()
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    SUBSCRIBERS[subscribe_chan].remove(writer)  13

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

Importiert von unserem Modul msgproto.py.

2

Eine globale Sammlung von derzeit aktiven Abonnenten. Jedes Mal, wenn sich ein Kunde verbindet, muss er zuerst den Namen des Kanals senden, den er abonniert. Eine Deque enthält alle Abonnenten für einen bestimmten Kanal.

3

Die Coroutine-Funktion client() erzeugt für jede neue Verbindung eine langlebige Coroutine. Sie ist sozusagen ein Callback für den TCP-Server, der in main() gestartet wurde. In dieser Zeile habe ich gezeigt, wie der Host und der Port der Gegenstelle ermittelt werden können, zum Beispiel für die Protokollierung.

4

Unser Protokoll für Kunden sieht wie folgt aus:

  • Beim ersten Verbindungsaufbau muss ein Client eine Nachricht mit dem Kanal senden, den er abonnieren möchte (hier subscribe_chan).

  • Danach sendet ein Kunde während der gesamten Dauer der Verbindung eine Nachricht an einen Kanal, indem er zunächst eine Nachricht mit dem Namen des Zielkanals und anschließend eine Nachricht mit den Daten sendet. Unser Broker sendet solche Datennachrichten an alle Kunden, die diesen Kanalnamen abonniert haben.

5

Füge die Instanz StreamWriter zu der globalen Sammlung von Abonnenten hinzu.

6

Eine Endlosschleife, die auf Daten von diesem Kunden wartet. Die erste Nachricht von einem Client muss der Name des Zielkanals sein.

7

Als Nächstes kommen die eigentlichen Daten, die an den Kanal verteilt werden sollen.

8

Ermittelt die Anzahl der Abonnenten des Zielkanals.

9

Eine Sonderbehandlung, wenn der Kanalname mit dem magischen Wort/queue beginnt: In diesem Fall senden wir die Daten nur an einen der Abonnenten, nicht an alle. Dies kann für die Arbeitsteilung zwischen mehreren Arbeitern genutzt werden, anstatt des üblichen Pub-Sub-Benachrichtigungsschemas, bei dem alle Abonnenten eines Kanals alle Nachrichten erhalten.

10

Das ist der Grund, warum wir eine Deque und keine Liste verwenden: Durch die Rotation der Deque behalten wir den Überblick, welcher Kunde als nächster an der Reihe ist, um /queue zu verteilen. Das scheint teuer zu sein, bis du merkst, dass eine einzelne Deque-Rotation eine O(1)-Operation ist.

11

Nimm nur den Kunden ins Visier, der an erster Stelle steht; dies ändert sich nach jeder Rotation.

12

Erstelle eine Liste von Coroutines für das Senden der Nachricht an jeden Writer und packe diese dann in gather() aus, damit wir warten können, bis alle Sendungen abgeschlossen sind.

Diese Zeile ist ein schlimmer Fehler in unserem Programm, aber es ist vielleicht nicht klar, warum: Es mag zwar stimmen, dass alle Sendungen an jeden Abonnenten gleichzeitig erfolgen, aber was passiert, wenn wir einen sehr langsamen Client haben? In diesem Fall wird die gather() erst dann beendet, wenn der langsamste Teilnehmer seine Daten erhalten hat. Wir können keine weiteren Daten von dem sendenden Client empfangen, bis alle send_msg() Coroutines beendet sind. Dadurch wird die gesamte Nachrichtenverteilung auf die Geschwindigkeit des langsamsten Teilnehmers verlangsamt.

13

Wenn wir die Coroutine client() verlassen, müssen wir uns aus der globalen SUBSCRIBERS Sammlung entfernen. Leider ist dies eine O(n)-Operation, die bei einem sehr großen n etwas teuer werden kann. Eine andere Datenstruktur würde dies beheben, aber im Moment trösten wir uns mit dem Wissen, dass die Verbindungen langlebig sein sollen - es sollte also nur wenige Verbindungsabbrüche geben - und dass n wahrscheinlich nicht sehr groß ist (sagen wir ~10.000 als grobe Schätzung) und dieser Code zumindest leicht zu verstehen ist.

Das ist also unser Server; jetzt brauchen wir noch Clients, damit wir eine Ausgabe zeigen können. Zu Demonstrationszwecken werde ich zwei Arten von Clients erstellen: einenSender und einen Hörer. Der Server macht keinen Unterschied; alle Clients sind gleich. Die Unterscheidung zwischen Absender- und Zuhörerverhalten dient nur zu Lehrzwecken. Beispiel 4-3 zeigt den Code für die Listener-Anwendung.

Beispiel 4-3. Listener: ein Toolkit zum Abhören von Nachrichten auf unserem Message Broker
# mq_client_listen.py
import asyncio
import argparse, uuid
from msgproto import read_msg, send_msg

async def main(args):
  me = uuid.uuid4().hex[:8] 1
  print(f'Starting up {me}')
  reader, writer = await asyncio.open_connection(
    args.host, args.port) 2
  print(f'I am {writer.get_extra_info("sockname")}')
  channel = args.listen.encode()  3
  await send_msg(writer, channel)  4
  try:
    while data := await read_msg(reader):  5
      print(f'Received by {me}: {data[:20]}')
    print('Connection ended.')
  except asyncio.IncompleteReadError:
    print('Server closed.')
  finally:
    writer.close()
    await writer.wait_closed()


if __name__ == '__main__':
  parser = argparse.ArgumentParser() 6
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000)
  parser.add_argument('--listen', default='/topic/foo')
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

Mit dem Modul uuid aus der Standardbibliothek kannst du bequem eine "Identität" für diesen Listener erstellen. Wenn du mehrere Instanzen startest, hat jede ihre eigene Identität und du kannst in den Protokollen nachvollziehen, was passiert.

2

Öffne eine Verbindung zum Server.

3

Der Kanal, der abonniert werden soll, ist ein Eingabeparameter, der inargs.listen erfasst wird. Kodiere ihn vor dem Senden in Bytes.

4

Nach unseren Protokollregeln (wie in der Broker-Code-Analyse zuvor besprochen) ist das erste, was nach dem Verbindungsaufbau zu tun ist, den Namen des Kanals zu senden, den du abonnieren willst.

5

Diese Schleife tut nichts anderes, als darauf zu warten, dass Daten auf dem Socket erscheinen.

6

Mit den Befehlszeilenargumenten für dieses Programm kannst du ganz einfach einen Host, einen Port und einen Kanalnamen angeben, auf den du hören willst.

Der Code für den anderen Client, das in Beispiel 4-4 gezeigte Absenderprogramm, ist ähnlich aufgebaut wie das Hörermodul.

Beispiel 4-4. Sender: ein Toolkit zum Senden von Daten an unseren Message Broker
# mq_client_sender.py
import asyncio
import argparse, uuid
from itertools import count
from msgproto import send_msg

async def main(args):
    me = uuid.uuid4().hex[:8]  1
    print(f'Starting up {me}')
    reader, writer = await asyncio.open_connection(
        host=args.host, port=args.port)  2
    print(f'I am {writer.get_extra_info("sockname")}')

    channel = b'/null'  3
    await send_msg(writer, channel) 4

    chan = args.channel.encode()  5
    try:
        for i in count():  6
            await asyncio.sleep(args.interval)  7
            data = b'X'*args.size or f'Msg {i} from {me}'.encode()
            try:
                await send_msg(writer, chan)
                await send_msg(writer, data) 8
            except OSError:
                print('Connection ended.')
                break
    except asyncio.CancelledError:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
  parser = argparse.ArgumentParser()  9
  parser.add_argument('--host', default='localhost')
  parser.add_argument('--port', default=25000, type=int)
  parser.add_argument('--channel', default='/topic/foo')
  parser.add_argument('--interval', default=1, type=float)
  parser.add_argument('--size', default=0, type=int)
  try:
    asyncio.run(main(parser.parse_args()))
  except KeyboardInterrupt:
    print('Bye!')
1

Wie bei der Zuhörerin oder dem Zuhörer solltest du eine Identität einfordern.

2

Reiche die Hand und stelle eine Verbindung her.

3

Laut unseren Protokollregeln ist das erste, was nach der Verbindung mit dem Server zu tun ist, den Namen des Kanals anzugeben, den wir abonnieren wollen. Da wir aber ein Sender sind, ist es uns eigentlich egal, ob wir einen Kanal abonnieren wollen. Trotzdem verlangt das Protokoll dies, also geben wir einfach einen Null-Kanal an, den wir abonnieren wollen (wir werden nicht wirklich auf etwas hören).

4

Sende den Kanal, den du abonnieren willst.

5

Der Kommandozeilenparameter args.channel gibt den Kanal an, an denwir Nachrichten senden wollen. Er muss vor dem Senden zunächst in Bytes umgewandelt werden.

6

itertools.count() ist wie eine while True Schleife, mit dem Unterschied, dass wir eine Iterationsvariable erhalten, die wir verwenden können. Wir verwenden diese in den Debugging-Meldungen, da es so etwas einfacher ist, nachzuvollziehen, welche Meldung von wo aus gesendet wurde.

7

Die Verzögerung zwischen den gesendeten Nachrichten ist ein Eingabeparameter,args.interval. Die nächste Zeile erzeugt die Nutzlast der Nachricht. Es handelt sich entweder um einen Bytestring der angegebenen Größe (args.size) oder um eine beschreibende Nachricht. Diese Flexibilität ist nur zum Testen gedacht.

8

Beachte, dass hier zwei Nachrichten gesendet werden: Die erste ist der Name des Zielkanals und die zweite ist die Nutzlast.

9

Wie beim Hörer gibt es auch für den Sender eine Reihe von Kommandozeilenoptionen, mit denen er eingestellt werden kann: channel legt den Zielkanal fest, an den gesendet werden soll, während interval die Verzögerung zwischen den Sendungen bestimmt. Der Parameter size bestimmt die Größe der Nutzlast jeder Nachricht.

Jetzt haben wir einen Broker, einen Listener und einen Sender; es ist Zeit, etwas zu sehen. Um die folgenden Codeschnipsel zu erzeugen, habe ich den Server gestartet, dann zwei Listener und dann einen Sender. Nachdem ein paar Nachrichten verschickt worden waren, habe ich den Server mit Strg-C gestoppt. Die Ausgabe des Servers ist in Beispiel 4-5 zu sehen, die des Senders in Beispiel 4-6 und die des Listeners in Beispiel 4-7 und 4-8.

Beispiel 4-5. Ausgabe des Nachrichtenbrokers (Server)
$ mq_server.py
Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55390) subscribed to b'/null'
Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'...
^CBye!
Remote ('127.0.0.1', 55382) closing connection.
Remote ('127.0.0.1', 55382) closed
Remote ('127.0.0.1', 55390) closing connection.
Remote ('127.0.0.1', 55390) closed
Remote ('127.0.0.1', 55386) closing connection.
Remote ('127.0.0.1', 55386) closed
Beispiel 4-6. Absender (Client) Ausgabe
$ mq_client_sender.py --channel /queue/blah
Starting up 6b5a8e1d
I am ('127.0.0.1', 55390)
Connection ended.
Beispiel 4-7. Ausgabe von Listener 1 (Client)
$ mq_client_listen.py --listen /queue/blah
Starting up 9ae04690
I am ('127.0.0.1', 55382)
Received by 9ae04690: b'Msg 1 from 6b5a8e1d'
Received by 9ae04690: b'Msg 3 from 6b5a8e1d'
Received by 9ae04690: b'Msg 5 from 6b5a8e1d'
Server closed.
Beispiel 4-8. Ausgabe von Listener 2 (Client)
$ mq_client_listen.py --listen /queue/blah
Starting up bd4e3baa
I am ('127.0.0.1', 55386)
Received by bd4e3baa: b'Msg 0 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 2 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 4 from 6b5a8e1d'
Server closed.

Unser Spielzeug-Messenger funktioniert. Der Code ist auch ziemlich einfach zu verstehen, wenn man bedenkt, dass es sich um eine so komplexe Problemdomäne handelt, aber leider ist das Design des Broker-Codes selbst problematisch.

Das Problem ist, dass wir für einen bestimmten Client Nachrichten an die Abonnenten in der gleichen Coroutine senden, in der auch neue Nachrichten empfangen werden. Das bedeutet, dass es sehr lange dauern kann, bis die Zeile await gather(...) in Beispiel 4-2 abgeschlossen ist, wenn ein Abonnent die gesendeten Nachrichten nur langsam konsumiert, und wir in dieser Zeit keine weiteren Nachrichten empfangen und verarbeiten können.

Stattdessen müssen wir den Empfang von Nachrichten vom Senden von Nachrichten entkoppeln. In der nächsten Fallstudie überarbeiten wir unseren Code, um genau das zu tun.

Fallstudie: Verbesserung der Nachrichtenwarteschlange

In dieser Fallstudie verbessern wir das Design unseres Spielzeug-Nachrichtenbrokers. Die Programme für die Zuhörer und Absender bleiben unverändert. Die besondere Verbesserung des neuen Broker-Designs besteht darin, das Senden und Empfangen von Nachrichten zu entkoppeln. Damit wird das Problem gelöst, dass ein langsamer Abonnent auch den Empfang neuer Nachrichten verlangsamt, wie im vorherigen Abschnitt beschrieben. Der neue Code, der in Beispiel 4-9 gezeigt wird, ist zwar etwas länger, aber nicht sehr viel länger.

Beispiel 4-9. Nachrichtenbroker: verbessertes Design
# mq_server_plus.py
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
from msgproto import read_msg, send_msg

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque)
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
CHAN_QUEUES: Dict[bytes, Queue] = {}  1

async def client(reader: StreamReader, writer: StreamWriter):
  peername = writer.get_extra_info('peername')
  subscribe_chan = await read_msg(reader)
  SUBSCRIBERS[subscribe_chan].append(writer)  2
  send_task = asyncio.create_task(
      send_client(writer, SEND_QUEUES[writer]))  3
  print(f'Remote {peername} subscribed to {subscribe_chan}')
  try:
    while channel_name := await read_msg(reader):
      data = await read_msg(reader)
      if channel_name not in CHAN_QUEUES:  4
        CHAN_QUEUES[channel_name] = Queue(maxsize=10)  5
        asyncio.create_task(chan_sender(channel_name))  6
      await CHAN_QUEUES[channel_name].put(data)  7
  except asyncio.CancelledError:
    print(f'Remote {peername} connection cancelled.')
  except asyncio.IncompleteReadError:
    print(f'Remote {peername} disconnected')
  finally:
    print(f'Remote {peername} closed')
    await SEND_QUEUES[writer].put(None)  8
    await send_task  9
    del SEND_QUEUES[writer]  10
    SUBSCRIBERS[subscribe_chan].remove(writer)

async def send_client(writer: StreamWriter, queue: Queue):  11
    while True:
        try:
            data = await queue.get()
        except asyncio.CancelledError:
            continue

        if not data:
            break

        try:
            await send_msg(writer, data)
        except asyncio.CancelledError:
            await send_msg(writer, data)

    writer.close()
    await writer.wait_closed()

async def chan_sender(name: bytes):
    with suppress(asyncio.CancelledError):
        while True:
            writers = SUBSCRIBERS[name]
            if not writers:
                await asyncio.sleep(1)
                continue  12
            if name.startswith(b'/queue'):  13
                writers.rotate()
                writers = [writers[0]]
            if not (msg := await CHAN_QUEUES[name].get()): 14
                break
            for writer in writers:
                if not SEND_QUEUES[writer].full():
                    print(f'Sending to {name}: {msg[:19]}...')
                    await SEND_QUEUES[writer].put(msg)  15

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()
try:
    asyncio.run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')
1

In der vorherigen Implementierung gab es nurSUBSCRIBERS; jetzt gibt es SEND_QUEUES und CHAN_QUEUES als globale Sammlungen. Dies ist eine Folge der vollständigen Entkopplung von Datenempfangund -versand. SEND_QUEUES hat einen Warteschlangeneintrag für jede Client-Verbindung: Alle Daten, die an diesen Client gesendet werden müssen, müssen in diese Warteschlange gestellt werden. (Wenn du einen Blick in die Zukunft wirfst, holt die Coroutine send_client() die Daten von SEND_QUEUES und sendet sie.)

2

Bis zu diesem Punkt in der client() Coroutine-Funktion ist der Code derselbe wie beim einfachen Server: Der Name des abonnierten Kanals wird empfangen, und wir fügen die StreamWriter Instanz für den neuen Client zur globalen SUBSCRIBERS Sammlung hinzu.

3

Das ist neu: Wir erstellen eine langlebige Aufgabe, die alle Daten an diesen Client sendet. Die Aufgabe wird unabhängig als eigene Coroutine ausgeführt und holt sich die Nachrichten aus der bereitgestellten Warteschlange SEND_QUEUES[writer], um sie zu versenden.

4

Jetzt befinden wir uns in der Schleife, in der wir die Daten empfangen. Erinnere dich daran, dass wir immer zwei Nachrichten erhalten: eine für den Namen des Zielkanals und eine für die Daten. Wir erstellen eine neue, dedizierte Queue für jeden Zielkanal. Dafür ist CHAN_QUEUES da: Wenn ein Client Daten in einen Kanal pushen will, stellen wir diese Daten in die entsprechende Warteschlange und hören dann sofort wieder auf weitere Daten. Dieser Ansatz entkoppelt das Verteilen von Nachrichten vom Empfangen von Nachrichten von diesem Client.

5

Wenn es noch keine Warteschlange für den Zielkanal gibt, erstelle eine.

6

Erstelle eine spezielle und langlebige Aufgabe für diesen Kanal. Die Coroutinechan_sender() ist dafür zuständig, Daten aus der Warteschlange des Kanals zu nehmen und sie an die Abonnenten zu verteilen.

7

Platziere die neu empfangenen Daten in der Warteschlange des jeweiligen Kanals. Wenn sich die Warteschlange füllt, warten wir hier, bis Platz für die neuen Daten ist. Hier zu warten bedeutet, dass wir keine neuen Daten aus dem Socket lesen, was bedeutet, dass der Client darauf warten muss, neue Daten in den Socket zu senden. Das ist nicht unbedingt etwas Schlechtes, da es dem Client den so genannten Gegendruck vermittelt (alternativ kannst du hier auch Nachrichten absetzen, wenn der Anwendungsfall das zulässt).

8

Wenn die Verbindung geschlossen ist, ist es Zeit, aufzuräumen. Die langlebige Aufgabe, die wir für das Senden von Daten an diesen Kunden erstellt haben, send_task, kann beendet werden, indem None in die Warteschlange SEND_QUEUES[writer] gestellt wird (schau dir den Code für send_client() an). Es ist wichtig, einen Wert für die Warteschlange zu verwenden, anstatt sie einfach abzubrechen, denn es kann sein, dass sich bereits Daten in der Warteschlange befinden und wir wollen, dass diese Daten versendet werden, bevor send_client() beendet wird.

9

Warte, bis die Absenderaufgabe beendet ist...

10

...dann entfernen wir den Eintrag in der Sammlung SEND_QUEUES (und in der nächsten Zeile entfernen wir auch sock aus der Sammlung SUBSCRIBERS wie zuvor).

11

Die Coroutine-Funktion send_client() ist fast schon ein Lehrbuchbeispiel dafür, wie man Arbeit aus einer Warteschlange abzieht. Beachte, dass die Coroutine nur beendet wird, wenn None in die Warteschlange gestellt wird. Beachte auch, wie wir CancelledError innerhalbder Schleife unterdrücken: Das liegt daran, dass wir diese Aufgabe nur dann beenden wollen, wenn wir ein None in der Warteschlange erhalten. Auf diese Weise können alle anstehenden Daten in der Warteschlange vor dem Beenden gesendet werden.

12

chan_sender() ist die Verteilungslogik für einen Kanal: Sie sendet Daten von einer speziellen Instanz des Kanals Queue an alle Abonnenten dieses Kanals. Aber was passiert, wenn es noch keine Abonnenten für diesen Kanal gibt? Wir warten einfach ein bisschen und versuchen es dann erneut. (Beachte aber, dass sich die Warteschlange für diesen Kanal, CHAN_QUEUES[name], weiter füllt).

13

Wie in unserer vorherigen Broker-Implementierung machen wir etwas Besonderes für Kanäle, deren Name mit /queue beginnt: Wir rotieren die Warteschlange und senden nur an den ersten Eintrag. Dies wirkt wie ein grobes Lastausgleichssystem, da jeder Abonnent unterschiedliche Nachrichten aus derselben Warteschlange erhält. Bei allen anderen Kanälen erhalten alle Abonnenten alle Nachrichten.

14

Wir warten hier auf Daten in der Warteschlange und beenden den Vorgang, wenn None empfangen wird. Zurzeit wird dies nirgendwo ausgelöst (so dass diese chan_sender()Koroutinen ewig leben), aber wenn eine Logik hinzugefügt würde, um diese Channel-Tasks nach einer gewissen Zeit der Inaktivität aufzuräumen, würde es so gemacht werden.

15

Die Daten wurden empfangen, also ist es an der Zeit, sie an die Abonnenten zu senden. Hier wird nicht gesendet, sondern die Daten werden in die eigene Sendewarteschlange eines jeden Teilnehmers gestellt. Diese Entkopplung ist notwendig, um sicherzustellen, dass ein langsamer Abonnent nicht alle anderen, die Daten empfangen, ausbremst. Und wenn der Abonnent so langsam ist, dass sich seine Warteschlange füllt, werden die Daten nicht in seine Warteschlange gestellt, d. h. sie gehen verloren.

Das vorangegangene Design erzeugt die gleiche Ausgabe wie die frühere, vereinfachte Implementierung, aber jetzt können wir sicher sein, dass ein langsamer Listener die Nachrichtenverteilung an andere Listener nicht stört.

Diese beiden Fallstudien zeigen, wie sich die Überlegungen zum Entwurf eines Nachrichtenverteilungssystems weiterentwickelt haben. Ein wichtiger Aspekt war die Erkenntnis, dass das Senden und Empfangen von Daten je nach Anwendungsfall am besten in separaten Coroutines abgewickelt werden sollte. In solchen Fällen können Warteschlangen sehr nützlich sein, um Daten zwischen den verschiedenen Coroutines zu verschieben und sie durch Pufferung zu entkoppeln.

Das wichtigere Ziel dieser Fallstudien war es, zu zeigen, wie die Streams-API in asyncio es sehr einfach macht, Socket-basierte Anwendungen zu erstellen.

Verdreht

Das Twisted-Projekt ist ein Vorläufer derasyncio Standardbibliothek und steht seit etwa 14 Jahren für die asynchrone Programmierung in Python. Das Projekt stellt nicht nur die grundlegenden Bausteine wie eine Ereignisschleife zur Verfügung, sondern auch Primitive wie Deferreds, die ein bisschen wie die Futures in asyncio sind. Das Design von asyncio wurde stark von Twisted und der umfangreichen Erfahrung seiner Leiter und Betreuer beeinflusst.

Beachte, dass asyncio Twisted nicht ersetzt. Twisted enthält hochwertige Implementierungen einer großen Anzahl von Internetprotokollen, darunter nicht nur das übliche HTTP, sondern auch XMPP, NNTP, IMAP, SSH, IRC und FTP (sowohl Server als auch Clients). Und die Liste geht noch weiter: DNS? Check. SMTP? Richtig. POP3? Erledigt. Die Verfügbarkeit dieser exzellenten Internetprotokoll-Implementierungen macht Twisted so überzeugend.

Auf der Code-Ebene ist der wichtigste Unterschied zwischen Twisted undasyncio, abgesehen von der Geschichte und dem historischen Kontext, dass Python lange Zeit keine Sprachunterstützung für Coroutines hatte, was bedeutete, dass Twisted und ähnliche Projekte Wege finden mussten, mit Asynchronität umzugehen, die mit der Standard-Python-Syntax funktionieren.

Die meiste Zeit in der Geschichte von Twisted waren Callbacks das Mittel, mit dem die asynchrone Programmierung durchgeführt wurde, mit all der nichtlinearen Komplexität, die das mit sich bringt. Als es jedoch möglich wurde, Generatoren als behelfsmäßige Coroutines zu verwenden, wurde es plötzlich möglich, den Code in Twisted auf lineare Weise zu gestalten, indem man seinen @defer.inlineCallbacks Dekorator verwendet, wie in Beispiel 4-10 gezeigt.

Beispiel 4-10. Noch mehr Twisted mit inlined callbacks
@defer.inlineCallbacks  1
def f():
    yield
    defer.returnValue(123)  2

@defer.inlineCallbacks
def my_coro_func():
    value = yield f()  3
    assert value == 123
1

Normalerweise müssen in Twisted Instanzen von Deferred erstellt und diesen Instanzen Rückrufe hinzugefügt werden, um asynchrone Programme zu erstellen. Vor ein paar Jahren wurde der @inlineCallbacks Dekorator hinzugefügt, der Generatoren als Coroutines umfunktioniert.

2

Während @inlineCallbacks es dir ermöglichte, linearen Code zu schreiben (im Gegensatz zu Callbacks), waren einige Hacks erforderlich, wie zum Beispiel dieser Aufruf von defer.returnValue(), der ist, wie du Werte aus@inlineCallbacks Coroutines zurückgibt.

3

Hier können wir die yield sehen, die diese Funktion zu einem Generator macht. Damit@inlineCallbacks funktioniert, muss mindestens ein yield in der Funktion vorhanden sein, die dekoriert wird.

Seit dem Erscheinen der nativen Coroutines in Python 3.5 hat das Twisted-Team (und insbesondere Amber Brown ) daran gearbeitet, die Unterstützung für die Ausführung von Twisted in der asyncio Ereignisschleife hinzuzufügen.

Ich möchte dich in diesem Abschnitt nicht davon überzeugen, alle deine Anwendungen als Twisted-asyncio -Hybride zu erstellen, sondern dich darauf aufmerksam machen, dass derzeit an einer umfassenden Interoperabilität zwischen den beiden Systemen gearbeitet wird.

Für diejenigen unter euch, die Erfahrung mit Twisted haben, könnte Beispiel 4-11verwirrend sein.

Beispiel 4-11. Unterstützung für asyncio in Twisted
# twisted_asyncio.py
from time import ctime
from twisted.internet import asyncioreactor
asyncioreactor.install()  1
from twisted.internet import reactor, defer, task  2

async def main():  3
    for i in range(5):
        print(f'{ctime()} Hello {i}')
        await task.deferLater(reactor, 1, lambda: None)  4

defer.ensureDeferred(main())  5
reactor.run()  6
1

So sagst du Twisted, dass es die asyncio Ereignisschleife als Haupt reactor verwenden soll. Beachte, dass diese Zeile vor dem reactor kommen muss, das von twisted.internet in der folgenden Zeile.

2

Jeder, der mit der Twisted-Programmierung vertraut ist, wird diese Importe wiedererkennen. Wir haben hier nicht genug Platz, um sie ausführlich zu behandeln, aber kurz gesagt ist reactor die Twisted Version der asyncio Schleife, und defer und task sind Namensräume für Werkzeuge, die mit Zeitplannungsprogrammen arbeiten.

3

async def hier in einem Twisted-Programm zu sehen, sieht seltsam aus, aber das ist tatsächlich das, was uns die neue Unterstützung für async/awaitgibt: die Möglichkeit, native Coroutines direkt in Twisted-Programmen zu verwenden.

4

In der älteren @inlineCallbacks Welt hättest du hieryield from verwendet, aber jetzt können wir await verwenden, genau wie im asyncio Code. Der andere Teil dieser Zeile, deferLater(), ist eine alternative Möglichkeit, das Gleiche zu tun wie asyncio.sleep(1). Wir await eine Zukunft, in der nach einer Sekunde ein "do-nothing"-Callback ausgelöst wird.

5

ensureDeferred() ist eine Twisted-Version der Zeitplanung einer Coroutine. Dies wäre vergleichbar mit loop.create_task() oder asyncio.ensure_future().

6

Das Ausführen der reactor ist dasselbe wie loop.run_forever() in asyncio.

Wenn du dieses Skript ausführst, erhältst du die folgende Ausgabe:

$ twisted_asyncio.py
Mon Oct 16 16:19:49 2019 Hello 0
Mon Oct 16 16:19:50 2019 Hello 1
Mon Oct 16 16:19:51 2019 Hello 2
Mon Oct 16 16:19:52 2019 Hello 3
Mon Oct 16 16:19:53 2019 Hello 4

Es gibt noch viel mehr über Twisted zu lernen. Vor allem lohnt es sich, die Liste der implementierten Netzwerkprotokolle durchzugehen. Es gibt noch einiges zu tun, aber die Zukunft sieht für die Zusammenarbeit zwischen Twisted und asyncio sehr rosig aus.

asyncio wurde so konzipiert, dass wir uns auf eine Zukunft freuen können, in der es möglich sein wird, Code aus vielen asynchronen Frameworks wie Twisted und Tornado in eine einzige Anwendung einzubinden, wobei der gesamte Code in derselben Ereignisschleife läuft.

Die Janus-Warteschlange

Die Janus-Warteschlange (installiert mit pip install janus) bietet eine Lösung für die Kommunikation zwischen Threads und Coroutines. In der Python-Standardbibliothek gibt es zwei Arten von Warteschlangen:

queue.Queue

Eine blockierende Warteschlange, die üblicherweise für die Kommunikation und Pufferung zwischen Threads verwendet wird

asyncio.Queue

Eine async-kompatible Warteschlange, die üblicherweise für die Kommunikation und Pufferung zwischen Coroutines verwendet wird

Leider ist beides nicht für die Kommunikation zwischen Threads und Coroutines geeignet! Hier kommt Janus ins Spiel: Es handelt sich um eine einzige Warteschlange, die beide APIs zur Verfügung stellt, eine blockierende und eine asynchrone.Beispiel 4-12 generiert Daten innerhalb eines Threads, legt diese Daten in einer Warteschlange ab und konsumiert diese Daten dann von einer Coroutine.

Beispiel 4-12. Verbinden von Coroutines und Threads mit einer Janus-Warteschlange
# janus_demo.py
import asyncio
import random
import time

import janus

async def main():
    loop = asyncio.get_running_loop()
    queue = janus.Queue(loop=loop)  1
    future = loop.run_in_executor(None, data_source, queue)
    while (data := await queue.async_q.get()) is not None:  2
        print(f'Got {data} off queue')  3
    print('Done.')

def data_source(queue):
    for i in range(10):
        r = random.randint(0, 4)
        time.sleep(r)  4
        queue.sync_q.put(r)  5
    queue.sync_q.put(None)

asyncio.run(main())
1

Erstelle eine Janus-Warteschlange. Beachte, dass die Janus-Warteschlange genau wie eine asyncio.Queue mit einer bestimmten Ereignisschleife verknüpft wird. Wenn du den Parameter loop nicht angibst, wird wie üblich der Standardaufrufget_event_loop() intern verwendet.

2

Unsere main() Coroutine-Funktion wartet einfach auf Daten in einer Warteschlange. Diese Zeile hält an, bis Daten vorhanden sind, genau wie der Aufruf von get() auf einer asyncio.Queue Instanz. Das Warteschlangenobjekt hat zwei Gesichter: Dieses heißtasync_q und bietet die async-kompatible Warteschlangen-API.

3

Drucke eine Nachricht.

4

In der Funktion data_source() wird eine zufällige int generiert, die sowohl als Schlafdauer als auch als Datenwert verwendet wird. Beachte, dass der Aufruf von time.sleep() blockierend ist, daher muss diese Funktion in einem Thread ausgeführt werden.

5

Lege die Daten in die Janus-Warteschlange. Dies zeigt die andereSeite der Janus-Warteschlange: sync_q Die Janus-Warteschlange, die die standardmäßige, blockierende Queue API bereitstellt.

Hier ist die Ausgabe:

$ <name>
Got 2 off queue
Got 4 off queue
Got 4 off queue
Got 2 off queue
Got 3 off queue
Got 4 off queue
Got 1 off queue
Got 1 off queue
Got 0 off queue
Got 4 off queue
Done.

Wenn es möglich ist, ist es besser, kurze Aufträge zu haben, und in diesen Fällen ist eine Warteschlange (für die Kommunikation) nicht notwendig. Das ist jedoch nicht immer möglich, und in solchen Situationen kann die Janus-Warteschlange die bequemste Lösung sein, um Daten zwischen Threads und Coroutines zu puffern und zu verteilen.

aiohttp

aiohttp bringt alles, was mit HTTP zu tun hat, auf asyncio, einschließlich Unterstützung für HTTP-Clients und -Server sowie WebSocket-Unterstützung. Kommen wir gleich zu den Codebeispielen, angefangen mit der Einfachheit selbst: "Hello World".

Fallstudie: Hallo Welt

Beispiel 4-13 zeigt einen minimalen Webserver mit aiohttp.

Beispiel 4-13. Minimales aiohttp-Beispiel
from aiohttp import web

async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()  1
app.router.add_get('/', hello)  2
web.run_app(app, port=8080)  3
1

Eine Application Instanz wird erstellt.

2

Es wird eine Route erstellt, wobei die Ziel-Coroutine hello() als Handler angegeben wird.

3

Die Webanwendung wird ausgeführt.

Beachte, dass in diesem Code keine Schleifen, Tasks oder Futures erwähnt werden: Die Entwickler des Frameworks aiohttp haben all das vor uns verborgen und eine sehr saubere API hinterlassen. Dies wird bei den meisten Frameworks, die auf asyncio aufbauen, der Fall sein. Das Framework wurde so konzipiert, dass die Entwickler nur die Teile auswählen können, die sie benötigen, und diese in ihrer bevorzugten API kapseln.

Fallstudie: Scraping der Nachrichten

aiohttp kann sowohl als Server als auch als Client-Bibliothek verwendet werden, wie die sehr beliebte (aber blockierende!)requests Bibliothek. Ich wollte aiohttp anhand eines Beispiels vorstellen, das beide Funktionen beinhaltet.

In dieser Fallstudie implementieren wir eine Website, die hinter den Kulissen Web Scraping betreibt. Die Anwendung wird zwei Nachrichten-Websites scrapen und die Schlagzeilen auf einer Ergebnisseite zusammenfassen. Hier ist die Strategie:

  1. Ein Browser-Client stellt eine Web-Anfrage an http://localhost:8080/news.

  2. Unser Webserver empfängt die Anfrage und holt dann im Backend die HTML-Daten von mehreren Nachrichten-Websites ab.

  3. Die Daten jeder Seite werden nach Schlagzeilen durchsucht.

  4. Die Überschriften werden sortiert und in der HTML-Antwort formatiert, die wir an den Browser-Client zurückschicken.

Abbildung 4-1 zeigt die Ausgabe.

uaip 0401
Abbildung 4-1. Das Endprodukt unseres News Scrapers: Schlagzeilen von CNN werden in einer Farbe dargestellt und Al Jazeera in einer anderen

Web Scraping ist heutzutage ziemlich schwierig geworden. Wenn du zum Beispielrequests.get('http://edition.cnn.com') ausprobierst, wirst du feststellen, dass die Antwort nur sehr wenige verwertbare Daten enthält! Es wird immer wichtiger, JavaScript lokal ausführen zu können, um Daten zu erhalten, denn viele Websites verwenden JavaScript, um ihre Inhalte zu laden. Der Prozess, bei dem dieses JavaScript ausgeführt wird, um die endgültige, vollständige HTML-Ausgabe zu erzeugen, wird Rendering genannt.

Für das Rendering von verwenden wir ein nettes Projekt namensSplash, das sich selbst als "JavaScript Rendering Service" bezeichnet. Es kann in einemDocker-Container laufen und bietet eine API für das Rendering anderer Websites. Intern verwendet es eine (JavaScript-fähige) WebKit-Engine, um eine Website vollständig zu laden und zu rendern. Das werden wir nutzen, um Websitedaten zu erhalten. Unser aiohttp Server, der inBeispiel 4-14 gezeigt wird, ruft diese Splash API auf, um die Seitendaten zu erhalten.

Tipp

Um den Splash-Container zu erhalten und zu starten, führe diese Befehle in deiner Shell aus:

$ docker pull scrapinghub/splash
$ docker run --rm -p 8050:8050 scrapinghub/splash

Unser Server-Backend wird die Splash-API unter http://localhost:8050 aufrufen .

Beispiel 4-14. Code für den News Scraper
from asyncio import gather, create_task
from string import Template
from aiohttp import web, ClientSession
from bs4 import BeautifulSoup

async def news(request):  1
    sites = [
        ('http://edition.cnn.com', cnn_articles),  2
        ('http://www.aljazeera.com', aljazeera_articles),
    ]
    tasks = [create_task(news_fetch(*s)) for s in sites] 3
    await gather(*tasks)  4

    items = {  5
        text: (  6
            f'<div class="box {kind}">'
            f'<span>'
            f'<a href="{href}">{text}</a>'
            f'</span>'
            f'</div>'
        )
        for task in tasks for href, text, kind in task.result()
    }
    content = ''.join(items[x] for x in sorted(items))

    page = Template(open('index.html').read())  7
    return web.Response(
        body=page.safe_substitute(body=content),  8
        content_type='text/html',
    )

async def news_fetch(url, postprocess):
    proxy_url = (
        f'http://localhost:8050/render.html?'  9
        f'url={url}&timeout=60&wait=1'
    )
    async with ClientSession() as session:
        async with session.get(proxy_url) as resp:  10
            data = await resp.read()
            data = data.decode('utf-8')
    return postprocess(url, data)  11

def cnn_articles(url, page_data):  12
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/')
            and tag['href'].endswith('.html')
            and tag.find(class_='cd__headline-text')
        )
    headlines = soup.find_all(match)  13
    return [(url + hl['href'], hl.text, 'cnn')
            for hl in headlines]

def aljazeera_articles(url, page_data):  14
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/news')
            and tag['href'].endswith('.html')
        )
    headlines = soup.find_all(match)
    return [(url + hl['href'], hl. text, 'aljazeera')
            for hl in headlines]

app = web.Application()
app.router.add_get('/news', news)
web.run_app(app, port=8080)
1

Die Funktion news() ist der Handler für die URL /news auf unserem Server. Sie gibt die HTML-Seite mit allen Schlagzeilen zurück.

2

Hier haben wir nur zwei Nachrichten-Websites, die wir auslesen können: CNN und Al Jazeera. Es könnten leicht weitere hinzugefügt werden, aber dann müssten auch zusätzliche Postprozessoren hinzugefügt werden, genau wie die Funktionen cnn_articles() undaljazeera_articles(), die für die Extraktion von Schlagzeilendaten angepasst werden.

3

Für jede Nachrichtenseite erstellen wir eine Aufgabe, um die HTML-Seitendaten für ihre Startseite zu holen und zu verarbeiten. Beachte, dass wir das Tupel auspacken ((*s)), da die Coroutine-Funktion news_fetch() sowohl die URL als auch die Nachbearbeitungsfunktion als Parameter benötigt. Jeder Aufruf von news_fetch() gibt eine Liste von Tupelnals Überschriftenergebnisse zurück, in der Form <article URL>, <article title>.

4

Alle Aufgaben werden in einem einzigen Futuregesammelt (gather() gibt einen Future zurück, der den Zustand aller gesammelten Aufgaben repräsentiert), und dann werden wir sofort await den Abschluss dieses Futures. Diese Zeile hält an, bis der Future abgeschlossen ist.

5

Da alle news_fetch() Aufgaben nun abgeschlossen sind, sammeln wir alle Ergebnisse in einem Wörterbuch. Beachte, wie verschachtelte Comprehensions verwendet werden, um über die Aufgaben zu iterieren und dann über die Liste der Tupel, die von jeder Aufgabe zurückgegeben werden. Wir verwenden f-strings auch, um Daten direkt zu ersetzen, einschließlich der Art der Seite, die in CSS verwendet wird, um den Hintergrund von divzu färben.

6

In diesem Wörterbuch ist der Schlüssel der Titel der Überschrift und der Wert ein HTML-String für div, der auf unserer Ergebnisseite angezeigt wird.

7

Unser Webserver wird HTML zurückgeben. Wir laden die HTML-Daten aus einer lokalen Datei namens index.html. Diese Datei findest du inBeispiel B-1, falls du die Fallstudie selbst nachbauen möchtest.

8

Wir ersetzen die gesammelte Überschrift div in der Vorlage und geben die Seite an den Browser-Client zurück. So entsteht die in Abbildung 4-1 gezeigte Seite.

9

Hier, in der news_fetch() Coroutine-Funktion, haben wir eine kleine Vorlage für den Zugriff auf die Splash-API (die bei mir in einem lokalen Docker-Container auf Port 8050 läuft). Dies zeigt, wie aiohttpals HTTP-Client verwendet werden kann.

10

Der Standardweg ist, eine ClientSession() Instanz zu erstellen und dann die get() Methode auf der Sitzungsinstanz zu verwenden, um den REST-Aufruf durchzuführen. In der nächsten Zeile werden die Antwortdaten abgefragt. Beachte, dass diese Coroutine nie blockiert, weil wir immer mit Coroutines arbeiten, also mit async with und await: Wir können viele tausend dieser Anfragen bearbeiten, auch wenn dieser Vorgang (news_fetch()) relativ langsam ist, weil wir intern Webaufrufe durchführen.

11

Nachdem wir die Daten erhalten haben, rufen wir die Nachbearbeitungsfunktion auf. Für CNN ist das cnn_articles() und für Al Jazeera aljazeera_articles().

12

Wir haben nur Platz für einen kurzen Blick auf die Nachbearbeitung. Nachdem wir die Seitendaten erhalten haben, verwenden wir die Beautiful Soup 4-Bibliothek, um Überschriften zu extrahieren.

13

Die Funktion match() gibt alle übereinstimmenden Tags zurück (ich habe den HTML-Quelltext dieser Nachrichten-Websites manuell überprüft, um herauszufinden, welche Kombination von Filtern die besten Tags extrahiert), und dann geben wir eine Liste von Tupeln zurück, die dem Format <article URL>, <article title>.

14

Dies ist der analoge Postprozessor für Al Jazeera. Die match()Bedingung ist etwas anders, aber ansonsten ist sie die gleiche wie die von CNN.

Im Allgemeinen wirst du feststellen, dass aiohttp eine einfache API hat und dir bei der Entwicklung deiner Anwendungen "nicht im Weg steht".

Im nächsten Abschnitt sehen wir uns die Verwendung von ZeroMQ mit asyncio an, was die Socket-Programmierung auf seltsame Weise sehr angenehm macht.

ØMQ (ZeroMQ)

Programmieren ist eine Wissenschaft, die sich als Kunst verkleidet, denn die meisten von uns verstehen die Physik der Software nicht und sie wird selten, wenn überhaupt, gelehrt. Die Physik der Software besteht nicht aus Algorithmen, Datenstrukturen, Sprachen und Abstraktionen. Das sind nur Werkzeuge, die wir herstellen, benutzen und wegwerfen. Die wahre Physik der Software ist die Physik der Menschen. Genauer gesagt geht es um unsere Grenzen, wenn es um Komplexität geht, und um unseren Wunsch, zusammenzuarbeiten, um große Probleme in Teilen zu lösen. Das ist die Wissenschaft des Programmierens: Wenn du Bausteine entwickelst, die die Menschen verstehen und einfach benutzen können, werden sie zusammenarbeiten, um auch die größten Probleme zu lösen.

Pieter Hintjens, ZeroMQ: Messaging für viele Anwendungen

ØMQ (oder ZeroMQ) ist eine beliebte sprachunabhängige Bibliothek für Netzwerkanwendungen: Sie bietet "intelligente" Sockets. Wenn du ØMQ-Sockets in deinem Code erstellst, ähneln sie normalen Sockets, mit erkennbaren Methodennamen wie recv() undsend() und so weiter - aber intern erledigen diese Sockets einige der lästigen und mühsamen Aufgaben, die für die Arbeit mit herkömmlichen Sockets erforderlich sind.

So musst du nicht dein eigenes Protokoll erfinden und die Bytes auf der Leitung zählen, um herauszufinden, wann alle Bytes für eine bestimmte Nachricht angekommen sind - du sendest einfach das, was du als "Nachricht" betrachtest, und das Ganze kommt am anderen Ende intakt an.

Eine weitere großartige Funktion ist die automatische Wiederverbindungslogik. Wenn der Server ausfällt und später wieder hochkommt, wird der ØMQ-Socket des Clientsautomatisch wieder verbunden. Und was noch besser ist: Nachrichten, die dein Code an den Socket sendet, werden während der Unterbrechung gepuffert, so dass sie auch dann noch gesendet werden, wenn der Server zurückkehrt. Dies sind einige der Gründe, warum ØMQ manchmal alsbrokerloses Messaging bezeichnet wird: Es bietet einige der Funktionen von Message Broker Software direkt in den Socket-Objekten selbst.

ØMQ-Sockets sind intern bereits asynchron implementiert (sie können also viele Tausend gleichzeitige Verbindungen aufrechterhalten, selbst wenn sie in Threaded Code verwendet werden), aber dies ist hinter der ØMQ-API verborgen. DiePyZMQ-Python-Bindings für die ØMQ-Bibliothek unterstützen Asyncio, und in diesem Abschnitt werden wir uns einige Beispiele dafür ansehen, wie du diese intelligenten Sockets in deine Python-Anwendungen einbauen kannst.

Fallstudie: Mehrere Steckdosen

Wenn ØMQ bereits asynchrone Sockets zur Verfügung stellt, die mit Threading genutzt werden können, was bringt es dann, ØMQ mit asyncio zu nutzen? Die Antwort ist sauberer Code.

Zur Veranschaulichung schauen wir uns eine kleine Fallstudie an, in der du mehrere ØMQ-Sockets in der gleichen Anwendung verwendest. Beispiel 4-15zeigt zunächst die blockierende Version (dieses Beispiel stammt aus dem zguide, dem offiziellen Leitfaden für ØMQ).

Beispiel 4-15. Der traditionelle ØMQ-Ansatz
# poller.py
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)  1
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)  2
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

poller = zmq.Poller()  3
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    try:
        socks = dict(poller.poll())  4
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv_json()
        print(f'Via PULL: {message}')

    if subscriber in socks:
        message = subscriber.recv_json()
        print(f'Via SUB: {message}')
1

ØMQ-Sockets haben Typen. Dies ist ein PULL Socket. Du kannst ihn dir als eine Art Socket vorstellen, der nur empfängt und von einem anderen Socket gefüttert wird,der nur sendet und der vom Typ PUSH ist.

2

Der SUB Socket ist eine andere Art von Socket, der nur empfängt, und er wird mit einem PUB Socket gefüttert, der nur sendet.

3

Wenn du Daten zwischen mehreren Sockets in einer ØMQ-Anwendung mit Threads austauschen musst, brauchst du einen Poller. Das liegt daran, dass diese Sockets nicht thread-sicher sind. Du kannst also nicht recv() auf verschiedenen Sockets in verschiedenen Threads verwenden.1

4

Er funktioniert ähnlich wie der Systemaufruf select(). Der Poller gibt die Blockade auf, wenn auf einem der registrierten Sockets Daten zum Empfang bereitstehen, und dann liegt es an dir, die Daten abzuziehen und etwas damit zu tun. Der große if Block ist, wie du den richtigen Socket erkennst.

Die Verwendung einer Poller-Schleife und eines expliziten Socket-Auswahlblocks lässt den Code etwas klobig aussehen, aber dieser Ansatz vermeidet Probleme mit der Thread-Sicherheit, da er garantiert, dass derselbe Socket nicht von verschiedenen Threads verwendet wird.

Beispiel 4-16 zeigt den Servercode.

Beispiel 4-16. Server-Code
# poller_srv.py
import zmq, itertools, time

context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

for i in itertools.count():
    time.sleep(1)
    pusher.send_json(i)
    publisher.send_json(i)

Dieser Code ist für die Diskussion nicht wichtig, aber kurz: Es gibt einen PUSH Socket und einen PUB Socket, wie ich bereits sagte, und eine Schleife darin, die jede Sekunde Daten an beide Sockets sendet. Hier ist ein Beispiel für die Ausgabe von poller.py (Achtung: beide Programme müssen laufen):

$ poller.py
Via PULL: 0
Via SUB: 0
Via PULL: 1
Via SUB: 1
Via PULL: 2
Via SUB: 2
Via PULL: 3
Via SUB: 3

Der Code funktioniert, aber unser Interesse gilt hier nicht der Frage, ob der Code läuft, sondern ob asyncio etwas für die Struktur vonpoller.py zu bieten hat. Das Wichtigste ist, dass unserasyncio Code in einem einzigen Thread ausgeführt wird, was bedeutet, dass es in Ordnung ist, verschiedene Sockets in verschiedenen Coroutineszu behandeln - undgenau das werden wir auch tun.

Natürlichmusste sich jemand die Mühe machen, die Unterstützung für Coroutines in pyzmq(die Python-Client-Bibliothek für ØMQ) selbst einzubauen, damit das funktioniert, es war also nicht kostenlos. Aber wir können diese harte Arbeit nutzen, um die "traditionelle" Codestruktur zu verbessern, wie in Beispiel 4-17 gezeigt.

Beispiel 4-17. Saubere Trennung mit asyncio
# poller_aio.py
import asyncio
import zmq
from zmq.asyncio import Context

context = Context()

async def do_receiver():
    receiver = context.socket(zmq.PULL)  1
    receiver.connect("tcp://localhost:5557")
    while message := await receiver.recv_json():  2
        print(f'Via PULL: {message}')

async def do_subscriber():
    subscriber = context.socket(zmq.SUB)  3
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
    while message := await subscriber.recv_json():  4
        print(f'Via SUB: {message}')

async def main():
    await asyncio.gather(
        do_receiver(),
        do_subscriber(),
    )

asyncio.run(main())
1

Dieses Codebeispiel macht dasselbe wie Beispiel 4-15, nur dass wir jetzt die Vorteile von Coroutines nutzen, um alles neu zu strukturieren. Jetzt können wir jeden Socket einzeln behandeln. Ich habe zwei Coroutine-Funktionen erstellt, eine für jeden Socket; diese hier ist für den Socket PULL.

2

Ich verwende die asyncio Unterstützung in pyzmq, was bedeutet, dass allesend() und recv() Aufrufe das await Schlüsselwort verwenden müssen. Das Poller erscheint nirgendwo mehr, weil es in die asyncioEreignisschleife selbst integriert wurde.

3

Dies ist der Handler für den Socket SUB. Die Struktur ist der des PULL Socket-Handlers sehr ähnlich, aber das hätte nicht sein müssen. Wäre eine komplexere Logik erforderlich gewesen, hätte ich sie hier einfach hinzufügen können, und zwar vollständig gekapselt im SUB-Handler-Code.

4

Auch die asyncio-kompatiblen Sockets benötigen das Schlüsselwort await zum Senden und Empfangen.

Die Ausgabe ist dieselbe wie vorher, deshalb zeige ich sie nicht.

Die Verwendung von Coroutines hat meiner Meinung nach einen erstaunlich positiven Effekt auf den Codeaufbau in diesen Beispielen. In echtem Produktionscode mit vielen ØMQ-Sockets könnten die Coroutine-Handler für jeden einzelnen sogar in separaten Dateien stehen, was mehr Möglichkeiten für eine bessere Codestruktur bietet. Und selbst bei Programmen mit einem einzigen Lese-/Schreib-Socket ist es sehr einfach, getrennte Coroutines für das Lesen und Schreiben zu verwenden, falls nötig.

Der verbesserte Code sieht dem Code mit Threads sehr ähnlich, und für das hier gezeigte Beispiel funktioniert derselbe Refactor auch für Threads: Die blockierenden Funktionen do_receiver() und do_subscriber() werden in separaten Threads ausgeführt. Aber willst du dich wirklich mit dem Potenzialfür Race Conditions auseinandersetzen, vor allem, wenn deine Anwendung mit der Zeit an Funktionen und Komplexität zunimmt?

Hier gibt es eine Menge zu entdecken, und wie ich schon sagte, macht es Spaß, mit diesen magischen Sockets zu spielen. In der nächsten Fallstudie werden wir uns eine praktischere Anwendung von ØMQ ansehen.

Fallstudie: Überwachung der Anwendungsleistung

Mit den modernen, containerisierten, Microservice-basierten Bereitstellungspraktiken von heute sind einige Dinge, die früher trivial waren, wie z.B. die Überwachung der CPU- und Speichernutzung deiner Apps, etwas komplizierter geworden, als wenn du einfach top laufen lässt. In den letzten Jahren sind mehrere kommerzielle Produkte auf den Markt gekommen, die sich mit diesen Problemen befassen, aber ihre Kosten können für kleine Startup-Teams und Hobbyisten unerschwinglich sein.

In dieser Fallstudie werde ich ØMQ und asyncio nutzen, um einen Prototyp für die Überwachung verteilter Anwendungen zu entwickeln. Unser Entwurf besteht aus drei Teilen:

Anwendungsschicht

Diese Schicht enthält alle unsere Anwendungen. Beispiele dafür sind ein Microservice "Kunden", ein Microservice "Buchungen", ein Microservice "E-Mailer" usw. Ich werde jeder unserer Anwendungen einen ØMQ "sendenden" Socket hinzufügen. Dieser Socket sendet Leistungskennzahlen an einen zentralen Server.

Ebene der Sammlung

Der zentrale Server stellt einen ØMQ-Socket zur Verfügung, um die Daten von allen laufenden Anwendungsinstanzen zu sammeln. Der Server stellt außerdem eine Webseite zur Verfügung, auf der Leistungsdiagramme im Zeitverlauf angezeigt werden und die Daten live gestreamt werden, sobald sie eingehen.

Visualisierungsebene

Dies ist die Webseite, die aufgerufen wird. Wir zeigen die gesammelten Daten in einer Reihe von Diagrammen an, die sich in Echtzeit aktualisieren. Um die Codebeispiele zu vereinfachen, verwende ich die praktische Smoothie ChartsJavaScript-Bibliothek, die alle notwendigen clientseitigen Funktionen bietet.

Die Backend-Applikation (Anwendungsschicht), die Metriken erzeugt, wird in Beispiel 4-18 gezeigt.

Beispiel 4-18. Die Anwendungsschicht: Erzeugung von Metriken
import argparse
import asyncio
from random import randint, uniform
from datetime import datetime as dt
from datetime import timezone as tz
from contextlib import suppress
import zmq, zmq.asyncio, psutil

ctx = zmq.asyncio.Context()

async def stats_reporter(color: str):  1
    p = psutil.Process()
    sock = ctx.socket(zmq.PUB)  2
    sock.setsockopt(zmq.LINGER, 1)
    sock.connect('tcp://localhost:5555')  3
    with suppress(asyncio.CancelledError):  4
        while True:  5
            await sock.send_json(dict(  6
                color=color,
                timestamp=dt.now(tz=tz.utc).isoformat(),  7
                cpu=p.cpu_percent(),
                mem=p.memory_full_info().rss / 1024 / 1024
            ))
            await asyncio.sleep(1)
    sock.close()  8

async def main(args):
    asyncio.create_task(stats_reporter(args.color))
    leak = []
    with suppress(asyncio.CancelledError):
        while True:
            sum(range(randint(1_000, 10_000_000)))  9
            await asyncio.sleep(uniform(0, 1))
            leak += [0] * args.leak

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--color', type=str)  10
    parser.add_argument('--leak', type=int, default=0)
    args = parser.parse_args()
    try:
        asyncio.run(main(args))
    except KeyboardInterrupt:
        print('Leaving...')
        ctx.term()  11
1

Diese Coroutine-Funktion wird als langlebige Coroutine ausgeführt und sendet kontinuierlich Daten an den Serverprozess.

2

Erstelle einen ØMQ-Socket. Wie du weißt, gibt es verschiedene Arten von Sockets; dieser ist ein PUB Typ, der es ermöglicht, Einwegnachrichten an einen anderen ØMQ-Socket zu senden. Dieser Socket hat - wie der ØMQ-Leitfaden sagt - Superkräfte. Er kümmert sich automatisch um die Wiederverbindung und die Pufferung der Nachrichten für uns.

3

Verbinde dich mit dem Server.

4

Unsere Shutdown-Sequenz wird von KeyboardInterrupt gesteuert, weiter unten. Wenn dieses Signal empfangen wird, werden alle Aufgaben abgebrochen. Hier behandle ich das ausgelöste CancelledError mit dem praktischen suppress() Kontextmanager aus dem contextlib Standardbibliotheksmodul.

5

Iteriert ewig und sendet Daten an den Server.

6

Da ØMQ in der Lage ist, mit kompletten Nachrichten zu arbeiten und nicht nur mit Teilen eines Bytestroms, öffnet es die Tür zu einer Reihe nützlicher Wrapper um das übliche sock.send() Idiom: Hier verwende ich eine dieser Hilfsmethoden,send_json(), die das Argument automatisch in JSON serialisiert. So können wir dict() direkt verwenden.

7

Eine zuverlässige Methode zur Übertragung von Datumsangaben ist das ISO 8601-Format. Das gilt vor allem, wenn du Datumsdaten zwischen Software in verschiedenen Sprachen übertragen musst, da die meisten Sprachimplementierungen mit diesem Standard arbeiten können.

8

Um hier zu landen, müssen wir die CancelledError Ausnahme erhalten haben, die aus dem Abbruch der Aufgabe resultiert. Der ØMQ-Socket muss geschlossen werden, damit das Programm heruntergefahren werden kann.

9

Die Funktion main() symbolisiert die eigentliche Microservice-Anwendung. Mit dieser Summe über Zufallszahlen wird Scheinarbeit produziert, nur um uns einige Daten ungleich Null zu liefern, die wir etwas später in der Visualisierungsschicht betrachten können.

10

Ich werde mehrere Instanzen dieser Anwendung erstellen, so dass es praktisch ist, wenn ich sie (später in den Diagrammen) mit einem --color Parameter unterscheiden kann.

11

Schließlich kann der ØMQ-Kontext beendet werden.

Das Hauptaugenmerk liegt auf der Funktion stats_reporter(). Über sie werden die (von der nützlichen psutilBibliothek gesammelten) Metrikdaten ausgegeben. Der Rest des Codes kann als typische Microservice-Anwendung angesehen werden.

Der Servercode in Beispiel 4-19 sammelt alle Daten und liefert sie an einen Webclient.

Beispiel 4-19. Die Sammelschicht: Dieser Server sammelt Prozessstatistiken
# metric-server.py
import asyncio
from contextlib import suppress
import zmq
import zmq.asyncio
import aiohttp
from aiohttp import web
from aiohttp_sse import sse_response
from weakref import WeakSet
import json

# zmq.asyncio.install()
ctx = zmq.asyncio.Context()
connections = WeakSet()  1

async def collector():
    sock = ctx.socket(zmq.SUB)  2
    sock.setsockopt_string(zmq.SUBSCRIBE, '')  3
    sock.bind('tcp://*:5555')  4
    with suppress(asyncio.CancelledError):
        while data := await sock.recv_json():  5
            print(data)
            for q in connections:
                await q.put(data)  6
    sock.close()

async def feed(request):  7
    queue = asyncio.Queue()
    connections.add(queue)  8
    with suppress(asyncio.CancelledError):
        async with sse_response(request) as resp:  9
            while data := await queue.get():  10
                print('sending data:', data)
                resp.send(json.dumps(data))  11
    return resp

async def index(request):  12
    return aiohttp.web.FileResponse('./charts.html')

async def start_collector(app):  13
    app['collector'] = app.loop.create_task(collector())

async def stop_collector(app):
    print('Stopping collector...')
    app['collector'].cancel()  14
    await app['collector']
    ctx.term()

if __name__ == '__main__':
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/feed', feed)
    app.on_startup.append(start_collector)  15
    app.on_cleanup.append(stop_collector)
    web.run_app(app, host='127.0.0.1', port=8088)
1

Die eine Hälfte dieses Programms empfängt Daten von anderen Anwendungen, die andere Hälfte liefert Daten an die Browser-Clients über Server-sent events (SSEs). Ich verwende eine WeakSet() Jeder verbundene Client hat eine zugehörige Queue() Instanz, sodass dieseconnections Kennung eigentlich eine Reihe von Warteschlangen ist.

2

Erinnere dich daran, dass ich in der Anwendungsschicht einen zmq.PUB Socket verwendet habe; hier in der Sammlungsschicht verwende ich seinen Partner, den zmq.SUB Socket-Typ. Dieser ØMQ-Socket kann nur empfangen, aber nicht senden.

3

Für den Socket-Typ zmq.SUB ist die Angabe eines Abonnementnamens erforderlich, aber für unsere Zwecke nehmen wir einfach alles, was reinkommt - daher der leere Themenname.

4

Ich binde die zmq.SUB Buchse. Denk mal kurz darüber nach. In Pub-Sub-Konfigurationen musst du normalerweise das Pub-Ende zum Server machen (bind()) und das Sub-Ende zum Client (connect()). Bei ØMQ ist das anders: Jedes Ende kann der Server sein. Für unseren Anwendungsfall ist das wichtig, denn jede unserer Anwendungsschicht-Instanzen wird sich mit demselben Domänennamen des Sammlungsservers verbinden, und nicht andersherum.

5

Die Unterstützung für asyncio in pyzmq ermöglicht es uns, await Daten von unseren verbundenen Apps zu erhalten. Und nicht nur das, die eingehenden Daten werden automatisch aus JSON deserialisiert (ja, das bedeutet, dass data ein dict() ist).

6

Erinnere dich daran, dass unser connections Set eine Warteschlange für jeden verbundenen Webclient enthält. Jetzt, da die Daten empfangen wurden, ist es an der Zeit, sie an alle Clients zu senden: Die Daten werden in jede Warteschlange gestellt.

7

Die Funktion feed() coroutine erstellt Coroutines für jeden verbundenen Webclient. Intern werden die vom Server gesendeten Ereignisse verwendet, um Daten an die Webclients zu senden.

8

Wie bereits beschrieben, hat jeder Webclient seine eigene queue Instanz, um Daten von der collector() Coroutine zu empfangen. Die Instanz queuewird dem connections Set hinzugefügt, aber da connections ein Weakset ist, wird der Eintrag automatisch ausconnections entfernt, wenn queue nicht mehr verfügbar ist, d.h. wenn ein Webclient die Verbindung trennt. Weakrefs sind großartig, um diese Art von Buchhaltungsaufgaben zu vereinfachen.

9

Das Paket aiohttp_sse stellt den Kontextmanager sse_response() zur Verfügung. Dieser gibt uns einen Rahmen, in dem wir Daten an den Webclient weitergeben können.

10

Wir bleiben mit dem Webclient verbunden und warten auf Daten in der Warteschlange dieses speziellen Clients.

11

Sobald die Daten eintreffen (innerhalb von collector()), werden sie an den verbundenen Webclient gesendet. Beachte, dass ich hier das data Diktat reserialisiere. Eine Optimierung dieses Codes wäre es, die Deserialisierung von JSON in collector() zu vermeiden und stattdessen sock.recv_string() zu verwenden, um den Serialisierungsvorgang zu umgehen. In einem realen Szenario könntest du natürlich auch im Collector deserialisieren und eine Validierung der Daten durchführen, bevor du sie an den Browser-Client sendest. So viele Möglichkeiten!

12

Der Endpunkt index() ist der primäre Seitenladepunkt, und hier wird eine statische Datei namens charts.html bereitgestellt.

13

Die Bibliothek aiohttp bietet uns die Möglichkeit, zusätzliche langlebige Coroutines einzubinden, die wir eventuell benötigen. Mit der Coroutine collector() haben wir genau diese Situation, also erstelle ich eine Startup-Coroutine,start_collector(), und eine Shutdown-Coroutine. Diese werden in bestimmten Phasen der aiohttpStartup- und Shutdown-Sequenz aufgerufen. Beachte, dass ich die Collector-Aufgabe in die app selbst einfüge, die ein Mapping-Protokoll implementiert, damit du sie wie ein Diktat verwenden kannst.

14

Ich beziehe unsere collector() Coroutine über denapp Bezeichner und rufe cancel() auf.

15

Schließlich kannst du sehen, wo die benutzerdefinierten Startup- und Shutdown-Coroutinen eingehängt werden: Die Instanz app bietet Hooks, an die unsere benutzerdefinierten Coroutinen angehängt werden können.

Jetzt fehlt nur noch die Visualisierungsebene, wie in Beispiel 4-20 gezeigt. Ich verwende dieSmoothie Charts-Bibliothek, um scrollende Diagramme zu erstellen. Das komplette HTML für unsere wichtigste (und einzige) Webseite,charts.html, findest du in Beispiel B-1. Es gibt zu viel HTML, CSS und JavaScript, um es in diesem Abschnitt darzustellen, aber ich möchte ein paar Punkte hervorheben, wie die vom Server gesendeten Ereignisse in JavaScript im Browser-Client behandelt werden.

Beispiel 4-20. Die Visualisierungsebene, was eine schicke Umschreibung für "der Browser" ist
<snip>
var evtSource = new EventSource("/feed");  1
evtSource.onmessage = function(e) {
    var obj = JSON.parse(e.data);  2
    if (!(obj.color in cpu)) {
        add_timeseries(cpu, cpu_chart, obj.color);
    }
    if (!(obj.color in mem)) {
        add_timeseries(mem, mem_chart, obj.color);
    }
    cpu[obj.color].append(
        Date.parse(obj.timestamp), obj.cpu);  3
    mem[obj.color].append(
        Date.parse(obj.timestamp), obj.mem);
};
<snip>
1

Erstelle eine neue EventSource() Instanz mit der URL /feed. Der Browser wird sich mit /feed auf unserem Server verbinden (metric_server.py). Beachte, dass der Browser automatisch versucht, die Verbindung wiederherzustellen, wenn sie unterbrochen wird. Vom Server gesendete Ereignisse werden oft übersehen, aber in vielen Situationen sind sie aufgrund ihrer Einfachheit den WebSockets vorzuziehen.

2

Das Ereignis onmessage wird jedes Mal ausgelöst, wenn der Server Daten sendet. Hier werden die Daten als JSON geparst.

3

Der cpu Bezeichner ist eine Zuordnung einer Farbe zu einerTimeSeries() Instanz (mehr dazu in Beispiel B-1). Hier erhalten wir diese Zeitreihe und fügen ihr Daten hinzu. Außerdem holen wir uns den Zeitstempel und parsen ihn, um das richtige Format für das Diagramm zu erhalten.

Jetzt können wir den Code ausführen. Um die ganze Show in Gang zu bringen, sind eine Reihe von Befehlszeilenanweisungen erforderlich, von denen die erste darin besteht, den Datensammelprozess zu starten:

$ metric-server.py
======== Running on http://127.0.0.1:8088 ========
(Press CTRL+C to quit)

Der nächste Schritt besteht darin, alle Microservice-Instanzen zu starten. Diese werden ihre CPU- und Speichernutzungsdaten an den Collector senden. Jeder wird durch eine andere Farbe gekennzeichnet, die in der Befehlszeile angegeben wird. Beachte, wie zwei der Microservices angewiesen werden, Speicher zu lecken:

$ backend-app.py --color red &
$ backend-app.py --color blue --leak 10000 &
$ backend-app.py --color green --leak 100000 &

Abbildung 4-2 zeigt unser Endprodukt in einem Browser. Du wirst mir glauben müssen, dass die Graphen wirklich animiert sind. Du wirst in den vorangegangenen Befehlszeilen sehen, dass ich bei Blau etwas und bei Grün viel Speicherplatz verloren habe. Ich musste den grünen Dienst sogar ein paar Mal neu starten, um zu verhindern, dass er über 100 MB ansteigt.

uaip 0402
Abbildung 4-2. Wir sollten so schnell wie möglich eine SRE auf Grün setzen!

Was an diesem Projekt besonders interessant ist, ist Folgendes: Jede der laufenden Instanzen in jedem Teil dieses Stacks kann neu gestartet werden, und es ist kein Code zur Wiederherstellung der Verbindung erforderlich. Die ØMQ-Sockets und die EventSource() JavaScript-Instanz im Browser verbinden sich auf magische Weise wieder und machen da weiter, wo sie aufgehört haben.

Im nächsten Abschnitt widmen wir uns den Datenbanken und der Frage, wie asynciogenutzt werden kann, um ein System zur Cache-Invalidierung zu entwickeln.

asyncpg und Sanic

Die Bibliothekasyncpg ermöglicht den Client-Zugriff auf die PostgreSQL-Datenbank, unterscheidet sich aber von anderenasyncio-kompatiblen Postgres-Client-Bibliotheken, indem sie den Schwerpunkt auf Geschwindigkeit legt.asyncpg wurde von Yury Selivanov, einem der wichtigsten asyncio Python-Entwickler, verfasst, der auch Autor des uvloop-Projekts ist.Es gibt keine Abhängigkeiten von Drittanbietern, obwohlCython erforderlich ist, wenn du vom Quellcode installierst.

asyncpg erreicht seine Geschwindigkeit, indem es direkt gegen das PostgreSQL-Binärprotokoll arbeitet. Weitere Vorteile dieses Low-Level-Ansatzes sind die Unterstützung vonPrepared Statementsund scrollbaren Cursors.

Wir werden uns eine Fallstudie ansehen, in der asyncpg für die Cache-Ungültigkeitserklärung verwendet wird. Zuvor ist es jedoch nützlich, ein grundlegendes Verständnis für die API asyncpg zu bekommen. Für den gesamten Code in diesem Abschnitt benötigen wir eine laufende Instanz von PostgreSQL. Das geht am einfachsten mit Docker, indem du den folgenden Befehl verwendest:

$ docker run -d --rm -p 55432:5432 postgres

Beachte, dass ich den Port 55432 und nicht den Standardport 5432 verwendet habe, falls du bereits eine Instanz der Datenbank auf dem Standardport laufen hast. Beispiel 4-21 zeigt kurz, wie man asyncpgverwendet, um mit PostgreSQL zu kommunizieren.

Beispiel 4-21. Grundlegende Demo von asyncpg
# asyncpg-basic.py
import asyncio
import asyncpg
import datetime
from util import Database  1

async def main():
    async with Database('test', owner=True) as conn:  2
        await demo(conn)

async def demo(conn: asyncpg.Connection):
    await conn.execute('''
        CREATE TABLE users(
            id serial PRIMARY KEY,
            name text,
            dob date
        )'''
    )  3

    pk = await conn.fetchval(  4
        'INSERT INTO users(name, dob) VALUES($1, $2) '
        'RETURNING id', 'Bob', datetime.date(1984, 3, 1)
    )

    async def get_row():  5
        return await conn.fetchrow(  6
            'SELECT * FROM users WHERE name = $1',
            'Bob'
        )
    print('After INSERT:', await get_row())  7

    await conn.execute(
        'UPDATE users SET dob = $1 WHERE id=1',
        datetime.date(1985, 3, 1)  8
    )
    print('After UPDATE:', await get_row())

    await conn.execute(
        'DELETE FROM users WHERE id=1'
    )
    print('After DELETE:', await get_row())

if __name__ == '__main__':
    asyncio.run(main())
1

Ich habe einige Textbausteine in einem kleinen util Modul versteckt, um die Dinge zu vereinfachen und die Kernaussage beizubehalten.

2

Die Klasse Database stellt uns einen Kontextmanager zur Verfügung, der eine neue Datenbank für uns erstellt - in diesem Fall mit dem Namen test- und diese Datenbank zerstört, wenn der Kontextmanager beendet wird. Das ist sehr nützlich, wenn wir mit Ideen im Code experimentieren. Da kein Zustand zwischen den Experimenten übertragen wird, kannst du jedes Mal mit einer sauberen Datenbank beginnen. Beachte, dass es sich um einen async with Kontextmanager handelt; wir werden später mehr darüber sprechen, aber im Moment liegt der Schwerpunkt dieser Demo auf den Vorgängen innerhalb der Coroutine demo().

3

Der Database Kontextmanager hat uns eine ConnectionInstanz zur Verfügung gestellt, die sofort verwendet wird, um eine neue Tabelle, users, zu erstellen.

4

Ich benutze fetchval(), um einen neuen Datensatz einzufügen. Ich hätte auch execute()zum Einfügen verwenden können, aber der Vorteil der Verwendung von fetchval()ist, dass ich den id des neu eingefügten Datensatzes erhalten kann, den ich im Bezeichner pk speichere.

Beachte, dass ich Parameter ($1 und $2) verwende, um Daten an die SQL-Abfrage zu übergeben. Verwende niemals String-Interpolation oder Verkettung, um Abfragen zu erstellen, da dies ein Sicherheitsrisiko darstellt!

5

Im weiteren Verlauf der Demo werde ich die Daten in der Tabelle users bearbeiten, also erstelle ich eine neue Coroutine-Funktion, die einen Datensatz in der Tabelle abruft. Diese Funktion wird mehrere Male aufgerufen werden.

6

Beim Abrufen von Daten ist es viel nützlicher, die fetch-basierten Methoden zu verwenden, da diese Record Objekte zurückgeben. asyncpg wandelt die Datentypen automatisch in die für Python am besten geeigneten Typen um.

7

Ich verwende sofort die get_row() Hilfe, um den neu eingefügten Datensatz anzuzeigen.

8

Ich ändere Daten, indem ich den Befehl UPDATE für SQL verwende. Es ist eine kleine Änderung: Der Jahreswert im Geburtsdatum wird um ein Jahr geändert. Wie zuvor wird dies mit der Methode execute() der Verbindung durchgeführt. Der Rest der Code-Demo folgt der gleichen Struktur wie bisher, und ein paar Zeilen weiter folgt ein DELETE, gefolgt von einem weiteren print().

Hier ist die Ausgabe, wenn du das Skript ausführst:

$ asyncpg-basic.py
After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)>
After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)>
After DELETE: None

Beachte, wie der Datumswert in unserem RecordObjekt in ein Python date Objekt umgewandelt wurde: asyncpg hat den Datentyp automatisch vom SQL-Typ in sein Python-Gegenstück umgewandelt. Eine große Tabelle mitTypkonvertierungen in der Dokumentation von asyncpg beschreibt alle Typzuordnungen, die in die Bibliothek eingebaut sind.

Der vorangegangene Code ist sehr einfach, vielleicht sogar grob, wenn du an die Annehmlichkeiten von objektrelationalen Mappern (ORMs) wie SQLAlchemy oder dem eingebauten ORM des Django Web Frameworks gewöhnt bist. Am Ende dieses Kapitels erwähne ich einige Bibliotheken von Drittanbietern, die Zugang zu ORMs oder ORM-ähnlichen Funktionen für asyncpg bieten.

Beispiel 4-22 zeigt mein Boilerplate Database Objekt im utilsModul; vielleicht findest du es nützlich, etwas Ähnliches für deine eigenen Experimente zu erstellen.

Beispiel 4-22. Nützliche Hilfsmittel für deine asyncpg-Experimente
# util.py
import argparse, asyncio, asyncpg
from asyncpg.pool import Pool

DSN = 'postgresql://{user}@{host}:{port}'
DSN_DB = DSN + '/{name}'
CREATE_DB = 'CREATE DATABASE {name}'
DROP_DB = 'DROP DATABASE {name}'

class Database:
    def __init__(self, name, owner=False, **kwargs):
        self.params = dict(
            user='postgres', host='localhost',
            port=55432, name=name)  1
        self.params.update(kwargs)
        self.pool: Pool = None
        self.owner = owner
        self.listeners = []

    async def connect(self) -> Pool:
        if self.owner:
            await self.server_command(
                CREATE_DB.format(**self.params))  3

        self.pool = await asyncpg.create_pool(  4
            DSN_DB.format(**self.params))
        return self.pool

    async def disconnect(self):
        """Destroy the database"""
        if self.pool:
            releases = [self.pool.release(conn)
                        for conn in self.listeners]
            await asyncio.gather(*releases)
            await self.pool.close()  5
        if self.owner:
            await self.server_command(  6
                DROP_DB.format(**self.params))

    async def __aenter__(self) -> Pool:  2
        return await self.connect()

    async def __aexit__(self, *exc):
        await self.disconnect()

    async def server_command(self, cmd):  7
        conn = await asyncpg.connect(
            DSN.format(**self.params))
        await conn.execute(cmd)
        await conn.close()

    async def add_listener(self, channel, callback):  8
        conn: asyncpg.Connection = await self.pool.acquire()
        await conn.add_listener(channel, callback)
        self.listeners.append(conn)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cmd', choices=['create', 'drop'])
    parser.add_argument('--name', type=str)
    args = parser.parse_args()
    d = Database(args.name, owner=True)
    if args.cmd == 'create':
        asyncio.run(d.connect())
    elif args.cmd == 'drop':
        asyncio.run(d.disconnect())
    else:
        parser.print_help()
1

Die Klasse Database ist nur ein ausgefallener Kontextmanager zum Erstellen und Löschen einer Datenbank in einer PostgreSQL-Instanz. Der Datenbankname wird dem Konstruktor übergeben.

2

(Hinweis: Die Reihenfolge der Aufrufe im Code weicht absichtlich von dieser Liste ab). Dies ist ein asynchroner Kontextmanager. Anstelle der üblichen Methoden__enter__() und __exit__() verwende ich ihre Gegenstücke__aenter__() und __aexit__() .

3

Hier, auf der Eingabeseite, erstelle ich die neue Datenbank und gebe eine Verbindung zu dieser neuen Datenbank zurück. server_command() ist eine weitere Hilfsmethode, die ein paar Zeilen weiter unten definiert ist. Ich verwende sie, um den Befehl zum Erstellen unserer neuen Datenbank auszuführen.

4

Dann stelle ich eine Verbindung zu der neu erstellten Datenbank her. Beachte, dass ich einige Details über die Verbindung fest einprogrammiert habe: Das ist Absicht, denn ich wollte die Codebeispiele klein halten. Du könntest dies leicht verallgemeinern, indem du Felder für den Benutzernamen, den Hostnamen und den Port erstellst.

5

Auf der Beendigungsseite des Kontextmanagers schließe ich die Verbindung und...

6

...die Datenbank zerstören.

7

Der Vollständigkeit halber: Dies ist unsere Utility-Methode zur Ausführung von Befehlen gegen den PostgreSQL-Server selbst. Sie erstellt zu diesem Zweck eine Verbindung, führt den angegebenen Befehl aus und beendet sich.

8

Diese Funktion erstellt eine dauerhafte Socket-Verbindung zur Datenbank, die auf Ereignisse wartet. Dieser Mechanismus wird in der nächsten Fallstudie vorgestellt.

Vorsicht

In Punkt 8 des vorangegangenen Codes habe ich für jeden Kanal, den ich abhören will, eine eigene Verbindung erstellt. Das ist teuer, denn es bedeutet, dass ein PostgreSQL-Worker für jeden Kanal, der abgehört wird, komplett ausgelastet ist. Viel besser wäre es, eine Verbindung für mehrere Kanäle zu verwenden. Wenn du dieses Beispiel durchgearbeitet hast, kannst du versuchen, den Code so zu ändern, dass eine einzige Verbindung für mehrere Channel Listener verwendet wird.

Jetzt, wo du die grundlegenden Bausteine von asyncpg kennst, können wir sie mit einer wirklich unterhaltsamen Fallstudie weiter erforschen: die Verwendung der in PostgreSQL eingebauten Unterstützung für das Senden von Ereignisbenachrichtigungen, um eine Cache-Ungültigkeitserklärung durchzuführen!

Fallstudie: Cache-Invalidierung

Es gibt zwei schwierige Dinge in der Informatik: Cache-Invalidierung, das Benennen von Dingen, und Off-by-One-Fehler.

Phil Karlton

Bei Webservices und Webanwendungen kommt es häufig vor, dass die Persistenzschicht, d. h. die Datenbank (DB), eher zum Leistungsengpass wird als jeder andere Teil des Stacks. Die Anwendungsschicht kann in der Regel horizontal skaliert werden, indem mehr Instanzen betrieben werden, während dies bei einer Datenbank schwieriger ist.

Deshalb ist es gängige Praxis, nach Designoptionen zu suchen, die eine übermäßige Interaktion mit der Datenbank begrenzen können. Die gebräuchlichste Option ist die Verwendung von Caching, um sich an zuvor abgefragte Datenbankergebnisse zu "erinnern" und sie bei Bedarf wieder abzurufen, so dass ein erneuter Aufruf der Datenbank für dieselben Informationen vermieden wird.

Was passiert aber, wenn eine deiner App-Instanzen neue Daten in die Datenbank schreibt, während eine andere App-Instanz noch die alten, veralteten Daten aus ihrem internen Cache zurückgibt? Das ist ein klassisches Cache-Invalidierungsproblem, das sich nur sehr schwer lösen lässt.

Unsere Angriffsstrategie sieht folgendermaßen aus:

  1. Jede App-Instanz hat einen In-Memory-Cache für DB-Abfragen.

  2. Wenn man neue Daten in die Datenbank schreibt, informiert die Datenbank alle verbundenen App-Instanzen über die neuen Daten.

  3. Jede App-Instanz aktualisiert dann ihren internen Cache entsprechend.

In dieser Fallstudie wird gezeigt, wie PostgreSQL mit seiner eingebauten Unterstützung für Ereignisaktualisierungen über dieLISTEN undNOTIFY Befehle, einfach mitteilen kann, wenn sich die Daten geändert haben.

asyncpg unterstützt bereits die LISTEN/NOTIFY API. Diese Funktion von PostgreSQL ermöglicht es deiner Anwendung, Ereignisse in einem benannten Kanal zu abonnieren und Ereignisse an benannte Kanäle zu senden. PostgreSQL kann fast zu einer leichteren Version vonRabbitMQ oderActiveMQ werden!

In dieser Fallstudie gibt es mehr bewegliche Teile als sonst, und das macht es schwierig, sie in der üblichen linearen Form zu präsentieren. Stattdessen fangen wir mit dem Endprodukt an und arbeiten uns rückwärts zur zugrunde liegenden Implementierung vor.

Unsere App bietet einen JSON-basierten API-Server für die Verwaltung der Lieblingsgerichte der Gäste in unserem Roboterrestaurant. Die Datenbank enthält nur eine Tabelle, patron, mit nur zwei Feldern: name und fav_dish. Unsere API ermöglicht die üblichen vier Operationen: Erstellen, Lesen,Aktualisieren und Löschen (CRUD).

Im Folgenden siehst du ein Beispiel für die Interaktion mit unserer API unter curl. Es zeigt, wie du einen neuen Eintrag in unserer Datenbank erstellst (ich habe noch nicht gezeigt, wie du den Server auf localhost:8000 startest; das kommt später):

$ curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X POST \
    http://localhost:8000/patron
{"msg":"ok","id":37}

Der Parameter -d ist für Daten,2 -H ist für die HTTP-Header, -Xist für die HTTP-Anforderungsmethode (Alternativen sind GET, DELETE, PUT und einige andere) und die URL ist für unseren API-Server. Zu dem Code dafür kommen wir gleich.

In der Ausgabe sehen wir, dass die Erstellung ok war, und dass id der Primärschlüssel des neuen Datensatzes in der Datenbank ist.

In den nächsten paar Shell-Schnipseln werden wir die anderen drei Operationen durchgehen:lesen, aktualisieren und löschen. Mit diesem Befehl können wir den soeben erstellten Datensatz eines Gönners lesen:

$ curl -X GET http://localhost:8000/patron/37
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}

Das Lesen der Daten ist ziemlich einfach. Beachte, dass die id des gewünschten Datensatzes in der URL angegeben werden muss.

Als Nächstes aktualisieren wir den Datensatz und überprüfen die Ergebnisse:

$ curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
{"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}

Das Aktualisieren einer Ressource ist ähnlich wie das Erstellen einer Ressource, mit zwei wichtigen Unterschieden:

  • Die HTTP-Anforderungsmethode (-X) lautet PUT, nicht POST.

  • Die URL benötigt jetzt das Feld id, um anzugeben, welche Ressource aktualisiert werden soll.

Schließlich können wir den Datensatz löschen und seine Löschung mit den folgenden Befehlen überprüfen:

$ curl -X DELETE http://localhost:8000/patron/37
$ curl -X GET http://localhost:8000/patron/37
{"msg":"ok"}
null

Wie du sehen kannst, wird null zurückgegeben, wenn du versuchst,GET einen Datensatz zu finden, der nicht existiert.

Bis jetzt sieht das alles recht gewöhnlich aus, aber unser Ziel ist nicht nur eine CRUD-API, sondern wir wollen uns auch die Cache-Invalidierung ansehen. Richten wir also unsere Aufmerksamkeit auf den Cache. Da wir nun ein grundlegendes Verständnis der API unserer App haben, können wir uns die Anwendungsprotokolle ansehen, um die Zeitdaten für jede Anfrage zu ermitteln: Daraus können wir ersehen, welche Anfragen im Cache gespeichert sind und welche auf die DB treffen.

Wenn der Server zum ersten Mal gestartet wird, ist der Cache leer; es handelt sich schließlich um einen Speicher-Cache. Wir werden unseren Server starten und dann in einer separaten Shell zwei GET Anfragen kurz hintereinander ausführen:

$ curl -X GET http://localhost:8000/patron/29
$ curl -X GET http://localhost:8000/patron/29
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}

Wir gehen davon aus, dass wir beim ersten Mal, wenn wir unseren Datensatz abrufen, einen Fehler im Cache haben werden und beim zweiten Mal einen Treffer. Das können wir im Protokoll des API-Servers selbst sehen (der erste Sanic-Webserver, der auf localhost:8000 läuft):

$ sanic_demo.py
2019-09-29 16:20:33 - (sanic)[DEBUG]:
                 ▄▄▄▄▄
        ▀▀▀██████▄▄▄       _______________
      ▄▄▄▄▄  █████████▄  /                 \
     ▀▀▀▀█████▌ ▀▐▄ ▀▐█ |   Gotta go fast!  |
   ▀▀█████▄▄ ▀██████▄██ | _________________/
   ▀▄▄▄▄▄  ▀▀█▄▀█════█▀ |/
        ▀▀▀▄  ▀▀███ ▀       ▄▄
     ▄███▀▀██▄████████▄ ▄▀▀▀▀▀▀█▌
   ██▀▄▄▄██▀▄███▀ ▀▀████      ▄██
▄▀▀▀▄██▄▀▀▌████▒▒▒▒▒▒███     ▌▄▄▀
▌    ▐▀████▐███▒▒▒▒▒▐██▌
▀▄▄▄▄▀   ▀▀████▒▒▒▒▄██▀
          ▀▀█████████▀
        ▄▄██▀██████▀█
      ▄██▀     ▀▀▀  █
     ▄█             ▐▌
 ▄▄▄▄█▌              ▀█▄▄▄▄▀▀▄
▌     ▐                ▀▀▄▄▄▀
 ▀▀▄▄▀

2019-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000
2019-09-29 16:20:33 (sanic): Starting worker [10366]  1
2019-09-29 16:25:27 (perf): id=37 Cache miss  2
2019-09-29 16:25:27 (perf): get Elapsed: 4.26 ms 3
2019-09-29 16:25:27 (perf): get Elapsed: 0.04 ms 4
1

Alles bis zu dieser Zeile ist die standardmäßige sanic Startup-Logmeldung.

2

Wie beschrieben, führt die erste GET zu einem Cache-Miss, weil der Server gerade erst gestartet ist.

3

Das ist von unserem ersten curl -X GET. Ich habe den API-Endpunkten einige Zeitfunktionen hinzugefügt. Hier können wir sehen, dass der Handler für die GET Anfrage ~4 ms brauchte.

4

Die zweite GET liefert Daten aus dem Cache und die viel schnelleren (100x schneller!) Zeitdaten.

So weit nichts Ungewöhnliches. Viele Webanwendungen nutzen das Caching auf diese Weise.

Starten wir nun eine zweite App-Instanz auf Port 8001 (die erste Instanz war auf Port 8000):

$ sanic_demo.py --port 8001
<snip>
2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001
2017-10-02 08:09:56 - (sanic): Starting worker [385]

Beide Instanzen verbinden sich natürlich mit der gleichen Datenbank. Jetzt, wo beide API-Server-Instanzen laufen, ändern wir die Daten für den KundenJohn, der offensichtlich nicht genug Spam auf seinem Speiseplan hat. Hier führen wir eine UPDATE gegen die erste App-Instanz an Port 8000 aus:

$ curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/29
{"msg":"ok"}

Unmittelbar nach diesem Aktualisierungsereignis auf nur einer der App-Instanzen melden beideAPI-Server, 8000 und 8001, das Ereignis in ihren jeweiligen Protokollen:

2019-10-02 08:35:49 - (perf)[INFO]: Got DB event:
{
    "table": "patron",
    "id": 29,
    "type": "UPDATE",
    "data": {
        "old": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "Gravy on Toast"
        },
        "new": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "SPAM on toast"
        },
        "diff": {
            "fav_dish": "SPAM on toast"
        }
    }
}

Die Datenbank hat das Update-Ereignis an beide App-Instanzen zurückgemeldet. Wir haben aber noch keine Anfragen an die App-Instanz 8001 gestellt - bedeutet das, dass die neuen Daten dort bereits zwischengespeichert sind?

Um das zu überprüfen, können wir eine GET auf dem zweiten Server auf Port 8001 ausführen:

$ curl -X GET http://localhost:8001/patron/29
{"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}

Die Timing-Informationen in der Log-Ausgabe zeigen, dass wir die Daten tatsächlich direkt aus dem Cache beziehen, auch wenn dies unsere erste Anfrage ist:

2019-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms

Das Ergebnis ist, dass alle verbundenen App-Instanzen benachrichtigt werden, wenn sich die Datenbank ändert, damit sie ihre Caches aktualisieren können.

Nach dieser Erklärung können wir uns nun dieasyncpg Code-Implementierung ansehen, die erforderlich ist, damit unsere Cache-Ungültigkeitserklärung tatsächlich funktioniert. Der grundlegende Entwurf für den in Beispiel 4-23 gezeigten Servercode sieht wie folgt aus:

  1. Wir haben eine einfache Web-API, die das neue, asyncio-kompatibleSanic Web-Framework nutzt.

  2. Die Daten werden in einer PostgreSQL-Instanz im Backend gespeichert, aber die API wird über mehrere Instanzen der Web-API-App-Server bedient.

  3. Die App-Server werden die Daten aus der Datenbank zwischenspeichern.

  4. Die App-Server abonnieren Ereignisse über asyncpg in bestimmten Tabellen der DB und erhalten Aktualisierungsbenachrichtigungen, wenn die Daten in der DB-Tabelle geändert wurden. So können die App-Server ihre individuellen In-Memory-Caches aktualisieren.

Beispiel 4-23. API-Server mit Sanic
# sanic_demo.py
import argparse
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import json
from util import Database  1
from perf import aelapsed, aprofiler  2
import model

app = Sanic()  3

@aelapsed
async def new_patron(request):  4
    data = request.json  5
    id = await model.add_patron(app.pool, data)  6
    return json(dict(msg='ok', id=id))  7

class PatronAPI(HTTPMethodView, metaclass=aprofiler):  8
    async def get(self, request, id):
        data = await model.get_patron(app.pool, id)  9
        return json(data)

    async def put(self, request, id):
        data = request.json
        ok = await model.update_patron(app.pool, id, data)
        return json(dict(msg='ok' if ok else 'bad'))  10

    async def delete(self, request, id):
        ok = await model.delete_patron(app.pool, id)
        return json(dict(msg='ok' if ok else 'bad'))

@app.listener('before_server_start')  11
async def db_connect(app, loop):
    app.db = Database('restaurant', owner=False)  12
    app.pool = await app.db.connect()  13
    await model.create_table_if_missing(app.pool)  14
    await app.db.add_listener('chan_patron', model.db_event)  15

@app.listener('after_server_stop')  16
async def db_disconnect(app, loop):
    await app.db.disconnect()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=8000)
    args = parser.parse_args()
    app.add_route(
        new_patron, '/patron', methods=['POST'])  17
    app.add_route(
        PatronAPI.as_view(), '/patron/<id:int>')  18
    app.run(host="0.0.0.0", port=args.port)
1

Das Hilfsprogramm Database, wie oben beschrieben. Er stellt die Methoden bereit, die für die Verbindung mit der Datenbank erforderlich sind.

2

Zwei weitere Tools, die ich zusammengeschustert habe, um die verstrichene Zeit für jeden API-Endpunkt zu protokollieren. Ich habe dies in der vorherigen Diskussion verwendet, um zu erkennen, wann GET aus dem Cache zurückgegeben wurde. Die Implementierungen füraelapsed() und aprofiler() sind für diese Fallstudie nicht wichtig, aber du kannst sie in Beispiel B-1 erhalten.

3

Wir erstellen die Hauptinstanz der Sanic App.

4

Diese Coroutine-Funktion ist für die Erstellung neuer Gönnereinträge zuständig. In einemadd_route() -Aufruf am Ende des Codes istnew_patron() mit dem Endpunkt /patron verbunden, nur für die HTTP-MethodePOST. Der @aelapsed Dekorator ist nicht Teil der Sanic API: Er ist meine eigene Erfindung und dient lediglich dazu, die Zeiten für jeden Aufruf zu protokollieren.

5

Sanic bietet eine sofortige Deserialisierung der empfangenen JSON-Daten, indem es das .json Attribut des request Objekts verwendet.

6

Das Modul model, das ich importiert habe, ist das Modell für unsere Tabellepatron in der Datenbank. Im nächsten Code-Listing gehe ich näher darauf ein; im Moment musst du nur wissen, dass alle Datenbankabfragen und SQL in diesem model Modul enthalten sind. Hier übergebe ich den Verbindungspool für die Datenbank, und das gleiche Muster wird für die gesamte Interaktion mit dem Datenbankmodell in dieser Funktion und in der Klasse PatronAPI weiter unten verwendet.

7

Es wird ein neuer Primärschlüssel, id, erstellt, der als JSON an den Aufrufer zurückgegeben wird.

8

Während die Erstellung in der Funktion new_patron() abgewickelt wird, werden alle anderen Interaktionen in dieser klassenbasierten Ansicht abgewickelt, die von Sanic zur Verfügung gestellt wird. Alle Methoden dieser Klasse sind mit derselben URL verbunden, /patron/<id:int>, die du in der Funktion add_route() ganz unten sehen kannst. Beachte, dass der URL-Parameterid an jede der Methoden übergeben wird, und dass dieser Parameter für alle drei Endpunkte erforderlich ist.

Das Argument metaclass kannst du getrost ignorieren: Es dient lediglich dazu, jede Methode mit dem @aelapsed Dekorator zu umhüllen, damit die Zeitangaben in den Protokollen ausgegeben werden. Auch dies ist nicht Teil der Sanic API, sondern meine eigene Erfindung, um Zeitdaten zu protokollieren.

9

Die Interaktion mit dem Modell wird wie bisher im Modul model durchgeführt.

10

Wenn das Modell einen Fehler bei der Aktualisierung meldet, ändere ich die Antwortdaten. Ich habe dies für Leser eingefügt, die Pythons Version des ternären Operators noch nicht kennen.

11

Die @app.listener Dekoratoren sind Hooks, die Sanic zur Verfügung stellt, um dir die Möglichkeit zu geben, zusätzliche Aktionen während des Starts und des Herunterfahrens hinzuzufügen. Dieser Dekorator, before_server_start, wird vor dem Start des API-Servers aufgerufen. Dies scheint ein guter Ort zu sein, um unsere Datenbankverbindung zu initialisieren.

12

Verwende den Database Helfer, um eine Verbindung zu unserer PostgreSQL-Instanz herzustellen. Die DB, mit der wir uns verbinden, ist restaurant.

13

Besorge dir einen Verbindungspool zu unserer Datenbank.

14

Verwende unser Modell (für die Tabelle patron ), um die Tabelle zu erstellen, wenn sie fehlt.

15

Verwende unser Modell, um einen dedicated_listener für Datenbankereignisse zu erstellen, der auf den Kanal chan_patron hört. Die Callback-Funktion für diese Ereignisse ist model.db_event(), die ich im nächsten Listing erläutern werde. Der Callback wird jedes Mal aufgerufen, wenn die Datenbank den Kanal aktualisiert.

16

after_server_stop ist der Haken für Aufgaben, die beim Herunterfahren passieren müssen. Hier trennen wir die Verbindung mit der Datenbank.

17

Dieser add_route() Aufruf sendet POST Anfragen für die /patron URL an die new_patron() Coroutine-Funktion.

18

Dieser add_route() Aufruf sendet alle Anfragen für die /patron/<id:int> URL an die PatronAPI klassenbasierte Ansicht. Die Methodennamen in dieser Klasse bestimmen, welche Methode aufgerufen wird: Eine GET HTTP-Anfrage ruft die PatronAPI.get() Methode auf, und so weiter.

Der vorangehende Code enthält die gesamte HTTP-Behandlung für unseren Server sowie Aufgaben zum Starten und Herunterfahren, wie das Einrichten eines Verbindungspools zur Datenbank und - ganz wichtig - das Einrichten eines db-event Listeners auf dem chan_patron Kanal auf dem DB-Server.

Beispiel 4-24 zeigt das Modell für die Tabelle patron in der Datenbank.

Beispiel 4-24. DB-Modell für die Tabelle "patron
# model.py
import logging
from json import loads, dumps
from triggers import (
    create_notify_trigger, add_table_triggers)  1
from boltons.cacheutils import LRU  2

logger = logging.getLogger('perf')

CREATE_TABLE = ('CREATE TABLE IF NOT EXISTS patron('  3
                'id serial PRIMARY KEY, name text, '
                'fav_dish text)')
INSERT = ('INSERT INTO patron(name, fav_dish) '
          'VALUES ($1, $2) RETURNING id')
SELECT = 'SELECT * FROM patron WHERE id = $1'
UPDATE = 'UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3'
DELETE = 'DELETE FROM patron WHERE id=$1'
EXISTS = "SELECT to_regclass('patron')"

CACHE = LRU(max_size=65536)  4

async def add_patron(conn, data: dict) -> int:  5
    return await conn.fetchval(
        INSERT, data['name'], data['fav_dish'])

async def update_patron(conn, id: int, data: dict) -> bool:
    result = await conn.execute(  6
        UPDATE, data['name'], data['fav_dish'], id)
    return result == 'UPDATE 1'

async def delete_patron(conn, id: int):  7
    result = await conn.execute(DELETE, id)
    return result == 'DELETE 1'

async def get_patron(conn, id: int) -> dict:  8
    if id not in CACHE:
        logger.info(f'id={id} Cache miss')
        record = await conn.fetchrow(SELECT, id)  9
        CACHE[id] = record and dict(record.items())
    return CACHE[id]

def db_event(conn, pid, channel, payload):  10
    event = loads(payload)  11
    logger.info('Got DB event:\n' + dumps(event, indent=4))
    id = event['id']
    if event['type'] == 'INSERT':
        CACHE[id] = event['data']
    elif event['type'] == 'UPDATE':
        CACHE[id] = event['data']['new']  12
    elif event['type'] == 'DELETE':
        CACHE[id] = None

async def create_table_if_missing(conn):  13
    if not await conn.fetchval(EXISTS):
        await conn.fetchval(CREATE_TABLE)
        await create_notify_trigger(
            conn, channel='chan_patron')
        await add_table_triggers(
            conn, table='patron')
1

Du musst Trigger zur Datenbank hinzufügen, um Benachrichtigungen zu erhalten, wenn sich Daten ändern. Ich habe diese praktischen Helfer erstellt, um die Triggerfunktion selbst zu erstellen (mit create_notify_trigger) und um den Trigger zu einer bestimmten Tabelle hinzuzufügen (mit add_table_triggers). Die dafür erforderliche SQL-Anweisung würde den Rahmen dieses Buches sprengen, ist aber für das Verständnis der Funktionsweise dieses Fallbeispiels wichtig. Ich habe den kommentierten Code für diese Trigger inAnhang B aufgenommen.

2

Das Drittanbieter-Paket boltons bietet eine Reihe nützlicher Tools, nicht zuletzt den LRU Cache, der vielseitiger ist als der@lru_cache Dekorator im functools Standardbibliotheksmodul.3

3

Dieser Textblock enthält das gesamte SQL für die Standard-CRUD-Operationen. Beachte, dass ich die native PostgreSQL-Syntax für die Parameter verwende: $1, $2, und so weiter. Das ist nichts Neues und wird hier nicht weiter erläutert.

4

Erstelle den Cache für diese App-Instanz.

5

Ich habe diese Funktion aus dem Sanic-Modul innerhalb des Endpunkts new_patron() aufgerufen, um neue Kunden hinzuzufügen. Innerhalb der Funktion verwende ich die Methode fetchval(), um neue Daten einzufügen. Warum fetchval() und nicht execute()? Weil fetchval() den Primärschlüssel des neu eingefügten Datensatzes zurückgibt!4

6

Aktualisiere einen bestehenden Datensatz. Wenn dies erfolgreich ist, gibt PostgreSQL UPDATE 1 zurück. Ich verwende dies als Überprüfung, um zu sehen, ob die Aktualisierung erfolgreich war.

7

Das Löschen ist dem Aktualisieren sehr ähnlich.

8

Dies ist die Leseoperation. Dies ist der einzige Teil unserer CRUD-Schnittstelle, der sich um den Cache kümmert. Denk mal kurz darüber nach: Wir aktualisieren den Cache nicht, wenn wir etwas einfügen, aktualisieren oder löschen. Das liegt daran, dass wir uns auf die asynchrone Benachrichtigung aus der Datenbank (über die installierten Trigger) verlassen, um den Cache zu aktualisieren, wenn Daten geändert werden.

9

Natürlich wollen wir den Cache auch nach dem ersten GET nutzen.

10

Die Funktion db_event() ist der Callback, den asyncpg ausführt, wenn es Ereignisse auf unserem DB-Benachrichtigungskanal chan_patron gibt. Diese spezifische Parameterliste wird von asyncpg benötigt. conn ist die Verbindung, auf der das Ereignis gesendet wurde, pid ist die Prozess-ID der PostgreSQL-Instanz, die das Ereignis gesendet hat, channel ist der Name des Kanals (in diesem Fall chan_patron) und die Nutzlast sind die Daten, die über den Kanal gesendet werden.

11

Deserialisiere die JSON-Daten in ein Dict.

12

Die Cache-Population ist im Allgemeinen recht einfach, aber beachte, dass Aktualisierungsereignisse sowohl neue als auch alte Daten enthalten, sodass wir sicherstellen müssen, dass nur die neuen Daten gecacht werden.

13

Dies ist eine kleine Hilfsfunktion, mit der du eine fehlende Tabelle ganz einfach wiederherstellen kannst. Das ist sehr nützlich, wenn du das häufig tun musst - zum Beispiel, wenn du die Codebeispiele für dieses Buch schreibst!

Hier werden auch die Datenbankbenachrichtigungs-Trigger erstellt und zu unserer Tabelle patron hinzugefügt. Siehe Beispiel B-1für eine kommentierte Auflistung dieser Funktionen.

Damit sind wir am Ende dieser Fallstudie angelangt. Wir haben gesehen, wie einfach es mit Sanic ist, einen API-Server zu erstellen. Wir haben gesehen, wie man asyncpg für die Durchführung von Abfragen über einen Verbindungspool verwendet und wie man die asynchronen Benachrichtigungsfunktionen von PostgreSQL nutzt, um Rückrufe über eine dedizierte, langlebige Datenbankverbindung zu empfangen.

Viele Mitarbeiter von bevorzugen objektrelationale Mapper für die Arbeit mit Datenbanken, und in diesem Bereich ist SQLAlchemyführend. Es gibt immer mehr Unterstützung für die Verwendung von SQLAlchemy zusammen mit asyncpgin Bibliotheken von Drittanbietern wie asyncpgsa und GINO. Ein weiteres beliebtes ORM,Peewee, erhält Unterstützung fürasyncio durch das aiopeewee Paket.

Andere Bibliotheken und Ressourcen

Es gibt viele weitere Bibliotheken für asyncio, die in diesem Buch nicht behandelt werden. Um mehr zu erfahren, kannst du dir dasProjektaio-libs ansehen, das fast 40 Bibliotheken verwaltet, und dasProjekt Awesome asyncio , das viele andere Projekte bucht, die mit dem Modul asynciokompatibel sind.

Eine Bibliothek, die besonders zu erwähnen ist, istaiofiles. Wie du dich vielleicht aus unseren früheren Diskussionen erinnerst, habe ich gesagt, dass es für eine hohe Gleichzeitigkeit in Asyncio von entscheidender Bedeutung ist, dass die Schleife niemals blockiert. In diesem Zusammenhang haben wir uns ausschließlich auf blockierende Operationen im Netzwerk konzentriert, aber es hat sich herausgestellt, dass auch der Festplattenzugriff eine blockierende Operation ist, die sich bei sehr hoher Gleichzeitigkeit auf die Leistung auswirkt. Die Lösung für dieses Problem ist aiofiles, das einen praktischen Wrapper für den Festplattenzugriff in einem Thread bietet. Das funktioniert, weil Python die GIL während der Datei-Operationen freigibt, so dass dein Haupt-Thread (der die asyncio Schleife ausführt) davon nicht betroffen ist.

Die wichtigste Domäne für Asyncio wird die Netzwerkprogrammierung sein. Deshalb ist es keine schlechte Idee, ein wenig über Socket-Programmierung zu lernen, und auch nach all den Jahren ist Gordon McMillans"Socket Programming HOWTO", das in der Standard-Python-Dokumentation enthalten ist, eine der besten Einführungen, die du finden kannst.

Ich habe Asyncio aus einer Vielzahl von Quellen gelernt, von denen viele bereits in früheren Abschnitten erwähnt wurden. Jeder lernt anders, deshalb lohnt es sich, verschiedene Arten von Lernmaterialien zu erkunden. Hier sind ein paar andere, die ich nützlich fand:

  • Robert Smallshires Vortrag "Get to Grips with Asyncio in Python 3", gehalten beim NDC London im Januar 2017. Das ist bei weitem das beste YouTube-Video über Asyncio, das ich kenne. Der Vortrag mag für Anfänger/innen etwas fortgeschritten sein, aber er beschreibt sehr anschaulich, wie Asyncio aufgebaut ist.

  • Nikolay Noviks Folien zu "Building Apps with Asyncio", präsentiert auf der PyCon UA 2016. Die Informationen sind dicht, aber in diesen Folien sind viele praktische Erfahrungen festgehalten.

  • Endlose Sitzungen in der Python REPL, Dinge ausprobieren und "sehen, was passiert".

Ich ermutige dich, weiter zu lernen, und wenn du ein Konzept nicht verstehst, suche weiter nach neuen Quellen, bis du eine Erklärung findest, die für dich stimmt.

1 Eigentlich schon, solange die Sockets, die in verschiedenen Threads verwendet werden, vollständig in ihren eigenen Threads erstellt, verwendet und zerstört werden. Das ist zwar möglich, aber schwer zu bewerkstelligen, und viele Leute tun sich schwer, das richtig hinzubekommen. Deshalb ist die Empfehlung, einen einzigen Thread und einen Polling-Mechanismus zu verwenden, so wichtig.

2 Das Rezept für dieses Gericht und für andere leckere Gerichte aus Spam findest du auf der UKTV-Website.

3 Bekomme boltons mit pip install boltons.

4 Du brauchst aber auch den RETURNING id Teil des SQLs!

Get Asyncio in Python verwenden now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.