Kapitel 4. Erweiterte Fensterung

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Hallo nochmal! Ich hoffe, dir hat Kapitel 3 genauso gut gefallen wie mir. Wasserzeichen sind ein faszinierendes Thema, und Slava kennt sie besser als jeder andere auf der Welt. Jetzt, wo wir ein tieferes Verständnis für Wasserzeichen haben, möchte ich auf einige fortgeschrittenere Themen eingehen, die mit den Fragen nach dem Was,Wo, Wann und Wie zu tun haben.

Zuerst schauen wir uns das Processing-Time-Windowing an, das eine interessante Mischung aus Wo und Wann ist, um besser zu verstehen, wie es mit dem Event-Time-Windowing zusammenhängt und ein Gefühl dafür zu bekommen, wann es tatsächlich der richtige Ansatz ist. Dann tauchen wir in einige fortgeschrittene Konzepte des Event-Time-Windowings ein, betrachten Sitzungsfenster im Detail und begründen schließlich, warum generalisiertes Custom-Windowing ein nützliches (und überraschend einfaches) Konzept ist, indem wir drei verschiedene Arten von Custom-Windows untersuchen: nicht ausgerichtete feste Fenster, feste Fenster pro Schlüssel und begrenzte Sitzungsfenster.

Wann/Wo: Bearbeitungszeit-Fenster

Zeitfenster für die Verarbeitung sind aus zwei Gründen wichtig:

  • Für bestimmte Anwendungsfälle, wie z. B. die Nutzungsüberwachung (z. B. QPS für den Webservice-Verkehr), bei denen du einen eingehenden Datenstrom analysieren willst, während er beobachtet wird, ist das Processing-Time-Windowing absolut der richtige Ansatz.

  • Für Anwendungsfälle, bei denen der Zeitpunkt des Geschehens wichtig ist (z. B. Analyse von Trends im Nutzerverhalten, Abrechnung, Scoring usw.), ist die Festlegung von Verarbeitungszeitfenstern der absolut falsche Ansatz, und es ist wichtig, diese Fälle zu erkennen.

Es lohnt sich also, die Unterschiede zwischen Processing-Time-Windowing und Event-Time-Windowing zu verstehen, vor allem, weil Processing-Time-Windowing heute in vielen Streaming-Systemen weit verbreitet ist.

Wenn du innerhalb eines Modells arbeitest, für das das Windowing als erstklassiger Begriff streng ereigniszeitbasiert ist, wie das in diesem Buch vorgestellte, gibt es zwei Methoden, die du verwenden kannst, um processing-time windowing zu erreichen:

Auslöser

Ignoriere die Zeit des Ereignisses (d.h. verwende ein globales Fenster, das sich über die gesamte Ereigniszeit erstreckt) und verwende Trigger, um Schnappschüsse dieses Fensters auf der Achse der Bearbeitungszeit zu erstellen.

Ingress-Zeit

Weise die Ingress-Zeiten als Ereigniszeiten für die Daten zu, wenn sie ankommen, und verwende von da an die normale Ereigniszeitfensterung. Das ist im Wesentlichen das, was Spark Streaming 1.x macht.

Beachte, dass die beiden Methoden mehr oder weniger gleichwertig sind, obwohl sie sich bei mehrstufigen Pipelines leicht unterscheiden: In der Trigger-Version teilt eine mehrstufige Pipeline die Verarbeitungszeit-"Fenster" in jeder Stufe unabhängig voneinander auf, so dass z. B. Daten, die in einer Stufe in Fenster N liegen, in der folgenden Stufe stattdessen in Fenster N-1oder N+1 landen können; Bei der Ingress-Version verbleiben die Daten, nachdem sie in Fenster N aufgenommen wurden, für die Dauer der Pipeline in Fenster N. Dies geschieht durch die Synchronisierung des Fortschritts zwischen den Phasen über Wasserzeichen (im Fall von Cloud Dataflow), Microbatch-Grenzen (im Fall von Spark Streaming) oder andere koordinierende Faktoren auf der Ebene der Engine.

Wie bereits erwähnt, besteht der große Nachteil von Verarbeitungszeitfenstern darin, dass sich der Inhalt der Fenster ändert, wenn sich die Beobachtungsreihenfolge der Eingänge ändert. Um das zu verdeutlichen, schauen wir uns diese drei Anwendungsfälle an: Ereignis-Zeit-Fenster, Verarbeitungszeit-Fenster über Trigger und Verarbeitungszeit-Fenster über Ingress-Zeit.

Jede wird auf zwei verschiedene Eingabesätze angewandt (also insgesamt sechs Varianten). Die beiden Eingabesätze beziehen sich auf genau dieselben Ereignisse (d. h. dieselben Werte, die zu denselben Zeitpunkten eintreten), aber mit unterschiedlicher Beobachtungsreihenfolge. Der erste Satz entspricht der Beobachtungsreihenfolge, die wir die ganze Zeit gesehen haben, und ist grau gefärbt; beim zweiten Satz sind alle Werte in der Achse der Verarbeitungszeit verschoben, wie in Abbildung 4-1, und sind lila gefärbt. Du kannst dir einfach vorstellen, dass das lilafarbene Beispiel eine andere Realität darstellt, wenn der Wind aus dem Osten statt aus dem Westen geweht hätte (d. h. wenn die zugrunde liegenden komplexen verteilten Systeme die Dinge in einer etwas anderen Reihenfolge gespielt hätten).

Event-Time Windowing

Um eine Grundlage zu schaffen, vergleichen wir zunächst die feste Fensterung in der Ereigniszeit mit einem heuristischen Wasserzeichen über diese beiden Beobachtungsreihenfolgen. Wir verwenden den Früh-/Spät-Code aus Beispiel 2-7/Abbildung2-10, um die in Abbildung 4-2 dargestellten Ergebnisse zu erhalten. Die linke Seite ist im Wesentlichen das, was wir zuvor gesehen haben; die rechte Seite zeigt die Ergebnisse für die zweite Beobachtungsreihenfolge. Wichtig ist, dass die Ergebnisse zwar unterschiedlich aussehen (aufgrund der unterschiedlichen Reihenfolge der Beobachtungen in der Verarbeitungszeit), die Endergebnisse für die vier Fenster aber gleich bleiben: 14, 18, 4 und 12.

