Kapitel 4. Umgang mit Zeit

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Ein entscheidender Unterschied zwischen der Programmierung von Anwendungen für einen Stream-Prozessor und einen Batch-Prozessor ist die Notwendigkeit, die Zeit explizit zu behandeln. Nehmen wir eine sehr einfache Anwendung: Zählen. Wir haben einen nicht enden wollenden Strom von Ereignissen (z. B. Tweets, Klicks, Transaktionen) und wollen die Ereignisse nach einem Schlüssel gruppieren und regelmäßig (z. B. jede Stunde) die Anzahl der einzelnen Ereignisse für jeden Schlüssel ausgeben. Das ist die sprichwörtliche Anwendung für "Big Data", die dem berüchtigten Beispiel der Wortzählung für MapReduce entspricht.

Zählen mit Batch- und Lambda-Architekturen

Auch wenn das einfach erscheint, ist das Zählen im großen Maßstab und in der Praxis überraschend schwierig und kommt natürlich überall vor. Andere Analysen, wie z. B. Aggregationen oder Operationen auf Online Analytical Processing (OLAP) Cubes, sind einfache Verallgemeinerungen des Zählens. Mit einer traditionellen Batch-Verarbeitungsarchitektur würden wir dies wie in Abbildung 4-1 dargestellt umsetzen.

Implementing continuous applications using periodic batch jobs. Data is continuously sliced into files, possibly on an hourly basis, and batch jobs are run with these files as input, giving an impression of a continuous processing of incoming data.
Abbildung 4-1. Kontinuierliche Anwendungen mit periodischen Batch-Aufträgen implementieren. Die Daten werden kontinuierlich in Dateien zerlegt, möglicherweise auf stündlicher Basis, und Batch-Aufträge werden mit diesen Dateien als Input ausgeführt, was den Eindruck einer kontinuierlichen Verarbeitung der eingehenden Daten vermittelt.

Bei dieser Architektur erstellt eine kontinuierliche Dateningestion-Pipeline stündlich Dateien (die in der Regel in einem verteilten Dateispeicher wie dem Hadoop Distributed File System [HDFS] oder MapR-FS gespeichert werden). Dies kann mit einem Tool wie Apache Flume geschehen. Ein Zeitplannungsprogramm plant einen Batch-Job (mit MapReduce oder einer anderen Alternative), um die letzte Datei zu analysieren, die Ereignisse in der Datei nach Schlüsseln zu gruppieren und die einzelnen Ereignisse pro Schlüssel zu zählen, um die letzten Zählungen auszugeben. Jedes Unternehmen, das Hadoop einsetzt, hat mehrere solcher Pipelines in seinen Clustern laufen.

Auch wenn diese Architektur durchaus funktionieren kann, gibt es einige Probleme damit:

  • Zu viele bewegliche Teile: Wir verwenden viele Systeme, um die Ereignisse in unseren eingehenden Daten zu zählen. Alle diese Systeme sind mit Lern- und Verwaltungskosten verbunden und weisen Fehler in den verschiedenen Programmen auf.

  • Implizite Behandlung der Zeit: Nehmen wir an, dass wir alle 30 Minuten zählen wollen und nicht jede Stunde. Diese Logik ist Teil der Workflow-Planungslogik (und nicht des Anwendungscodes), die DevOps-Belange mit Geschäftsanforderungen vermischt.

  • Frühwarnungen: Nehmen wir an, dass wir zusätzlich zur stündlichen Zählung so früh wie möglich (bei mindestens 10 Ereignissen) Warnmeldungen erhalten möchten. Dafür können wir Storm verwenden, um den Nachrichtenstrom (Kafka oder MapR Streams) zusätzlich zu den periodischen Batch-Aufträgen zu verarbeiten. Storm liefert frühe ungefähre Zählungen, und die periodischen Aufträge liefern die genauen stündlichen Zählungen. Wir haben gerade ein weiteres System und ein neues Programmiermodell in den Mix aufgenommen. Es handelt sich dabei um die Lambda-Architektur, die in Kapitel 1 kurz beschrieben und hier in Abbildung 4-2 dargestellt wird.

    Implementing continuous applications using periodic batch jobs and early results using a stream processor (Lambda architecture). The stream processor is used to provide approximate but real-time results, which are eventually corrected by the batch layer.
    Abbildung 4-2. Implementierung von kontinuierlichen Anwendungen mit periodischen Batch-Aufträgen und frühen Ergebnissen mit einem Stream-Prozessor (Lambda-Architektur). Der Stream-Prozessor wird verwendet, um ungefähre Ergebnisse in Echtzeit zu liefern, die dann von der Batch-Schicht korrigiert werden.
  • Ereignisse in falscher Reihenfolge: In den meisten realen Datenströmen können Ereignisse in falscher Reihenfolge eintreffen, d. h. die Reihenfolge, in der die Ereignisse in der realen Welt auftreten (wie sie durch die Zeitstempel angezeigt werden, die den Ereignissen bei ihrer Erzeugung beigefügt werden [z. B. die vom Smartphone gemessene Zeit, wenn sich ein Nutzer in eine Anwendung einloggt]), unterscheidet sich von der Reihenfolge, in der die Ereignisse im Rechenzentrum beobachtet werden. Das bedeutet, dass ein Ereignis, das zum vorherigen stündlichen Stapel gehört, möglicherweise fälschlicherweise zum aktuellen Stapel gezählt wird. Es gibt keine einfache Möglichkeit, dieses Problem mit dieser Architektur zu lösen - die meisten Leute ignorieren einfach, dass es diese Realität gibt.

  • Unklare Batch-Grenzen: Die Bedeutung von "stündlich" ist in dieser Architektur nicht ganz klar, da sie von der Interaktion zwischen den verschiedenen Systemen abhängt. Die stündlichen Batches sind bestenfalls ungefähre Angaben, wobei Ereignisse an den Kanten der Batches entweder im aktuellen oder im nächsten Batch landen, wobei es kaum Garantien gibt. Den Datenstrom in stündliche Batches zu unterteilen, ist eigentlich die einfachste Möglichkeit, die Zeit aufzuteilen. Nehmen wir an, dass wir Aggregate nicht für einfache stündliche Batches, sondern für Aktivitätsphasen (z. B. von der Anmeldung bis zur Abmeldung oder Inaktivität) erstellen möchten. Mit der in Abbildung 4-1 und Abbildung 4-2 gezeigten Architektur gibt es keine einfache Möglichkeit, dies zu tun.