Verarbeitungszeit-Fenster über Auslöser

Vergleichen wir dies nun mit den beiden soeben beschriebenen Bearbeitungszeitmethoden. Zuerst probieren wir die Trigger-Methode aus. Es gibt drei Aspekte, mit denen das "Windowing" der Verarbeitungszeit auf diese Weise funktioniert:

Fenstern

Wir verwenden das globale Ereignis-Zeit-Fenster, weil wir im Grunde genommen Verarbeitungszeit-Fenster mit Ereignis-Zeit-Fenstern emulieren.

Auslösen

Wir lösen in regelmäßigen Abständen im Bereich der Bearbeitungszeit aus, basierend auf der gewünschten Größe der Bearbeitungszeitfenster.

Akkumulation

Wir verwenden den Verwerfungsmodus, um die Fenster unabhängig voneinander zu halten, so dass jedes von ihnen wie ein unabhängiges "Fenster" zur Bearbeitungszeit wirkt.

Der entsprechende Code sieht in etwa so aus wie in Beispiel 4-1. Beachte, dass die globale Fensterung in Beam die Standardeinstellung ist, daher gibt es keine spezielle Überschreibung der Fensterungsstrategie.

Beispiel 4-1. Bearbeitungszeit-Fensterung über wiederholte, verwerfende Ausschnitte eines globalen Ereigniszeit-Fensters
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.triggering(Repeatedly(AlignedDelay(TWO_MINUTES)))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

Bei der Ausführung auf einem Streaming-Runner gegen unsere beiden unterschiedlichen Reihenfolgen der Eingabedaten sehen die Ergebnisse wie in Abbildung 4-3 aus. Hier sind einige interessante Hinweise zu dieser Abbildung:

  • Da wir Verarbeitungszeitfenster über Ereigniszeitfenster emulieren, werden die "Fenster" auf der Verarbeitungszeitachse abgegrenzt, was bedeutet, dass ihre effektive Breite auf der y-Achse statt auf der x-Achse gemessen wird.

  • Da die Verarbeitungszeitfenster von der Reihenfolge abhängen, in der die Eingabedaten auftreten, unterscheiden sich die Ergebnisse für jedes der "Fenster" für jede der beiden Beobachtungsreihenfolgen, auch wenn die Ereignisse selbst technisch gesehen in jeder Version zu denselben Zeiten stattfanden. Auf der linken Seite erhalten wir 12, 18, 18, während wir auf der rechten Seite 7, 36, 5 erhalten.

Fensterung der Bearbeitungszeit über die Ingress-Zeit

Als Letztes wollen wir uns ansehen. Die Fensterung der Verarbeitungszeit wird erreicht, indem die Ereigniszeiten der Eingabedaten auf ihre Ingress-Zeiten abbildet. In Bezug auf den Code sind hier vier Aspekte erwähnenswert:

Zeitverschiebung

Wenn Elemente ankommen, müssen ihre Ereigniszeiten mit der Zeit des Ingress überschrieben werden. Wir können dies in Beam tun, indem wir eine neue DoFn bereitstellen, die den Zeitstempel des Elements über die Methode outputWithTimestamp auf die aktuelle Zeit setzt.

Fenstern

Kehre zur Standard-Ereigniszeit-Fensterung zurück.

Auslösen

Da die Ingress-Zeit die Möglichkeit bietet, ein perfektes Wasserzeichen zu berechnen, können wir den Standardauslöser verwenden, der in diesem Fall implizit genau einmal ausgelöst wird, wenn das Wasserzeichen das Ende des Fensters passiert.

Akkumulationsmodus

Da wir immer nur eine Ausgabe pro Fenster haben, ist der Akkumulationsmodus irrelevant.

Der eigentliche Code könnte also etwa so aussehen wie in Beispiel 4-2.