Zählen mit Streaming-Architektur

Es muss doch einen besseren Weg geben, um Zählungen aus einem Strom von Ereignissen zu erstellen. Wie du vielleicht schon vermutet hast, handelt es sich um einen Streaming-Anwendungsfall, bei dem wir periodische Batch-Aufträge verwenden, um Streaming zu simulieren. Außerdem müssen wir eine Vielzahl von Systemen miteinander verknüpfen. Mit einer Streaming-Architektur würde die Anwendung dem Modell in Abbildung 4-3 folgen.

Implementing continuous applications using a streaming architecture. The message transport (Kafka, MapR Streams) is shown here as a horizontal cylinder. It supplies streaming data to the stream processor (in our case, Flink) that is used for all data processing, providing both real-time results and correct results.
Abbildung 4-3. Implementierung von kontinuierlichen Anwendungen mit einer Streaming-Architektur. Der Nachrichtentransport (Kafka, MapR Streams) ist hier als horizontaler Zylinder dargestellt. Er liefert Streaming-Daten an den Stream-Prozessor (in unserem Fall Flink), der für die gesamte Datenverarbeitung verwendet wird und sowohl Echtzeit- als auch korrekte Ergebnisse liefert.

Der Ereignisstrom wird wieder über den Nachrichtentransport bereitgestellt und einfach von einem einzigen Flink-Job verarbeitet, der stündliche Zählungen und (optional) Frühwarnungen erstellt. Dieser Ansatz löst alle bisherigen Probleme auf unkomplizierte Weise. Verlangsamungen des Flink-Jobs oder Durchsatzspitzen stauen sich einfach im Message-Transport-Tool an. Die Logik zur Aufteilung von Ereignissen in zeitgerechte Batches ( Fenster genannt) ist vollständig in die Anwendungslogik des Flink-Programms eingebettet. Frühzeitige Warnungen werden von demselben Programm erzeugt. Ereignisse außerhalb der Reihenfolge werden von Flink transparent behandelt. Die Gruppierung nach Sitzung statt nach einem festen Zeitpunkt bedeutet, dass die Fensterdefinition im Flink-Programm einfach geändert wird. Um die Anwendung mit geändertem Code erneut abzuspielen, muss lediglich das Kafka-Thema erneut abgespielt werden. Durch die Einführung einer Streaming-Architektur haben wir die Anzahl der Systeme, die wir lernen, verwalten und in denen wir Code erstellen müssen, erheblich reduziert. Der Flink-Anwendungscode für diese Zählung ist sehr einfach:

DataStream<LogEvent> stream = env
  // create stream from Kafka
  .addSource(new FlinkKafkaConsumer(...))
  // group by country
  .keyBy("country")
  // window of size 1 hour
  .timeWindow(Time.minutes(60))
  // do operations per window
  .apply(new CountPerWindowFunction());

Es gibt zwei Hauptunterschiede zwischen den beiden Ansätzen: 1) Wir behandeln den nie endenden Strom eingehender Ereignisse als das, was er tatsächlich ist - ein Strom -, anstatt zu versuchen, ihn künstlich in Dateien zu unterteilen, und 2) wir kodieren die Definition von Zeit (um den Strom in Gruppen zu unterteilen) explizit im Anwendungscode (das obige Zeitfenster), anstatt ihre Definition implizit auf die Aufnahme, Berechnung und Planung zu übertragen.

Batching in Stromverarbeitungssystemen

Der Begriff "Micro-Batching", den wir in Kapitel 1 besprochen haben, wird verwendet, um etwas zwischen Batch und Streaming zu bezeichnen. In Wirklichkeit kann Micro-Batching je nach Kontext ganz unterschiedliche Dinge bedeuten. In gewisser Weise ist die Batch-Architektur, die wir in Abbildung 4-1 gesehen haben, eine Micro-Batch-Architektur, wenn die Dateien ausreichend klein sind.

Storm Trident implementiert Micro-Batching, indem es ein großes Storm-Ereignis erstellt, das eine feste Anzahl von Ereignissen enthält, und die aggregierten Ereignisse mit einer kontinuierlich laufenden Storm-Topologie verarbeitet. Spark Streaming implementiert Micro-Batching im Wesentlichen als die Batch-Architektur, die wir gesehen haben, verbirgt aber die ersten beiden Schritte (Ingestion und Speicherung) vor dem Benutzer und speichert die Mini-Batches intern im Speicher, in einem Write-Ahead-Log statt in Dateien. Schließlich verwendet jeder moderne Stream-Prozessor, einschließlich Flink, intern eine Form von Micro-Batches, indem er Puffer, die viele Ereignisse enthalten, in Shuffle-Phasen über das Netzwerk schickt, anstatt einzelne Ereignisse. All diese Formen des Micro-Batching sind sehr unterschiedlich.

Um das zu verdeutlichen, sollte die Dosierung in Stromverarbeitungssystemen die folgenden Anforderungen erfüllen:

  • Batching sollte nur als Mechanismus zur Leistungssteigerung eingesetzt werden. Je größer die Batches sind, desto größer ist der Durchsatz, auf den ein System skalieren kann.

  • Das Batching für die Leistung sollte völlig unabhängig von der Pufferung für die Definition von Fenstern oder von Commits für die Fehlertoleranz sein und sollte nicht Teil der API sein. Eine Kopplung führt zu eingeschränkten, schwer zu nutzenden und anfälligen Systemen.

Letztendlich solltest du dich als Anwendungsentwickler und Nutzer von Datenverarbeitungssystemen nicht darum kümmern, ob und wie ein System Micro-Batching implementiert. Stattdessen solltest du dir Gedanken darüber machen, ob das System mit ungeordneten Streams und Sessions und anderen falsch ausgerichteten Fenstern umgehen kann, ob es neben genauen Aggregaten auch Frühwarnungen liefern kann und ob es vergangene Daten deterministisch wiedergeben kann, sowie über die Leistungsmerkmale des Systems (Latenz und Durchsatz) und die Garantien des Systems im Falle von Ausfällen.

Vorstellungen von Zeit

Bei der Verarbeitung von Datenströmen sprechen wir im Allgemeinen von zwei Hauptbegriffen der Zeit:1

  • DieEreigniszeit ist der Zeitpunkt, an dem ein Ereignis tatsächlich in der realen Welt stattgefunden hat. Genauer gesagt ist jedes Ereignis in der Regel mit einem Zeitstempel verbunden, der Teil des Datensatzes selbst ist (z. B. gemessen von einem Mobiltelefon oder einem Server, der Protokolle ausgibt). Die Ereigniszeit eines Ereignisses ist einfach ein Zeitstempel.

  • DieVerarbeitungszeit ist die Zeit, die das Ereignis von der Maschine, die es verarbeitet, beobachtet wird. Die Verarbeitungszeit eines Ereignisses ist einfach die Zeit, die von der Uhr der Maschine, die das Ereignis verarbeitet, gemessen wird.

Abbildung 4-4 veranschaulicht den Unterschied zwischen Ereigniszeit und Bearbeitungszeit.

An example of an out-of-order stream of events where processing time order is different from event time order.
Abbildung 4-4. Ein Beispiel für einen ungeordneten Strom von Ereignissen, bei dem die Reihenfolge der Bearbeitungszeit von der Reihenfolge der Ereigniszeit abweicht.

Betrachte die Star Wars-Filmreihe: Die ersten Filme, die 1977, 1980 und 1983 in die Kinos kamen (das ist die Verarbeitungszeit), waren die Filme 4, 5 und 6 in der Handlung der Reihe (das ist die Ereigniszeit). Die Filme, die in den Jahren 1999, 2002, 2005 und 2015 in der Bearbeitungszeit erschienen sind, beziehen sich auf die Filme 1, 2, 3 und 7 in der Ereigniszeit. Daher können Ströme in der falschen Reihenfolge ankommen (obwohl sie normalerweise nicht Jahre in der falschen Reihenfolge sind).