Beispiel 4-2. Fensterung der Bearbeitungszeit über wiederholte, verwerfende Fenster eines globalen Ereigniszeitfensters
PCollection<String> raw = IO.read().apply(ParDo.of(
  new DoFn<String, String>() {
    public void processElement(ProcessContext c) {
      c.outputWithTimestmap(new Instant());
    }
  });
PCollection<KV<Team, Integer>> input =
  raw.apply(ParDo.of(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.info(FixedWindows.of(TWO_MINUTES))
  .apply(Sum.integersPerKey());

Die Ausführung in einer Streaming-Engine würde wie in Abbildung 4-4 aussehen. Wenn Daten eintreffen, werden ihre Ereigniszeiten aktualisiert, damit sie mit den Ingress-Zeiten (d. h. den Verarbeitungszeiten bei der Ankunft) übereinstimmen, was zu einer horizontalen Verschiebung nach rechts auf die ideale Wasserzeichenlinie führt. Hier sind einige interessante Hinweise zu dieser Abbildung:

  • Wie bei dem anderen Beispiel mit den Verarbeitungszeitfenstern erhalten wir unterschiedliche Ergebnisse, wenn sich die Reihenfolge der Eingaben ändert, auch wenn die Werte und Ereigniszeiten für die Eingabe konstant bleiben.

  • Anders als im anderen Beispiel sind die Fenster wieder im Ereignis-Zeit-Bereich (und damit entlang der x-Achse) abgegrenzt. Trotzdem handelt es sich nicht um echte Ereignis-Zeit-Fenster. Wir haben einfach die Verarbeitungszeit auf die Ereignis-Zeit-Domäne übertragen, indem wir die ursprüngliche Aufzeichnung des Auftretens für jede Eingabe gelöscht und durch eine neue ersetzt haben, die stattdessen den Zeitpunkt angibt, zu dem das Datum zum ersten Mal von der Pipeline beobachtet wurde.

  • Trotzdem werden die Auslöser dank des Wasserzeichens immer noch genau zur gleichen Zeit ausgelöst wie im vorherigen Beispiel mit der Verarbeitungszeit. Außerdem sind die Ausgabewerte wie vorhergesagt identisch mit dem Beispiel: 12, 18, 18 auf der linken Seite und 7, 36, 5 auf der rechten Seite.

  • Da bei der Verwendung der Ingress-Zeit perfekte Wasserzeichen möglich sind, entspricht das tatsächliche Wasserzeichen dem idealen Wasserzeichen, das mit einer Steigung von eins nach oben und rechts ansteigt.

Obwohl es interessant ist, die verschiedenen Möglichkeiten zur Implementierung von Verarbeitungszeitfenstern zu sehen, ist die wichtigste Erkenntnis die, auf die ich seit dem ersten Kapitel hingewiesen habe: Ereignis-Zeit-Fenster sind unabhängig von der Reihenfolge, zumindest im Grenzbereich (die tatsächlichen Fenster auf dem Weg können sich unterscheiden, bis die Eingabe vollständig ist); Verarbeitungszeit-Fenster sind es nicht. Wenn du dich für die Zeitpunkte interessierst, zu denen deine Ereignisse tatsächlich eingetreten sind, musst du die Ereigniszeitfensterung verwenden, sonst sind deine Ergebnisse bedeutungslos. Ich werde jetzt von meiner Seifenkiste heruntersteigen.

Wo: Sitzung Fenster

Genug von den Verarbeitungszeitfenstern. Jetzt wir zurück zum bewährten Event-Time-Windowing, aber jetzt schauen wir uns eine meiner Lieblingsfunktionen an: die dynamischen, datengesteuerten Fenster namens Sitzungen.

Sitzungen sind ein spezieller Fenstertyp, der einen Aktivitätszeitraum in den Daten erfasst, der durch eine Lücke der Inaktivität beendet wird. Sie sind bei der Datenanalyse besonders nützlich, weil sie einen Überblick über die Aktivitäten eines bestimmten Nutzers über einen bestimmten Zeitraum hinweg geben, in dem er einer bestimmten Aktivität nachgegangen ist. So lassen sich die Aktivitäten innerhalb der Sitzung miteinander in Beziehung setzen und anhand der Länge der Sitzungen Rückschlüsse auf das Engagement ziehen usw.

Aus der Perspektive des Windowings sind die Sitzungen in zweierlei Hinsicht besonders interessant:

  • Sie sind ein Beispiel für ein datengesteuertes Fenster: Die Position und Größe der Fenster sind eine direkte Folge der Eingabedaten selbst und basieren nicht auf einem vordefinierten Muster innerhalb der Zeit, wie es bei festen und gleitenden Fenstern der Fall ist.

  • Sie sind auch ein Beispiel für ein nicht ausgerichtetes Fenster, d. h. ein Fenster, das nicht einheitlich für die Daten gilt, sondern nur für eine bestimmte Teilmenge der Daten (z. B. pro Nutzer). Dies steht im Gegensatz zu ausgerichteten Fenstern wie festen und gleitenden Fenstern, die in der Regel gleichmäßig auf die Daten angewendet werden.

In einigen Anwendungsfällen ist es möglich, die Daten innerhalb einer einzelnen Sitzung vorab mit einer gemeinsamen Kennung zu versehen (z. B. ein Videoplayer, der Heartbeat-Pings mit Quality-of-Service-Informationen sendet; für eine bestimmte Ansicht können alle Pings vorab mit einer einzigen Sitzungs-ID versehen werden). In diesem Fall ist es viel einfacher, Sitzungen zu erstellen, da es sich im Grunde nur um eine Art Gruppierung nach Schlüssel handelt.

Im allgemeineren Fall (d. h. wenn die eigentliche Sitzung nicht im Voraus bekannt ist) müssen die Sitzungen jedoch allein aus den Positionen der Daten in der Zeit konstruiert werden. Besonders knifflig wird es, wenn es sich um Daten handelt, die nicht in der richtigen Reihenfolge vorliegen.

Abbildung 4-5 zeigt ein Beispiel mit fünf unabhängigen Datensätzen, die in Sitzungsfenstern mit einem Zeitabstand von 60 Minuten gruppiert sind. Jeder Datensatz beginnt in einem eigenen 60-minütigen Fenster (einer Proto-Sitzung). Wenn du überlappende Proto-Sitzungen zusammenlegst, erhältst du zwei größere Sitzungsfenster mit drei bzw. zwei Datensätzen.

Abbildung 4-5. Nicht zusammengeführte Proto-Sitzungsfenster und die daraus resultierenden zusammengeführten Sitzungen

Die wichtigste Erkenntnis bei der Bereitstellung allgemeiner Sitzungsunterstützung ist, dass ein vollständiges Sitzungsfenster per Definition aus einer Reihe kleinerer, sich überlappender Fenster besteht, die jeweils einen einzelnen Datensatz enthalten, wobei jeder Datensatz in der Abfolge durch eine Inaktivitätslücke getrennt ist, die nicht größer ist als ein vordefiniertes Timeout. Selbst wenn wir die Daten in der Lerneinheit nicht in der richtigen Reihenfolge beobachten, können wir die endgültige Lerneinheit zusammenstellen, indem wir die sich überschneidenden Fenster für die einzelnen Daten zusammenfügen, sobald sie eintreffen.

Betrachte das Beispiel, das wir bisher verwendet haben, aus einem anderen Blickwinkel. Wenn wir ein Sitzungs-Timeout von einer Minute festlegen, würden wir erwarten, dass wir zwei Sitzungen in den Daten erkennen, die in Abbildung 4-6 durch die gestrichelten schwarzen Linien gekennzeichnet sind. Jede dieser Sitzungen fängt eine Reihe von Aktivitäten des Nutzers ein, wobei jedes Ereignis in der Sitzung weniger als eine Minute von mindestens einem anderen Ereignis in der Sitzung entfernt ist.

Abbildung 4-6. Sitzungen, die wir berechnen wollen

Um zu sehen, wie die Fensterzusammenführung funktioniert, um diese Sitzungen im Laufe der Zeit aufzubauen, wenn Ereignisse eintreten, sehen wir uns das in Aktion an. Wir nehmen den früh/spät Code mit aktivierten Rückzügen aus Beispiel 2-10 und aktualisieren die Fensterzusammenführung, um stattdessen Sitzungen mit einer einminütigen Lückenzeit aufzubauen. Beispiel 4-3 zeigt, wie das aussieht.

Beispiel 4-3. Frühzeitige/pünktliche/späte Abschüsse mit Sitzungsfenstern und Rückzügen
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

Bei einer Streaming-Engine würdest du etwa das in Abbildung 4-7 gezeigte Ergebnis erhalten (beachte, dass ich die gestrichelten schwarzen Linien, die die erwarteten endgültigen Sitzungen markieren, als Referenz eingefügt habe).

Hier gibt es eine ganze Menge zu tun, also werde ich dir einiges davon erklären:

  • Wenn der erste Datensatz mit dem Wert 5 gefunden wird, wird er in ein einzelnes Proto-Sitzungsfenster eingefügt, das mit der Ereigniszeit dieses Datensatzes beginnt und sich über die gesamte Dauer der Sitzungslücke erstreckt, z. B. eine Minute über den Zeitpunkt hinaus, an dem dieses Datum aufgetreten ist. Alle Fenster, die wir in Zukunft finden und die sich mit diesem Fenster überschneiden, sollten Teil der gleichen Sitzung sein und werden als solche zusammengeführt.

  • Der zweite eintreffende Datensatz ist die 7, die ebenfalls in ein eigenes Proto-Sitzungsfenster gelegt wird, da sie sich nicht mit dem Fenster für die 5 überschneidet.

  • In der Zwischenzeit hat das Wasserzeichen das Ende des ersten Fensters überschritten, sodass der Wert 5 kurz vor 12:06 Uhr als pünktliches Ergebnis erscheint. Kurz darauf wird auch das zweite Fenster als spekulatives Ergebnis mit dem Wert 7 realisiert, genau dann, wenn die Bearbeitungszeit 12:06 Uhr erreicht.

  • Als Nächstes beobachten wir ein Paar von Datensätzen 3 und 4, deren Proto-Sitzungen sich überschneiden. Daher werden sie zusammengeführt und zu dem Zeitpunkt, an dem der frühe Trigger für 12:07 Uhr ausgelöst wird, wird ein einziges Fenster mit dem Wert 7 ausgegeben.

  • Wenn die 8 kurz darauf eintrifft, überschneidet sie sich mit den beiden Fenstern mit dem Wert 7. Alle drei werden also zusammengeführt und bilden eine neue kombinierte Sitzung mit dem Wert 22. Wenn das Wasserzeichen dann das Ende dieser Session passiert, materialisiert es sowohl die neue Session mit dem Wert 22 als auch die Rücknahmen für die beiden Fenster mit dem Wert 7, die zuvor ausgesendet, aber später in die Session integriert wurden.

  • Ein ähnlicher Tanz findet statt, wenn die 9 zu spät eintrifft und die Proto-Sitzung mit dem Wert 5 und die Sitzung mit dem Wert 22 zu einer einzigen größeren Sitzung mit dem Wert 36 vereint. Die 36 und die Rücknahmen für die Fenster 5 und 22 werden alle sofort durch den späten Datenauslöser gesendet.

Das ist eine ziemlich mächtige Sache. Und das Tolle daran ist, wie einfach es ist, so etwas in einem Modell zu beschreiben, das die Dimensionen der Stream-Verarbeitung in einzelne, zusammensetzbare Teile zerlegt. Am Ende kannst du dich mehr auf die interessante Geschäftslogik konzentrieren und weniger auf die Kleinigkeiten, mit denen die Daten in eine brauchbare Form gebracht werden.

Wenn du mir nicht glaubst, sieh dir diesen Blogbeitrag auf an, in dem beschrieben wird, wie man manuell Sessions auf Spark Streaming 1.x aufbaut (das soll kein Vorwurf sein; die Spark-Leute haben einfach gute Arbeit geleistet, so dass sich jemand die Mühe gemacht hat, zu dokumentieren, was nötig ist, um eine bestimmte Art von Session-Unterstützung auf Spark 1.x aufzubauen; das kann man von den meisten anderen Systemen da draußen nicht behaupten). Es ist ziemlich aufwändig, und es gibt noch nicht einmal richtige Event-Time-Sessions, spekulative oder späte Abschüsse oder Rückzüge.

Wo: Custom Windowing

Bis jetzt haben wir hauptsächlich über vordefinierte Fensterstrategien gesprochen: feste Fenster, Schiebefenster und Sitzungsfenster. Mit den Standardfenstertypen kannst du viel anfangen, aber es gibt viele Anwendungsfälle, bei denen die Möglichkeit, eine benutzerdefinierte Fensterstrategie zu definieren, die Rettung ist (drei davon werden wir jetzt sehen).

Die meisten heutigen Systeme unterstützen benutzerdefinierte Fenster nicht in dem Maße, wie es in Beam der Fall ist,1 Deshalb konzentrieren wir uns auf den Ansatz von Beam. In Beam besteht eine benutzerdefinierte Windowing-Strategie aus zwei Dingen:

Fensterzuweisung

Dadurch wird jedes Element von in ein Anfangsfenster gesetzt. An der Grenze kann so jedes Element in einem eigenen Fenster platziert werden, was sehr leistungsstark ist.

(Optional) Fenster zusammenführen

Dadurch können die Fenster bei der Gruppierung zusammengelegt werden, was es möglich macht, dass sich die Fenster im Laufe der Zeit entwickeln, was wir bereits bei den Sitzungsfenstern in Aktion gesehen haben.

Um dir ein Gefühl dafür zu geben, wie einfach Fensterstrategien wirklich sind und wie nützlich die Unterstützung von benutzerdefinierten Fenstern sein kann, werden wir uns die Standardimplementierungen von festen Fenstern und Sitzungen in Beam im Detail ansehen und dann ein paar reale Anwendungsfälle betrachten, die benutzerdefinierte Variationen dieser Themen erfordern. Dabei werden wir sehen, wie einfach es ist, eine benutzerdefinierte Fensterstrategie zu erstellen, und wie einschränkend die fehlende Unterstützung für benutzerdefinierte Fenster sein kann, wenn dein Anwendungsfall nicht ganz in die Standardansätze passt.

Variationen über feste Fenster

Schauen wir uns zunächst die relativ einfache Strategie der festen Fenster an. Die Standardimplementierung von festen Fenstern ist so einfach, wie du es dir vielleicht vorstellst, und besteht aus der folgenden Logik:

Aufgabe

Das Element wird anhand seines Zeitstempels und der Parameter für die Größe und den Versatz des Fensters in das entsprechende feste Fenster eingefügt.

Zusammenführung

Keine.

Eine abgekürzte Version des Codes sieht aus wie Beispiel 4-4.

Beispiel 4-4. Abgekürzte FixedWindows-Implementierung
public class FixedWindows extends WindowFn<Object, IntervalWindow> {
  private final Duration size;
  private final Duration offset;
  public Collection<IntervalWindow> assignWindow(AssignContext c) {
    long start = c.timestamp().getMillis() - c.timestamp()
                   .plus(size)
                   .minus(offset)
                   .getMillis() % size.getMillis();
    return Arrays.asList(IntervalWindow(new Instant(start), size));
  }
}

Denk daran, dass wir dir den Code hier nicht so sehr zeigen, um dir beizubringen, wie man Windowing-Strategien schreibt (obwohl es schön ist, sie zu entmystifizieren und zu zeigen, wie einfach sie sind). Vielmehr geht es darum, zu zeigen, wie einfach und wie schwierig es ist, einige relativ einfache Anwendungsfälle mit bzw. ohne benutzerdefiniertes Windowing zu unterstützen. Schauen wir uns jetzt zwei Anwendungsfälle an, die Variationen des Themas "feste Fenster" sind.

Nicht ausgerichtete feste Fenster

Ein Merkmal der Standardimplementierung von festen Fenstern, auf das wir bereits hingewiesen haben, ist, dass die Fenster über alle Daten hinweg angeglichen werden. In unserem Beispiel wird das Fenster von 12 bis 13 Uhr für ein bestimmtes Team an die entsprechenden Fenster für alle anderen Teams angepasst, die ebenfalls von 12 bis 13 Uhr reichen. In Anwendungsfällen, in denen du ähnliche Zeitfenster über eine andere Dimension hinweg vergleichen willst, z. B. zwischen Teams, ist dieser Abgleich sehr nützlich. Sie hat jedoch einen kleinen Nachteil. Alle aktiven Fenster von 12:00 bis 13:00 Uhr werden ungefähr zur gleichen Zeit fertig, was bedeutet, dass das System einmal pro Stunde mit einer riesigen Menge an Fenstern konfrontiert wird, die sich materialisieren müssen.

Um zu sehen, was ich meine, schauen wir uns ein konkretes Beispiel an(Beispiel 4-5). Wir beginnen mit einer Punktesummen-Pipeline, wie wir sie in den meisten Beispielen verwendet haben, mit festen Zwei-Minuten-Fenstern und einem einzigen Wasserzeichen-Auslöser.

Beispiel 4-5. Auslöser für die Vollständigkeit des Wasserzeichens (wie in Beispiel 2-6)
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

In diesem Fall schauen wir uns jedoch zwei verschiedene Schlüssel (siehe Abbildung 4-8) aus demselben Datensatz parallel an. Wir werden sehen, dass die Ausgaben für diese beiden Schlüssel alle ausgerichtet sind, weil die Fenster in allen Schlüsseln ausgerichtet sind. Das führt dazu, dass jedes Mal, wenn das Wasserzeichen das Ende eines Fensters passiert, N Fenster materialisiert werden, wobei N die Anzahl der Schlüssel mit Aktualisierungen in diesem Fenster ist. In diesem Beispiel, in dem N gleich 2 ist, ist das vielleicht nicht allzu schmerzhaft. Aber wenn N in die Tausende, Millionen oder mehr geht, kann diese Synchronisierung problematisch werden.

In Situationen, in denen ein Vergleich zwischen den Fenstern nicht notwendig ist, ist es oft wünschenswert, die Last der Fensterabschlüsse gleichmäßig über die Zeit zu verteilen. Dadurch wird die Systemauslastung besser vorhersehbar, was die Anforderungen für die Bewältigung von Lastspitzen verringern kann. In den meisten Systemen sind nicht ausgerichtete feste Fenster jedoch nur verfügbar, wenn das System sie von Haus aus unterstützt.2 Mit der Unterstützung für benutzerdefinierte Fenster ist es jedoch eine relativ triviale Änderung der Standardimplementierung für feste Fenster, um die Unterstützung für nicht ausgerichtete feste Fenster bereitzustellen. Wir wollen weiterhin garantieren, dass die Fenster aller Elemente, die zusammen gruppiert werden (d.h. die mit demselben Schlüssel), dieselbe Ausrichtung haben, während wir die Ausrichtungsbeschränkung für verschiedene Schlüssel lockern. Der Code ändert sich zur Standardstrategie für feste Fenster und sieht in etwa so aus wie in Beispiel 4-6.

Beispiel 4-6. Abgekürzte UnalignedFixedWindows-Implementierung
public class UnalignedFixedWindows
    extends WindowFn<KV<K, V>, IntervalWindow> {
  private final Duration size;
  private final Duration offset;
  public Collection<IntervalWindow> assignWindow(AssignContext c) {
    long perKeyShift = hash(c.element().key()) % size.getMillis();
    long start = perKeyShift + c.timestamp().getMillis()
                   - c.timestamp()
                      .plus(size)
                      .minus(offset)
                      .getMillis() % size.getMillis();
    return Arrays.asList(IntervalWindow(new Instant(start), size));
  }
}

Mit dieser Änderung werden die Fenster für alle Elemente mit demselben Schlüssel ausgerichtet,3 aber die Fenster für Elemente mit unterschiedlichen Schlüsseln sind (normalerweise) nicht ausgerichtet. Dadurch wird die Last der Fenstervervollständigung verteilt, aber die Vergleiche zwischen den Schlüsseln werden etwas weniger aussagekräftig. Wir können unsere Pipeline auf unsere neue Fensterstrategie umstellen, wie in Beispiel 4-7 gezeigt.

Beispiel 4-7. Unausgerichtete feste Fenster mit einem einzelnen Wasserzeichenauslöser
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(UnalignedFixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

Wie das aussieht, kannst du in Abbildung 4-9 sehen, indem du verschiedene Ausrichtungen mit festem Fenster über denselben Datensatz wie zuvor vergleichst (in diesem Fall habe ich eine maximale Phasenverschiebung zwischen den beiden Ausrichtungen gewählt, um die Vorteile am deutlichsten hervorzuheben, da zufällig gewählte Phasen über eine große Anzahl von Tasten zu ähnlichen Effekten führen werden).

Beachte, dass es keine Fälle gibt, in denen wir mehrere Fenster für mehrere Tasten gleichzeitig ausgeben. Stattdessen kommen die Scheiben einzeln und in einer viel gleichmäßigeren Kadenz an. Dies ist ein weiteres Beispiel dafür, dass man Kompromisse in einer Dimension (Vergleichbarkeit der Schlüssel) gegen Vorteile in einer anderen Dimension (geringere Anforderungen an die Bereitstellung von Spitzenressourcen) eintauschen kann, wenn es der Anwendungsfall zulässt. Diese Flexibilität ist entscheidend, wenn du große Datenmengen so effizient wie möglich verarbeiten willst.

Sehen wir uns nun eine zweite Variante der festen Fenster an, die stärker mit den zu verarbeitenden Daten verbunden ist.

Pro Element/Schlüssel fixierte Fenster

Unser zweites Beispiel stammt von einem der frühen Anwender von Cloud Dataflow. Dieses Unternehmen generiert Analysedaten für seine Kunden, aber jeder Kunde kann die Größe des Fensters konfigurieren, über das er seine Kennzahlen aggregieren möchte. Mit anderen Worten: Jeder Kunde kann die Größe seiner festen Fenster selbst bestimmen.

Die Unterstützung eines solchen Anwendungsfalls ist nicht allzu schwierig, solange die Anzahl der verfügbaren Fenstergrößen selbst festgelegt ist. Du könntest dir zum Beispiel vorstellen, dass du die Möglichkeit hast, zwischen 30-, 60- und 90-minütigen festen Fenstern zu wählen und dann für jede dieser Optionen eine eigene Pipeline (oder einen Fork der Pipeline) zu starten. Nicht ideal, aber auch nicht schlimm. Das wird jedoch schnell unpraktisch, wenn die Anzahl der Optionen zunimmt, und wenn es darum geht, wirklich beliebige Fenstergrößen zu unterstützen (wie es der Anwendungsfall dieses Kunden erforderte), ist es völlig unpraktisch.

Da jeder Datensatz, den der Kunde verarbeitet, bereits mit Metadaten versehen ist, die die gewünschte Fenstergröße für die Aggregation beschreiben, war es zum Glück so einfach, eine beliebige feste Fenstergröße pro Benutzer zu unterstützen, indem man ein paar Zeilen der Bestandsimplementierung für feste Fenster änderte, wie in Beispiel 4-8 gezeigt.

Beispiel 4-8. Geänderte (und verkürzte) FixedWindows-Implementierung, die Fenstergrößen pro Element unterstützt
public class PerElementFixedWindows<T extends HasWindowSize>
    extends WindowFn<T, IntervalWindow> {
  private final Duration offset;
  public Collection<IntervalWindow> assignWindow(AssignContext c) {
    long perElementSize = c.element().getWindowSize();
    long start = perKeyShift + c.timestamp().getMillis()
                   - c.timestamp()
                      .plus(size)
                      .minus(offset)
                      .getMillis() % size.getMillis();
    return Arrays.asList(IntervalWindow(
        new Instant(start), perElementSize));
  }
}

Mit dieser Änderung wird jedes Element einem festen Fenster mit der entsprechenden Größe zugewiesen, die durch die Metadaten im Element selbst bestimmt wird.4 Die Änderung des Pipeline-Codes zur Verwendung dieser neuen Strategie ist wiederum trivial, wie in Beispiel 4-9 gezeigt.

Beispiel 4-9. Feste Fenstergrößen pro Element mit einem einzigen Wasserzeichenauslöser
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(new PerElementFixedWindows())
               .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

Wenn du dir diese Pipeline in Aktion ansiehst(Abbildung 4-10), kannst du leicht erkennen, dass die Elemente für Schlüssel A alle eine Fenstergröße von zwei Minuten haben, während die Elemente für Schlüssel B eine Fenstergröße von einer Minute haben.

Die Art und Weise, wie die Einstellungen für die Fenstergröße gespeichert werden, ist zu anwendungsspezifisch, als dass es Sinn machen würde, sie in eine Standard-API einzubauen. Wie die Bedürfnisse dieses Kunden zeigen, gibt es jedoch solche Anwendungsfälle. Deshalb ist die Flexibilität, die die benutzerdefinierten Fenster bieten, so groß.

Variationen über Sitzungsfenster

Um die Nützlichkeit von benutzerdefinierten Fenstern zu verdeutlichen, schauen wir uns ein letztes Beispiel an, das eine Variante von Sitzungen ist. Die Sitzungsfenster sind verständlicherweise etwas komplexer als feste Fenster. Die Implementierung besteht aus Folgendem:

Aufgabe

Jedes Element wird zunächst in ein Proto-Sitzungsfenster gestellt, das mit dem Zeitstempel des Elements beginnt und sich über die Dauer der Lücke erstreckt.

Zusammenführung

Bei der Gruppierung werden auf alle in Frage kommenden Fenster sortiert und anschließend alle sich überschneidenden Fenster zusammengeführt.

Eine verkürzte Version des Sitzungscodes (von Hand aus einer Reihe von Hilfsklassen zusammengefügt) sieht etwa so aus wie in Beispiel 4-10 gezeigt.

Beispiel 4-10. Abgekürzte Implementierung von Sitzungen
public class Sessions extends WindowFn<Object, IntervalWindow> {
  private final Duration gapDuration;
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    return Arrays.asList(
      new IntervalWindow(c.timestamp(), gapDuration));
  }
  public void mergeWindows(MergeContext c) throws Exception {
    List<IntervalWindow> sortedWindows = new ArrayList<>();
    for (IntervalWindow window : c.windows()) {
      sortedWindows.add(window);
    }
    Collections.sort(sortedWindows);
    List<MergeCandidate> merges = new ArrayList<>();
    MergeCandidate current = new MergeCandidate();
    for (IntervalWindow window : sortedWindows) {
      if (current.intersects(window)) {
        current.add(window);
      } else {
        merges.add(current);
        current = new MergeCandidate(window);
      }
    }
    merges.add(current);
    for (MergeCandidate merge : merges) {
      merge.apply(c);
    }
  }
}

Wie zuvor geht es bei diesem Code nicht darum, dir zu zeigen, wie benutzerdefinierte Windowing-Funktionen implementiert werden oder wie die Implementierung von Sitzungen aussieht, sondern vielmehr darum, wie einfach du neue Anwendungen durch benutzerdefiniertes Windowing unterstützen kannst.

Begrenzte Sitzungen

Ein solcher Anwendungsfall, auf den ich schon mehrfach gestoßen bin, sind begrenzte Sitzungen: Sitzungen, die nicht über eine bestimmte Größe hinaus wachsen dürfen, sei es in Bezug auf die Zeit, die Anzahl der Elemente oder eine andere Dimension. Das kann semantische Gründe haben oder einfach eine Übung zum Schutz vor Spam sein. Angesichts der unterschiedlichen Arten von Begrenzungen (bei manchen Anwendungsfällen geht es um die Gesamtgröße der Sitzung in der Ereigniszeit, bei anderen um die Gesamtzahl der Elemente, bei wieder anderen um die Elementdichte usw.) ist es jedoch schwierig, eine saubere und übersichtliche API für begrenzte Sitzungen bereitzustellen. Viel praktischer ist es, den Nutzern die Möglichkeit zu geben, ihre eigene, auf den jeweiligen Anwendungsfall zugeschnittene Windowing-Logik zu implementieren. Ein Beispiel für einen solchen Anwendungsfall, bei dem die Sitzungsfenster zeitlich begrenzt sind, könnte etwa so aussehen wie in Beispiel 4-11 (wobei wir einige der hier verwendeten Standardformulierungen auslassen).

Beispiel 4-11. Implementierung der verkürzten Sitzungen
public class BoundedSessions extends WindowFn<Object, IntervalWindow> {
  private final Duration gapDuration;
  private final Duration maxSize;
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    return Arrays.asList(
      new IntervalWindow(c.timestamp(), gapDuration));
  }
  private Duration windowSize(IntervalWindow window) {
    return window == null
      ? new Duration(0)
      : new Duration(window.start(), window.end());
  }
  public void mergeWindows(MergeContext c) throws Exception {
    List<IntervalWindow> sortedWindows = new ArrayList<>();
    for (IntervalWindow window : c.windows()) {
      sortedWindows.add(window);
    }
    Collections.sort(sortedWindows);
    List<MergeCandidate> merges = new ArrayList<>();
    MergeCandidate current = new MergeCandidate();
    for (IntervalWindow window : sortedWindows) {
      MergeCandidate next = new MergeCandidate(window);
      if (current.intersects(window)) {
        current.add(window);
        if (windowSize(current.union) <= (maxSize - gapDuration))
          continue;
        // Current window exceeds bounds, so flush and move to next
        next = new MergeCandidate();
      }
      merges.add(current);
      current = next;
    }
    merges.add(current);
    for (MergeCandidate merge : merges) {
      merge.apply(c);
    }
  }
}

Wie immer ist es trivial, unsere Pipeline (in diesem Fall die Early/On-Time/Late-Version aus Beispiel 4-3) zu aktualisieren, um diese benutzerdefinierte Windowing-Strategie zu verwenden, wie du in Beispiel 4-12 sehen kannst.

Beispiel 4-12. Vorzeitige, pünktliche und verspätete Entlassungen über die Early/On-Time/Late API
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(BoundedSessions
                       .withGapDuration(ONE_MINUTE)
                       .withMaxSize(THREE_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

Ausgeführt über unser laufendes Beispiel könnte es dann so aussehen wie in Abbildung 4-11.

Beachte, wie die große Sitzung mit dem Wert 36, die sich über [12:00.26, 12:05.20), also fast fünf Minuten, erstreckte, in der Implementierung der unbegrenzten Sitzungen in Abbildung 4-7 nun in zwei kürzere Sitzungen von 2 Minuten und 2 Minuten 53 Sekunden Länge aufgeteilt wird.

Wenn man bedenkt, wie wenige Systeme heute benutzerdefiniertes Windowing unterstützen, sollte man sich vor Augen führen, wie viel mehr Aufwand es bedeuten würde, so etwas mit einem System zu implementieren, das nur unbegrenzte Sitzungen unterstützt. Die einzige Möglichkeit wäre, einen Code zu schreiben, der der Sitzungsgruppierungslogik nachgelagert ist und die erzeugten Sitzungen betrachtet und sie zerschneidet, wenn sie die Längenbegrenzung überschreiten. Dies würde die Fähigkeit voraussetzen, eine Sitzung im Nachhinein zu zerlegen, was die Vorteile der inkrementellen Aggregation (auf die wir in Kapitel 7 näher eingehen) zunichte machen und die Kosten erhöhen würde. Auch die Vorteile des Spamschutzes, die man sich durch die Begrenzung der Sitzungslänge erhofft, würden dadurch zunichte gemacht, da die Sitzungen erst auf ihre volle Größe anwachsen müssten, bevor sie zerlegt oder gekürzt werden könnten.

Eine Größe passt nicht allen

Wir haben uns nun drei reale Anwendungsfälle angesehen, von denen jeder eine subtile Variation der gängigen Arten von Fenstern war, die typischerweise von Datenverarbeitungssystemen bereitgestellt werden: unausgerichtete feste Fenster, feste Fenster pro Element und begrenzte Sitzungen. In allen drei Fällen haben wir gesehen, wie einfach es ist, diese Anwendungsfälle mit benutzerdefinierten Fenstern zu unterstützen, und wie viel schwieriger (oder teurer) es wäre, diese Anwendungsfälle ohne sie zu unterstützen. Obwohl Custom Windowing in der Branche noch nicht weit verbreitet ist, bietet diese Funktion die nötige Flexibilität, um Kompromisse bei der Entwicklung von Datenverarbeitungspipelines zu finden, die komplexe, reale Anwendungsfälle mit großen Datenmengen so effizient wie möglich verarbeiten müssen.

Zusammenfassung

Fortgeschrittenes Windowing ist ein komplexes und vielfältiges Thema. In diesem Kapitel haben wir drei fortgeschrittene Konzepte behandelt:

Bearbeitungszeitfenster

Wir haben uns angeschaut, wie das mit dem Ereignis-Zeit-Fenster zusammenhängt. Wir haben die Stellen aufgezeigt, an denen es von Natur aus nützlich ist, und vor allem die Stellen identifiziert, an denen es nicht nützlich ist, indem wir die Stabilität der Ergebnisse, die uns das Ereignis-Zeit-Fenster bietet, besonders hervorgehoben haben.

Sitzungsfenster

Wir hatten unsere erste Einführung in die dynamische Klasse der zusammenführenden Fensterstrategien und und konnten sehen, wie viel Arbeit das System für uns erledigt, indem es ein so mächtiges Konstrukt bereitstellt, das man einfach an Ort und Stelle fallen lassen kann.

Benutzerdefinierte Fenster

Hier haben wir uns drei Beispiele aus der Praxis angeschaut, die in Systemen, die nur eine statische Auswahl an Fensterstrategien bieten, schwer oder gar nicht zu realisieren sind, aber in einem System mit Unterstützung für benutzerdefinierte Fenster relativ einfach zu implementieren sind:

  • Unausgerichtete feste Fenster, die eine gleichmäßigere Verteilung der Ergebnisse über die Zeit ermöglichen, wenn ein Wasserzeichen-Trigger in Verbindung mit festen Fenstern verwendet wird.

  • Feste Fenster pro Element, die die Flexibilität bieten, die Größe fester Fenster pro Element dynamisch zu wählen (z. B. um anpassbare Fenstergrößen pro Nutzer oder pro Kampagne zu ermöglichen), um die Semantik der Pipeline besser an den jeweiligen Anwendungsfall anzupassen.

  • Begrenzte Sitzungsfenster, die begrenzen, wie groß eine bestimmte Sitzung werden darf, z. B. um Spam-Versuchen entgegenzuwirken oder um die Latenzzeit für abgeschlossene Sitzungen, die von der Pipeline materialisiert werden, zu begrenzen.

Nachdem wir in Kapitel 3 mit Slava tief in die Materie der Wasserzeichen eingetaucht sind und hier einen umfassenden Überblick über fortgeschrittenes Windowing gegeben haben, sind wir nun weit über die Grundlagen der robusten Stream-Verarbeitung in mehreren Dimensionen hinausgekommen. Damit schließen wir unseren Fokus auf das Beam Model ab.

Als Nächstes folgt Reuvens Kapitel 5 über Konsistenzgarantien, Exakt-einmal-Verarbeitung und Seiteneffekte. Danach beginnen wir unsere Reise in Teil II, Streams und Tabellen, mit Kapitel 6.

1 Soweit ich weiß, ist Apache Flink das einzige andere System, das benutzerdefinierte Fenster in dem Maße unterstützt wie Beam. Und um fair zu sein, geht die Unterstützung sogar über die von Beam hinaus, da es einen benutzerdefinierten Fenster-Evictor bereitstellt. Kopf einziehen.

2 Und mir sind derzeit keine solchen Systeme bekannt.

3 Das setzt natürlich die Verwendung von verschlüsselten Daten voraus, aber da das Windowing ohnehin mit der Gruppierung nach Schlüsseln verbunden ist, ist diese Einschränkung nicht besonders lästig.

4 Und es ist nicht entscheidend, dass das Element selbst die Fenstergröße kennt. Du kannst genauso gut die passende Fenstergröße für die gewünschte Dimension nachschlagen und zwischenspeichern, zum Beispiel pro Benutzer.

Get Streaming-Systeme now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.