Oft wird ein dritter Zeitbegriff verwendet, der Ingestion Time oder Ingress Time genannt wird und sich auf den Zeitpunkt bezieht, an dem das Ereignis in die Streamverarbeitung eintritt. Daten, die keine echte Ereigniszeit haben, können mit einer Zeit versehen werden, aber diese Zeitstempel werden einfach vom Stream-Prozessor zugewiesen, wenn er das Ereignis zum ersten Mal sieht (in der Quellfunktion, dem ersten Operator des Programms).

Aufgrund verschiedener realer Faktoren (z. B. vorübergehender Mangel an Konnektivität, unterschiedliche Verzögerungen des Netzwerks, Uhren in verteilten Systemen, Datenraten-Spikes, physikalische Gegebenheiten oder einfach nur Pech) haben Ereigniszeit und Verarbeitungszeit immer eine zeitliche Verzögerung ( Ereigniszeit-Skew genannt). Die Reihenfolge der Ereignisse auf der Grundlage der Ereigniszeit stimmt oft nicht mit der Reihenfolge auf der Grundlage der Verarbeitungszeit überein, d. h., die Ereignisse kommen beim Stream-Prozessor nicht in der richtigen Reihenfolge an.

Je nach Anwendung sind beide Zeitvorstellungen sinnvoll. Manche Anwendungen (z. B. einige Alarmierungsanwendungen) brauchen so schnell wie möglich Ergebnisse und sind zufrieden, wenn diese Ergebnisse etwas ungenau sind. In solchen Fällen ist es nicht nötig, auf verspätete Ereignisse zu warten, und die Semantik der Verarbeitungszeit ist eine gute Wahl. Bei anderen Anwendungen (z. B. Betrugserkennung oder Rechnungsstellung) ist Genauigkeit gefragt: Ein Ereignis sollte in dem Zeitfenster berücksichtigt werden, in dem es tatsächlich eingetreten ist. Für diese Anwendungen ist die Ereigniszeitsemantik meist die richtige Wahl. Und es gibt auch Anwendungen, die beides brauchen, z. B. genaue Zählungen, aber auch eine frühzeitige Warnung, wenn eine Anomalie entdeckt wird.

Tipp

Flink ermöglicht es dem Nutzer, Fenster in der Verarbeitungszeit, der Ingestion-Zeit oder der Event-Zeit zu definieren, je nach gewünschter Semantik und Genauigkeitsanforderungen der Anwendung.

Wenn ein Zeitfenster in der Ereigniszeit definiert ist, kann die Anwendung mit Ereignissen außerhalb der Reihenfolge und mit unterschiedlichen Zeitverzögerungen umgehen. Sie berechnet aussagekräftige Ergebnisse in Bezug auf die Zeit, die den Ereignissen innewohnt.

Windows

Im ersten Abschnitt dieses Kapitels haben wir uns ein Beispiel für die Definition eines Zeitfensters in Flink angesehen, um die Ergebnisse der letzten Stunde zusammenzufassen. Mit Hilfe von Zeitfenstern lassen sich eine Reihe von Ereignissen nach Zeit oder einem anderen Merkmal gruppieren und sammeln, um diese Ereignisse als Ganzes zu analysieren (z. B. zu summieren).

Zeitfenster

Die einfachste und nützlichste Form von Fenstern sind solche, die auf der Zeit basieren. Zeitfenster können taumelnd oder gleitend sein. Nehmen wir zum Beispiel an, wir zählen die Werte, die ein Sensor ausgibt, und vergleichen diese Auswahl:

Ein Taumelfenster von 1 Minute sammelt die Werte der letzten Minute und gibt ihre Summe am Ende der Minute aus, wie in Abbildung 4-5 gezeigt.

A tumbling time window of 1 minute that sums the last minute’s worth of values.
Abbildung 4-5. Ein taumelndes Zeitfenster von 1 Minute, das die Werte der letzten Minute summiert.

Ein gleitendes Fenster von 1 Minute, das jede halbe Minute gleitet, zählt die Werte der letzten Minute und gibt den Zählerstand jede halbe Minute aus, wie in Abbildung 4-6 gezeigt.

A sliding time window that computes the sum of the last minute’s values every half minute.
Abbildung 4-6. Ein gleitendes Zeitfenster, das jede halbe Minute die Summe der Werte der letzten Minute berechnet.

Im ersten gleitenden Fenster werden die Werte 9, 6, 8 und 4 addiert, was das Ergebnis 27 ergibt. Als Nächstes verschiebt sich das Fenster um eine halbe Minute (sagen wir, 2 Werte in unserem Beispiel), und die Werte 8, 4, 7, 3 werden addiert, was das Ergebnis 22 ergibt, usw. Ein taumelndes Zeitfenster von 1 Minute kann in Flink einfach wie folgt definiert werden:

stream.timeWindow(Time.minutes(1))

Und ein gleitendes Zeitfenster von 1 Minute, das sich alle 30 Sekunden verschiebt, kann so einfach definiert werden wie:

stream.timeWindow(Time.minutes(1), Time.seconds(30))

Fenster zählen

Ein weiterer gängiger Fenstertyp, der von Flink unterstützt wird, ist das Zählfenster. Hier gruppieren wir die Elemente nach ihrer Anzahl und nicht nach ihrem Zeitstempel. Das gleitende Fenster in Abbildung 4-6 kann zum Beispiel auch als ein Zählfenster der Größe 4 Elemente interpretiert werden, das alle 2 Elemente gleitet. Taumelnde und gleitende Zählfenster können so einfach definiert werden wie:

stream.countWindow(4)
stream.countWindow(4, 2)

Zählfenster sind zwar nützlich, aber weniger streng definiert als Zeitfenster und sollten mit Vorsicht verwendet werden. Da die Zeit immer weiterläuft, wird sich ein Zeitfenster immer irgendwann "schließen". Bei einem Zählfenster von z. B. 100 Elementen kann es jedoch vorkommen, dass es nie 100 Elemente für diesen Schlüssel gibt, was dazu führt, dass sich das Fenster nie schließt und der vom Fenster belegte Speicher leer bleibt. Eine Möglichkeit, dies zu verhindern, ist die Kopplung eines Zeitfensters mit einer Zeitüberschreitung durch einen Auslöser, den wir später im Abschnitt "Auslöser" beschreiben.

Sitzungsfenster

Ein weiterer sehr nützlicher Fenstertyp, der von Flink bereitgestellt wird, ist das Sitzungsfenster. Wie bereits in Kapitel 3 kurz erwähnt, ist eine Sitzung ein Zeitraum, dem eine Phase der Aktivität vorausgeht und auf den eine Phase der Inaktivität folgt; zum Beispiel eine Reihe von Interaktionen eines Nutzers auf einer Website, auf die das Schließen der Browser-Registerkarte folgt oder der Nutzer einfach inaktiv wird. Sitzungen benötigen einen eigenen Mechanismus, weil sie in der Regel keine bestimmte Dauer (manche Sitzungen können 30 Sekunden und andere 1 Stunde dauern) oder eine bestimmte Anzahl von Interaktionen haben (manche Sitzungen können 3 Klicks mit anschließendem Kauf und andere 40 Klicks ohne Kauf umfassen).

Hinweis

Flink ist derzeit die einzige Open Source Stream Processing Engine, die Sessions unterstützt.

Sitzungsfenster werden in Flink mit einem Timeout angegeben. Damit legen wir fest, wie lange wir warten wollen, bis wir glauben, dass eine Sitzung beendet ist. In diesem Beispiel läuft eine Sitzung ab, wenn der Benutzer fünf Minuten lang inaktiv ist:

stream.window(SessionWindows.withGap(Time.minutes(5))

Auslöser

Zusätzlich zu den Fenstern bietet Flink auch einen optionalen Mechanismus zur Definition von Triggern. Trigger steuern, wann die Ergebnisse zur Verfügung gestellt werden - mit anderen Worten, wann der Inhalt eines Fensters aggregiert und an den Nutzer zurückgegeben wird. Jedes Standardfenster ist mit einem Trigger verbunden. Zum Beispiel wird ein Zeitfenster zur Ereigniszeit ausgelöst, wenn ein Wasserzeichen eintrifft. Als Nutzer kannst du aber auch einen benutzerdefinierten Auslöser implementieren (z. B. ungefähre frühe Ergebnisse des Fensters alle 1 Sekunde), zusätzlich zu den vollständigen und genauen Ergebnissen bei Eintreffen des Wasserzeichens.

Implementierung von Windows

Intern werden in Flink alle diese Fenstertypen über denselben Mechanismus implementiert. Obwohl die Interna des Mechanismus für einfache Benutzer nicht wichtig sind, ist es wichtig zu wissen, dass:

  • Der Fenstermechanismus ist völlig unabhängig vom Checkpointing-Mechanismus (der in Kapitel 5 ausführlich behandelt wird). Das bedeutet, dass die Dauer des Fensters nicht vom Checkpointing-Intervall abhängt und dass man auch Fenster ohne "Dauer" definieren kann (z. B. die Zähl- und Sitzungsfenster, die wir oben gesehen haben).

  • Fortgeschrittene können den zugrunde liegenden Mechanismus direkt nutzen, um kompliziertere Formen von Fenstern zu definieren (z. B. Zeitfenster, die auch ein Zwischenergebnis auf Basis der Anzahl oder sogar einen Wert eines bestimmten Datensatzes liefern).

Zeitreise

Ein zentraler Aspekt der Streaming-Architektur ist die Zeitreise. Wenn die gesamte Datenverarbeitung durch den Stream-Prozessor erfolgt, wie entwickeln wir dann Anwendungen, wie verarbeiten wir historische Daten und wie verarbeiten wir die Daten erneut (z. B. zu Debugging- oder Prüfungszwecken)? Diese Idee wird in Abbildung 4-7 dargestellt.

Time travel for data reprocessing. Support for event time by the stream processor means that rerunning the same program on the same data by rewinding the stream will yield the same results.
Abbildung 4-7. Zeitreise für die Wiederverarbeitung von Daten. Die Unterstützung der Ereigniszeit durch den Stream-Prozessor bedeutet, dass die Wiederholung desselben Programms mit denselben Daten durch Zurückspulen des Streams zu denselben Ergebnissen führt.

Wie in Abbildung 4-7 dargestellt, bedeutet Zeitreise, dass der Stream zu einem Zeitpunkt in der Vergangenheit zurückgespult und die Verarbeitung von dort aus neu gestartet wird, um schließlich die Gegenwart zu erreichen. Moderne Transportschichten wie Apache Kafka und MapR Streams unterstützen diese Funktion und unterscheiden sich damit von älteren Lösungen. Während die Echtzeit-Streamverarbeitung immer die letzten Daten (das "Jetzt") in der Abbildung verarbeitet, beginnt die historische Streamverarbeitung in der Vergangenheit und holt (optional) die Gegenwart ein.

Hinweis

Um in der Lage zu sein, in der Zeit zurückzureisen und die Daten korrekt zu verarbeiten, muss der Stream-Prozessor die Ereigniszeit unterstützen.

Wenn die Fenster auf der Basis der Wanduhrzeit definiert werden und nicht nach den Zeitstempeln, die in den Datensätzen selbst eingebettet sind, werden wir jedes Mal, wenn wir dieselbe Anwendung ausführen, ein anderes Ergebnis erhalten. Die Ereigniszeit macht die Verarbeitung deterministisch, indem sie garantiert, dass die gleiche Anwendung mit dem gleichen Stream die gleichen Ergebnisse liefert.

Wasserzeichen

Wir haben gesehen, dass die Unterstützung der Ereigniszeit für die Streaming-Architektur von zentraler Bedeutung ist, denn sie sorgt für Genauigkeit und die Möglichkeit, Daten weiterzuverarbeiten. Wenn die Berechnung auf der Ereigniszeit basiert, woher wissen wir dann, dass alle Ereignisse eingetroffen sind und dass wir das Ergebnis eines Fensters berechnen und ausgeben können? Mit anderen Worten: Wie behalten wir die Ereigniszeit im Auge und wissen, dass eine bestimmte Ereigniszeit im Eingabestrom erreicht wurde? Um die Ereigniszeit im Auge zu behalten, brauchen wir eine Art Uhr, die von den Daten angetrieben wird und nicht von den Wanduhren der Maschinen, die die Berechnungen durchführen.

Betrachte die 1-Minuten-Taumelfenster in Abbildung 4-5. Nehmen wir an, das erste Fenster beginnt um 10:00:00 Uhr (d.h. 10 Stunden, 0 Minuten, 0 Sekunden) und muss alle Werte von 10:00:00 Uhr bis 10:01:00 Uhr summieren. Woher wissen wir, dass die Zeit 10:01:00 ist, wenn die Zeit Teil der Datensätze selbst ist? Mit anderen Worten: Woher wissen wir, dass ein Element mit dem Zeitstempel 10:00:59 nicht ankommen wird?

Flink erreicht dies über Wasserzeichen, einen Mechanismus zur Vorverlegung der Ereigniszeit. Wasserzeichen sind regelmäßige Datensätze, die in den Stream eingebettet sind und die Berechnungen anhand der Ereigniszeit darüber informieren, dass eine bestimmte Zeit erreicht wurde. Wenn das oben genannte Fenster ein Wasserzeichen mit einer Zeitmarke größer als 10:01:00 erhält (zum Beispiel würden sowohl ein Wasserzeichen mit der Zeitmarke 10:01:00 als auch ein Wasserzeichen mit der Zeitmarke 10:03:43 genauso funktionieren), weiß es, dass keine weiteren Datensätze mit einem Zeitstempel größer als die Marke auftreten werden; alle Ereignisse mit einer Zeit kleiner oder gleich dem Zeitstempel sind bereits eingetreten. Dann kann er das Ergebnis des Fensters (die Summe) sicher berechnen und ausgeben. Bei Wasserzeichen läuft die Ereigniszeit völlig unabhängig von der Verarbeitungszeit ab. Wenn zum Beispiel ein Wasserzeichen zu spät kommt ("zu spät" bezieht sich auf die Verarbeitungszeit), hat das keinen Einfluss auf die Korrektheit der Ergebnisse, sondern nur auf die Geschwindigkeit, mit der wir die Ergebnisse erhalten.

Wie Wasserzeichen erzeugt werden

In Flink generiert der Anwendungsentwickler Wasserzeichen, da dies in der Regel einige Kenntnisse über die Domäne erfordert. Ein perfektes Wasserzeichen ist ein Wasserzeichen, das niemals falsch sein kann, d.h. kein Ereignis wird jemals nach einem Wasserzeichen mit einer Ereigniszeit von vor dem Wasserzeichen eintreffen. Unter besonderen Umständen kann sogar der Zeitstempel des letzten Ereignisses ein perfektes Wasserzeichen sein. Das könnte zum Beispiel der Fall sein, wenn unser Input perfekt geordnet ist. Ein heuristisches Wasserzeichen ist dagegen nur eine Schätzung des zeitlichen Fortschritts, kann aber manchmal falsch sein, d. h. einige späte Ereignisse können nach dem Wasserzeichen kommen, das versprochen hat, dass sie nicht kommen werden. Flink bietet Mechanismen, um mit späten Elementen umzugehen, wenn Wasserzeichen heuristisch sind.

Um ein Wasserzeichen festzulegen, wird oft Fachwissen verwendet. Wenn wir z. B. wissen, dass unsere Ereignisse zwar verspätet sind, aber nicht mehr als fünf Sekunden zu spät sein können, können wir ein Wasserzeichen mit dem größten Zeitstempel minus fünf Sekunden ausgeben. Oder ein anderer Flink-Auftrag überwacht den Stream und erstellt ein Modell zur Erzeugung von Wasserzeichen, indem er aus der Verspätung der eintreffenden Ereignisse lernt.

Hinweis

Wasserzeichen bieten einen (möglicherweise heuristischen) Mechanismus, um die Vollständigkeit unserer Eingaben in der Ereigniszeit anzugeben.

Wenn Wasserzeichen zu langsam sind, kann sich die Geschwindigkeit der Ausgabe verlangsamen, aber wir können das beheben, indem wir ungefähre Ergebnisse noch vor dem Wasserzeichen ausgeben (Flink bietet Mechanismen dafür). Wenn die Wasserzeichen zu schnell sind, erhalten wir vielleicht ein Ergebnis, das wir für richtig halten, das aber nicht stimmt, und das können wir mit den Flink-Mechanismen für späte Daten beheben. Wenn dir das alles kompliziert vorkommt, erinnere dich daran, dass die meisten Ereignisströme in der realen Welt ungeordnet sind und dass es (normalerweise) kein perfektes Wissen darüber gibt, wie ungeordnet sie sind. (Theoretisch müssten wir dafür in die Zukunft schauen.) Wasserzeichen sind der einzige Mechanismus, der uns dazu zwingt, mit ungeordneten Daten umzugehen und die Korrektheit unserer Ergebnisse zu begrenzen; die Alternative wäre, die Realität zu ignorieren und so zu tun, als ob unsere Ergebnisse korrekt wären, obwohl sie es nicht sind, ohne dass es irgendwelche Grenzen für ihre Korrektheit gäbe.

Ein Beispiel aus der Praxis: Die Kappa-Architektur bei Ericsson

Angeregt durch den Umfang der Daten, die ein typischer Ericsson-Betreiber verarbeiten muss (10 bis 100 Terabyte pro Tag oder 100.000 bis 1.000.000 Ereignisse pro Sekunde), hat ein Team bei Ericsson versucht, eine sogenannte "Kappa-Architektur" zu implementieren.2 Dieser Begriff wurde von Jay Kreps, einem der Schöpfer von Apache Kafka, in einem O'Reilly Radar-Artikel aus dem Jahr 2014 als Kritik an der sogenannten Lambda-Architektur geprägt (mit einem Augenzwinkern). Das ist nur ein anderer Name für genau die Streaming-Architektur, die wir in Kapitel 2 besprochen haben: Der Datenstrom ist das Herzstück des Designs; die Datenquellen sind unveränderlich; und ein Single-Stream-Analytics-Framework wie Apache Flink wird verwendet, um sowohl die frischen Daten als auch die historischen Daten per Stream-Replay zu verarbeiten.

Der Anwendungsfall ist die Echtzeitanalyse von Protokollen und Systemleistungsmetriken einer Live-Cloud-Infrastruktur, um kontinuierlich zu überwachen, ob sich die Cloud normal verhält oder ein "Novum" aufweist. Ein Novum kann entweder ein anormales Verhalten oder eine Änderung des Systemzustands sein, z. B. das Hinzufügen neuer virtueller Maschinen. Der Ansatz, den sie verfolgten, bestand darin, ein Bayes'sches Online-Lernmodell auf einen Strom verschiedener Metriken (Telemetrie- und Log-Ereignisse) eines Telco-Cloud-Überwachungssystems anzuwenden. Mit den Worten der Ericsson-Forscher Nicolas Seyvet und Ignacio Mulas Viela:

Die vorgeschlagene Methode erkennt Anomalien schnell und mit hoher Genauigkeit, während sie sich im Laufe der Zeit an neue Systemnormen anpasst (lernt), was sie zu einem wünschenswerten Werkzeug macht, um die mit der Betriebsfähigkeit großer Computerinfrastrukturen verbundenen Wartungskosten erheblich zu senken.

Die Datenpipeline, die das Ericsson-Team aufgebaut hat, ist in Abbildung 4-8 dargestellt.

Streaming architecture using Apache Flink at Ericsson.
Abbildung 4-8. Streaming-Architektur mit Apache Flink bei Ericsson.

Bei den Rohdaten, die an Kafka gesendet werden, handelt es sich um Telemetrie- und Log-Ereignisse von allen physischen und virtuellen Maschinen in der Cloud. Anschließend verarbeiten verschiedene Flink-Aufträge diese Daten und schreiben sie zurück in Kafka-Themen, von wo aus sie an Elastic Search und Kibana, einen Suchindex bzw. ein Visualisierungssystem, weitergeleitet werden. Diese Architektur ermöglicht es jedem Flink-Auftrag, eine genau definierte Aufgabe zu erfüllen, da die Ausgabe eines Auftrags als Eingabe für einen anderen verwendet werden kann. Die Pipeline zum Aufspüren von Anomalien in der Ausrüstung ist beispielsweise in Abbildung 4-9 dargestellt, wobei jeder Zwischenstrom ein Kafka-Topic ist (benannt nach den ihm zugewiesenen Daten) und jedes Rechteck ein Flink-Auftrag ist.

The data processing pipeline at Ericsson for anomaly detection uses Flink for the statistical extractor application and for anomaly detection.
Abbildung 4-9. Die Datenverarbeitungspipeline bei Ericsson für die Anomalieerkennung verwendet Flink für die statistische Extraktionsanwendung und für die Anomalieerkennung.

Warum ist die Unterstützung von Flink für die Ereigniszeit für diese Anwendung so wichtig? Dafür gibt es zwei Gründe:

  1. Anomalien richtig klassifizieren: Das Timing ist entscheidend, um eine Anomalie zu erkennen. Wenn zum Beispiel viele Logging-Ereignisse zur gleichen Zeit auftreten, ist das oft ein Anzeichen dafür, dass etwas nicht stimmt. Um diese Ereignisse richtig zu gruppieren und zu klassifizieren, ist es wichtig, den Zeitpunkt zu berücksichtigen, zu dem diese Ereignisse tatsächlich erzeugt wurden (und nicht den Zeitpunkt, zu dem wir sie in der Verarbeitungspipeline sehen).

  2. Verwendung der Stream-First-Architektur: Bei der Streaming-Architektur wird der Stream-Prozessor für alle Berechnungen verwendet. Der Weg, um Anwendungen weiterzuentwickeln, besteht darin, ihre Ausführung im Stream-Prozessor zu wiederholen. Wenn dieselben Daten zweimal durch eine Berechnung laufen, muss das gleiche Ergebnis erzielt werden, und das ist nur möglich, wenn mit Ereigniszeit gearbeitet wird.

1 Viele der Ideen in diesem Kapitel gehen auf die Arbeit des Google Dataflow-Teams (jetzt Apache Beam [incubating]) zurück, darunter Tyler Akidau, Frances Perry und andere. Die Artikel Streaming 101 und Streaming 102 von Tyler Akidau sind eine gute Lektüre, wenn du dich näher mit dem Dataflow-Modell befassen möchtest. Die Mechanismen von Flink für den Umgang mit Zeit und Fenstern basieren zu einem großen Teil auf den allgemeinen Konzepten des Dataflow Papers in der VLDB 2015.

2 Dieser Abschnitt basiert auf der Arbeit von Nicolas Seyvet und Ignacio Mulas Viela, die beim Flink Forward 2015 und bei der Strata/Hadoop World London 2016 vorgestellt wurde.

Get Einführung in Apache Flink now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.