Kapitel 4. Kafka-Konsumenten: Daten aus Kafka lesen

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Anwendungen, die Daten aus Kafka lesen müssen, verwenden eine KafkaConsumer, um Kafka-Themen zu abonnieren und Nachrichten aus diesen Themen zu empfangen. Das Lesen von Daten aus Kafka unterscheidet sich ein wenig vom Lesen von Daten aus anderen Messaging-Systemen und es gibt ein paar einzigartige Konzepte und Ideen. Es kann schwierig sein, die Verwendung der Consumer API zu verstehen, ohne diese Konzepte zu kennen. Wir werden zunächst einige wichtige Konzepte erläutern und dann einige Beispiele durchgehen, die zeigen, wie die Consumer APIs für die Implementierung von Anwendungen mit unterschiedlichen Anforderungen genutzt werden können.

Kafka-Verbraucher-Konzepte

Um zu verstehen, wie du Daten aus Kafka lesen kannst, musst du zuerst die Konsumenten und Konsumentengruppen verstehen. Die folgenden Abschnitte behandeln diese Konzepte.

Verbraucher und Verbrauchergruppen

Angenommen, du hast eine Anwendung, die Nachrichten aus einem Kafka-Topic lesen, einige Validierungen gegen sie durchführen und die Ergebnisse in einen anderen Datenspeicher schreiben muss. In diesem Fall erstellt deine Anwendung ein Consumer-Objekt, abonniert das entsprechende Topic und beginnt, Nachrichten zu empfangen, sie zu validieren und die Ergebnisse zu schreiben. Das mag eine Zeit lang gut funktionieren, aber was ist, wenn die Geschwindigkeit, mit der die Produzenten Nachrichten in das Topic schreiben, die Geschwindigkeit übersteigt, mit der deine Anwendung sie validieren kann? Wenn du dich darauf beschränkst, dass nur ein einziger Konsument die Daten liest und verarbeitet, kann es passieren, dass deine Anwendung immer weiter ins Hintertreffen gerät und mit der Menge der eingehenden Nachrichten nicht mehr mithalten kann. Natürlich ist es notwendig, den Konsum von Topics zu skalieren. Genauso wie mehrere Produzenten in dasselbe Topic schreiben können, müssen wir mehreren Konsumenten erlauben, aus demselben Topic zu lesen und die Daten unter ihnen aufzuteilen.

Kafka-Konsumenten sind normalerweise Teil einer Konsumentengruppe. Wenn mehrere Kafka-Konsumenten ein Thema abonniert haben und zur gleichen Kafka-Gruppe gehören, erhält jeder Kafka-Konsument in der Gruppe Nachrichten aus einer anderen Teilmenge der Partitionen des Themas.

Nehmen wir das Thema T1 mit vier Partitionen. Nehmen wir an, wir haben einen neuen Verbraucher, C1, der der einzige Verbraucher in der Gruppe G1 ist, und abonnieren damit das Thema T1. Der Verbraucher C1 wird alle Nachrichten von allen vier T1-Partitionen erhalten. Siehe Abbildung 4-1.

kdg2 0401
Abbildung 4-1. Eine Verbrauchergruppe mit vier Partitionen

Wenn wir einen weiteren Verbraucher, C2, zur Gruppe G1 hinzufügen, erhält jeder Verbraucher nur Nachrichten aus zwei Partitionen. Vielleicht gehen die Nachrichten aus den Partitionen 0 und 2 an C1 und die Nachrichten aus den Partitionen 1 und 3 an den Verbraucher C2. Siehe Abbildung 4-2.

kdg2 0402
Abbildung 4-2. Vier Partitionen aufgeteilt auf zwei Verbraucher in einer Gruppe

Wenn G1 vier Verbraucher hat, dann liest jeder von ihnen Nachrichten von einer einzigen Partition. Siehe Abbildung 4-3.

kdg2 0403
Abbildung 4-3. Vier Verbraucher in einer Gruppe mit je einer Partition

Wenn wir einer Gruppe mit einem einzigen Thema mehr Verbraucher hinzufügen, als wir Partitionen haben, werden einige der Verbraucher im Leerlauf sein und überhaupt keine Nachrichten erhalten. Siehe Abbildung 4-4.

kdg2 0404
Abbildung 4-4. Mehr Verbraucher in einer Gruppe als Partitionen bedeuten untätige Verbraucher

Die Hauptmethode zur Skalierung des Datenverbrauchs eines Kafka-Themas ist das Hinzufügen weiterer Verbraucher zu einer Verbrauchergruppe. Es ist üblich, dass Kafka-Konsumenten Operationen mit hoher Latenz durchführen, z. B. das Schreiben in eine Datenbank oder eine zeitaufwändige Berechnung der Daten. In diesen Fällen kann ein einzelner Konsument unmöglich mit dem Datenfluss in ein Topic mithalten. Unsere Hauptmethode zur Skalierung ist das Hinzufügen weiterer Konsumenten, die sich die Last teilen, indem jeder Konsument nur eine Teilmenge der Partitionen und Nachrichten besitzt. Das ist ein guter Grund, Topics mit einer großen Anzahl von Partitionen zu erstellen - so können mehr Verbraucher hinzugefügt werden, wenn die Last steigt. Denk daran, dass es keinen Sinn macht, mehr Verbraucher hinzuzufügen, als du Partitionen in einem Thema hast - einige der Verbraucher werden einfach untätig sein. In Kapitel 2 findest du einige Vorschläge, wie du die Anzahl der Partitionen in einem Thema festlegen kannst.

Neben dem Hinzufügen von Verbrauchern, um eine einzelne Anwendung zu skalieren, ist es sehr üblich, dass mehrere Anwendungen Daten aus demselben Topic lesen müssen. Eines der Hauptziele bei der Entwicklung von Kafka war es, die in Kafka-Topics erzeugten Daten für viele Anwendungsfälle im gesamten Unternehmen verfügbar zu machen. In diesen Fällen wollen wir, dass jede Anwendung alle Nachrichten erhält und nicht nur eine Teilmenge. Um sicherzustellen, dass eine Anwendung alle Nachrichten in einem Topic erhält, muss die Anwendung eine eigene Consumer Group haben. Anders als viele herkömmliche Nachrichtensysteme lässt sich Kafka auf eine große Anzahl von Verbrauchern und Verbrauchergruppen skalieren, ohne die Leistung zu beeinträchtigen.

Wenn wir im vorherigen Beispiel eine neue Verbrauchergruppe (G2) mit einem einzigen Verbraucher hinzufügen, erhält dieser Verbraucher alle Nachrichten im Thema T1, unabhängig davon, was G1 tut. G2 kann mehr als einen Verbraucher haben. In diesem Fall erhält jeder von ihnen eine Teilmenge der Partitionen, genau wie wir es für G1 gezeigt haben, aber G2 als Ganzes erhält weiterhin alle Nachrichten, unabhängig von anderen Verbrauchergruppen. Siehe Abbildung 4-5.

kdg2 0405
Abbildung 4-5. Hinzufügen einer neuen Verbrauchergruppe, beide Gruppen erhalten alle Nachrichten

Zusammenfassend lässt sich sagen, dass du für jede Anwendung, die alle Nachrichten aus einem oder mehreren Themen benötigt, eine neue Verbrauchergruppe erstellst. Du fügst Verbraucher zu einer bestehenden Verbrauchergruppe hinzu, um das Lesen und Verarbeiten der Nachrichten aus den Themen zu skalieren, sodass jeder zusätzliche Verbraucher in einer Gruppe nur eine Teilmenge der Nachrichten erhält.

Verbrauchergruppen und Partition Rebalance

Wie wir im vorigen Abschnitt gesehen haben, teilen sich die Verbraucher in einer Verbrauchergruppe das Eigentum an den Partitionen in den Themen, die sie abonnieren. Wenn wir einen neuen Verbraucher in die Gruppe aufnehmen, beginnt er, Nachrichten aus Partitionen zu konsumieren, die zuvor von einem anderen Verbraucher abonniert wurden. Das Gleiche passiert, wenn ein Verbraucher herunterfährt oder abstürzt: Er verlässt die Gruppe, und die Partitionen, die er zuvor konsumiert hat, werden von einem der verbleibenden Verbraucher konsumiert. Eine Neuzuweisung von Fächern an Verbraucher erfolgt auch, wenn die Themen, die die Verbrauchergruppe konsumiert, geändert werden (z. B. wenn ein Administrator neue Fächer hinzufügt).

Das Verschieben von Partitionseigentum von einem Verbraucher zu einem anderen wird als Rebalance bezeichnet. Rebalances sind wichtig, weil sie der Verbrauchergruppe eine hohe Verfügbarkeit und Skalierbarkeit bieten (sie ermöglichen es uns, einfach und sicher Verbraucher hinzuzufügen und zu entfernen), aber im normalen Verlauf können sie ziemlich unerwünscht sein.

Es gibt zwei Arten von Rebalances, je nachdem, welche Strategie die Verbrauchergruppe für die Partitionszuweisung verwendet:1

Eifrige Neugewichtung

Bei einem Eager Rebalance hören alle Verbraucher auf zu konsumieren, geben ihr Eigentum an allen Partitionen auf, schließen sich der Verbrauchergruppe wieder an und erhalten eine neue Partitionszuweisung. Dies ist im Wesentlichen ein kurzes Zeitfenster, in dem die gesamte Verbrauchergruppe nicht verfügbar ist. Die Länge des Fensters hängt von der Größe der Verbrauchergruppe sowie von verschiedenen Konfigurationsparametern ab. Abbildung 4-6 zeigt, wie Eager Rebalances in zwei Phasen ablaufen: Zuerst geben alle Verbraucher ihre Partitionszuweisung auf, und nachdem sie dies abgeschlossen haben und der Gruppe wieder beigetreten sind, erhalten sie neue Partitionszuweisungen und können den Konsum wieder aufnehmen.

kdg2 0406
Abbildung 4-6. Eager Rebalance widerruft alle Partitionen, pausiert den Verbrauch und ordnet sie neu zu
Kooperative Rebalances

Beim kooperativen Rebalancing (auch inkrementelles Rebalancing genannt) wird in der Regel nur eine kleine Teilmenge der Partitionen von einem Verbraucher auf einen anderen übertragen und den Verbrauchern erlaubt, weiterhin Datensätze aus allen Partitionen zu verarbeiten, die nicht neu übertragen werden. Dies wird durch eine Neuverteilung in zwei oder mehr Phasen erreicht. In der ersten Phase informiert der Leiter der Verbrauchergruppe alle Verbraucher darüber, dass sie das Eigentum an einer Teilmenge ihrer Partitionen verlieren werden. In der zweiten Phase weist der Leiter der Verbrauchergruppe die verwaisten Partitionen den neuen Eigentümern zu. Dieser inkrementelle Ansatz kann einige Iterationen erfordern, bis eine stabile Zuweisung der Partitionen erreicht ist, aber er vermeidet die komplette "Stop-the-World"-Unverfügbarkeit, die beim eifrigen Ansatz auftritt. Das ist besonders wichtig bei großen Verbrauchergruppen, bei denen eine Neuverteilung viel Zeit in Anspruch nehmen kann. Abbildung 4-7 zeigt, dass kooperative Rebalances inkrementell sind und dass nur eine Teilmenge der Verbraucher und Partitionen betroffen ist.

kdg2 0407
Abbildung 4-7. Kooperatives Rebalancing unterbricht den Verbrauch nur für die Teilmenge der Partitionen, die neu zugewiesen werden

Die Verbraucher halten die Mitgliedschaft in einer Verbrauchergruppe und das Eigentum an den ihnen zugewiesenen Partitionen aufrecht, indem sie Heartbeats an einen Kafka-Broker senden, der als Gruppenkoordinator bezeichnet wird (dieser Broker kann für verschiedene Verbrauchergruppen unterschiedlich sein). Die Heartbeats werden von einem Hintergrund-Thread des Verbrauchers gesendet. Solange der Verbraucher in regelmäßigen Abständen Heartbeats sendet, wird davon ausgegangen, dass er am Leben ist.

Wenn der Verbraucher lange genug keine Heartbeats mehr sendet, kommt es zu einer Zeitüberschreitung und der Gruppenkoordinator hält ihn für tot und löst einen Rebalance aus. Wenn ein Verbraucher abgestürzt ist und keine Nachrichten mehr verarbeitet, braucht der Gruppenkoordinator ein paar Sekunden ohne Heartbeats, um ihn für tot zu erklären und den Rebalance auszulösen. Während dieser Sekunden werden keine Nachrichten von den Partitionen des toten Konsumenten verarbeitet. Wenn ein Verbraucher sauber geschlossen wird, teilt er dem Gruppenkoordinator mit, dass er die Gruppe verlässt, und der Gruppenkoordinator löst sofort einen Rebalance aus. Später in diesem Kapitel werden wir die Konfigurationsoptionen besprechen, die die Heartbeat-Häufigkeit, die Sitzungszeitüberschreitungen und andere Konfigurationsparameter steuern, mit denen das Verhalten der Verbraucher fein abgestimmt werden kann.

Wie funktioniert der Prozess der Zuweisung von Partitionen an Verbraucher?

Wenn ein Verbraucher einer Gruppe beitreten möchte, sendet er eine JoinGroup Anfrage an den Gruppenkoordinator. Der erste Verbraucher, der der Gruppe beitritt, wird der Gruppenleiter. Der Gruppenleiter erhält vom Gruppenkoordinator eine Liste aller Verbraucher in der Gruppe (diese enthält alle Verbraucher, die kürzlich einen Heartbeat gesendet haben und daher als lebendig gelten) und ist dafür verantwortlich, jedem Verbraucher eine Teilmenge von Partitionen zuzuweisen. Er verwendet eine Implementierung vonPartitionAssignor um zu entscheiden, welche Partitionen von welchem Verbraucher bearbeitet werden sollen.

Kafka verfügt über einige eingebaute Partitionszuweisungsrichtlinien, die wir im Abschnitt über die Konfiguration näher erläutern werden. Nach der Entscheidung über die Partitionszuweisung sendet der Anführer der Verbrauchergruppe die Liste der Zuweisungen an GroupCoordinator, die diese Informationen an alle Verbraucher weiterleitet. Jeder Verbraucher sieht nur seine eigene Zuweisung - der Gruppenleiter ist der einzige Client-Prozess, der die vollständige Liste der Verbraucher in der Gruppe und ihre Zuweisungen kennt. Dieser Vorgang wiederholt sich jedes Mal, wenn ein Rebalancing stattfindet.

Statische Gruppenmitgliedschaft

Standardmäßig ist die Identität eines Verbrauchers als Mitglied seiner Verbrauchergruppe vorübergehend. Wenn ein Verbraucher eine Verbrauchergruppe verlässt, werden ihm die ihm zugewiesenen Partitionen entzogen. Wenn er wieder beitritt, erhält er über das Rebalance-Protokoll eine neue Mitglieds-ID und einen neuen Satz von Partitionen.

All dies gilt, es sei denn, du konfigurierst einen Verbraucher mit einer eindeutigen group.instance.id, wodurch der Verbraucher zu einem statischen Mitglied der Gruppe wird. Wenn ein Verbraucher zum ersten Mal einer Verbrauchergruppe als statisches Mitglied der Gruppe beitritt, wird ihm wie üblich eine Reihe von Partitionen gemäß der Partitionszuweisungsstrategie zugewiesen, die die Gruppe verwendet. Wenn dieser Verbraucher herunterfährt, verlässt er dieGruppe jedoch nicht automatisch, sondern bleibt Mitglied der Gruppe, bis seine Sitzung beendet ist. Wenn der Verbraucher der Gruppe wieder beitritt, wird er mit seiner statischen Identität erkannt und bekommt dieselben Partitionen zugewiesen, die er zuvor innehatte, ohne dass ein Rebalancing ausgelöst wird. Der Gruppenkoordinator, der die Zuweisung für jedes Mitglied der Gruppe im Cache speichert, muss keinen Rebalancing-Vorgang auslösen, sondern kann die Cache-Zuweisung einfach an das neue statische Mitglied senden.

Wenn zwei Verbraucher der gleichen Gruppe mit der gleichen group.instance.id beitreten, erhält der zweite Verbraucher die Fehlermeldung, dass bereits ein Verbraucher mit dieser ID existiert.

Eine statische Gruppenzugehörigkeit ist nützlich, wenn deine Anwendung einen lokalen Status oder Cache unterhält, der mit den Partitionen gefüllt ist, die den einzelnen Verbrauchern zugewiesen sind. Wenn die Neuerstellung dieses Caches zeitaufwändig ist, möchtest du nicht, dass dieser Prozess bei jedem Neustart eines Verbrauchers stattfindet. Auf der anderen Seite ist es wichtig, sich daran zu erinnern, dass diePartitionen, die den einzelnen Verbrauchern gehören, nicht neu zugewiesen werden, wenn ein Verbraucher neu gestartet wird. Für eine gewisse Zeit wird kein Verbraucher Nachrichten aus diesen Partitionen konsumieren, und wenn der Verbraucher schließlich wieder startet, wird er hinter den neuesten Nachrichten in diesen Partitionen zurückbleiben. Du solltest sicher sein, dass der Verbraucher, dem diese Partitionen gehören, den Rückstand nach dem Neustart aufholen kann.

Es ist wichtig zu beachten, dass statische Mitglieder von Verbrauchergruppen die Gruppe nicht proaktiv verlassen, wenn sie heruntergefahren werden, und die Erkennung, wann sie "wirklich weg" sind, hängt von der session.timeout.ms Konfiguration ab. Du solltest den Wert hoch genug ansetzen, um zu vermeiden, dass bei einem einfachen Anwendungsneustart ein Rebalancing ausgelöst wird, aber niedrig genug, um eine automatische Neuzuweisung ihrer Partitionen zu ermöglichen, wenn es zu einer größeren Ausfallzeit kommt, um große Lücken in der Verarbeitung dieser Partitionen zu vermeiden.

Einen Kafka-Konsumenten erstellen

Der erste Schritt, um Datensätze zu konsumieren, besteht darin, eine KafkaConsumer Instanz zu erstellen. Die Erstellung einer KafkaConsumer ist der Erstellung einer KafkaProducersehr ähnlich - du erstellst eine Java Properties Instanz mit den Eigenschaften, die du an den Konsumenten weitergeben willst. Wir werden alle Eigenschaften später in diesem Kapitel ausführlich besprechen. Zu Beginn müssen wir nurdie drei obligatorischen Eigenschaften verwenden: bootstrap.servers, key.deserializer, und value.deserializer.

Die erste Eigenschaft, bootstrap.servers, ist der Verbindungsstring zu einem Kafka-Cluster. Sie wird genauso verwendet wie die Eigenschaft KafkaProducer (siehe Kapitel 3 für weitere Informationenzur Definition dieser Eigenschaft). Die beiden anderen Eigenschaften, key.deserializer und value.​dese⁠rial⁠izer, sind ähnlich wie die für den Producer definierten serializers, aber anstatt Klassen anzugeben, die Java-Objekte in Byte-Arrays umwandeln, musst du Klassen angeben, die ein Byte-Array nehmen und in ein Java-Objekt umwandeln können.

Es gibt eine vierte Eigenschaft, die nicht zwingend erforderlich ist, aber sehr häufig verwendet wird. Die Eigenschaft lautet group.id und gibt die Verbrauchergruppe an, zu der die KafkaConsumer Instanz zugehört. Es ist zwar möglich, Verbraucher zu erstellen, die keiner Verbrauchergruppe angehören, aber das ist unüblich, so dass wir für den Großteil des Kapitels davon ausgehen, dass der Verbraucher Teil einer Gruppe ist.

Der folgende Codeschnipsel zeigt, wie man eine KafkaConsumer erstellt:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer =
    new KafkaConsumer<String, String>(props);

Das meiste, was du hier siehst, sollte dir vertraut sein, wenn du Kapitel 3 über die Erstellung von Produzenten gelesen hast. Wir gehen davon aus, dass die Datensätze, die wir konsumieren, String Objekte als Schlüssel und Wert des Datensatzes haben werden. Die einzige neue Eigenschaft hier ist group.id, der Name der Verbrauchergruppe, zu der dieser Verbraucher gehört.

Abonnieren von Themen

Sobald wir einen Verbraucher erstellt haben, besteht der nächste Schritt darin, ein oder mehrere Themen zu abonnieren. Die Methode subscribe() nimmt eine Liste von Themen als Parameter an und ist daher ziemlich einfach zu verwenden:

consumer.subscribe(Collections.singletonList("customerCountries")); 1
1

Hier erstellen wir einfach eine Liste mit einem einzigen Element: dem Namen des Themas customerCountries.

Es ist auch möglich, subscribe mit einem regulären Ausdruck aufzurufen. Der Ausdruck kann mit mehreren Themennamen übereinstimmen. Wenn jemand ein neues Thema mit einem übereinstimmenden Namen anlegt, findet fast sofort ein Neuabgleich statt und die Verbraucher beginnen, aus dem neuen Thema zu konsumieren. Das ist nützlich für Anwendungen, die von mehreren Themen konsumieren müssen und mit den verschiedenen Datentypen umgehen können, die die Themen enthalten. Das Abonnieren mehrerer Topics mithilfe eines regulären Ausdrucks wird am häufigsten in Anwendungen verwendet, die Daten zwischen Kafka und einem anderen System oder Streams verarbeitenden Anwendungen replizieren.

Um zum Beispiel alle Testthemen zu abonnieren, können wir aufrufen:

consumer.subscribe(Pattern.compile("test.*"));
Warnung

Wenn dein Kafka-Cluster eine große Anzahl von Partitionen hat, vielleicht 30.000 oder mehr, solltest du wissen, dass die Filterung der Themen für das Abonnement auf der Client-Seite erfolgt. Das heißt, wenn du eine Teilmenge von Themen über einen regulären Ausdruck und nicht über eine explizite Liste abonnierst, fordert der Verbraucher in regelmäßigen Abständen die Liste aller Themen und ihrer Partitionen vom Broker an. Anhand dieser Liste erkennt der Kunde dann neue Themen, die er in sein Abonnement aufnehmen sollte, und abonniert sie. Wenn die Themenliste sehr umfangreich ist und es viele Kunden gibt, ist die Liste der Themen und Partitionen sehr groß, und das Abonnement mit regulären Ausdrücken bedeutet einen erheblichen Overhead für den Broker, den Kunden und das Netzwerk. Es gibt Fälle, in denen die Bandbreite, die von den Themen-Metadaten beansprucht wird, größer ist als die Bandbreite, die zum Senden der Daten verwendet wird. Das bedeutet auch, dass der Kunde, um einen regulären Ausdruck zu abonnieren, die Berechtigung benötigt, alle Topics im Cluster zu beschreiben, d. h. eine vollständige describe Berechtigung für den gesamten Cluster.

Die Abstimmungsschleife

Das Herzstück der Consumer-API ist eine einfache Schleife, mit der du den Server nach weiteren Daten abfragst. Der Hauptteil eines Verbrauchers sieht wie folgt aus:

Duration timeout = Duration.ofMillis(100);

while (true) { 1
    ConsumerRecords<String, String> records = consumer.poll(timeout); 2

    for (ConsumerRecord<String, String> record : records) { 3
        System.out.printf("topic = %s, partition = %d, offset = %d, " +
                        "customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        int updatedCount = 1;
        if (custCountryMap.containsKey(record.value())) {
            updatedCount = custCountryMap.get(record.value()) + 1;
        }
        custCountryMap.put(record.value(), updatedCount);

        JSONObject json = new JSONObject(custCountryMap);
        System.out.println(json.toString()); 4
    }
}
1

Dies ist tatsächlich eine Endlosschleife. Verbraucher sind in der Regel langlaufende Anwendungen, die Kafka ständig nach weiteren Daten abfragen. Wir werden später im Kapitel zeigen, wie man die Schleife sauber beendet und den Konsumenten schließt.

2

Dies ist die wichtigste Zeile in diesem Kapitel. Genauso wie Haie sich bewegen müssen, um nicht zu sterben, müssen die Konsumenten Kafka ständig abfragen, sonst gelten sie als tot und die Partitionen, die sie konsumieren, werden an einen anderen Konsumenten in der Gruppe übergeben, der sie weiter konsumiert. Der Parameter, den wir an poll() übergeben, ist ein Timeout-Intervall und legt fest, wie lange poll() blockiert wird, wenn keine Daten im Puffer des Konsumenten verfügbar sind. Wenn er auf 0 gesetzt ist oder wenn bereits Datensätze verfügbar sind, kehrt poll() sofort zurück; andernfalls wartet es die angegebene Anzahl von Millisekunden.

3

poll() gibt eine Liste von Datensätzen zurück. Jeder Datensatz enthält das Thema und die Partition, aus der der Datensatz stammt, den Offset des Datensatzes innerhalb der Partition und natürlich den Schlüssel und den Wert des Datensatzes. Normalerweise wollen wir über die Liste iterieren und die Datensätze einzeln verarbeiten.

4

Die Verarbeitung endet normalerweise damit, dass ein Ergebnis in einen Datenspeicher geschrieben oder ein gespeicherter Datensatz aktualisiert wird. Hier geht es darum, eine laufende Zählung der Kunden aus jedem Land zu erhalten, also aktualisieren wir eine Hashtabelle und geben das Ergebnis als JSON aus. Ein realistischeres Beispiel würde das Ergebnis der Aktualisierung in einem Datenspeicher speichern.

Die poll Schleife macht viel mehr, als nur Daten zu holen. Wenn du poll() zum ersten Mal mit einem neuen Verbraucher aufrufst, ist sie dafür verantwortlich, die GroupCoordinator zu finden, der Verbrauchergruppe beizutreten und eine Partitionszuweisung zu erhalten. Wenn ein Rebalance ausgelöst wird, wird dies ebenfalls in der Poll-Schleife behandelt, einschließlich der zugehörigen Callbacks. Das bedeutet, dass fast alles, was bei einem Verbraucher oder bei den in seinen Hörern verwendeten Callbacks schief gehen kann, wahrscheinlich als eine von poll() ausgelöste Ausnahme auftaucht.

Wenn poll() länger als max.poll.interval.ms nicht aufgerufen wird, gilt der Verbraucher als tot und wird aus der Verbrauchergruppe entfernt. Vermeide also alles, was die Poll-Schleife für unvorhersehbare Zeiträume blockieren könnte.

Gewinde Sicherheit

Du kannst nicht mehrere Verbraucher in einem Thread haben, die zur gleichen Gruppe gehören, und du kannst nicht mehrere Threads haben, die sicher den gleichen Verbraucher benutzen. Ein Verbraucher pro Thread ist die Regel. Um mehrere Verbraucher derselben Gruppe in einer Anwendung auszuführen, musst du jeden in einem eigenen Thread laufen lassen. Es ist sinnvoll, die Verbraucherlogik in ein eigenes Objekt zu verpacken und dann Javas ExecutorService zu verwenden, um mehrere Threads zu starten, jeder mit seinem eigenen Verbraucher. Im Confluent-Blog gibt es ein Tutorial, das zeigt, wie das geht.

Warnung

In älteren Versionen von Kafka lautete die vollständige Methodensignatur poll(long); diese Signatur ist jetzt veraltet und die neue API lautet poll(Duration). Neben der Änderung des Argumenttyps hat sich auch die Semantik der Methodenblockierung subtil geändert. Die ursprüngliche Methode poll(long) blockiert so lange, wie sie braucht, um die benötigten Metadaten von Kafka zu erhalten, auch wenn dies länger ist als die Timeout-Dauer. Die neue Methode, poll(Duration), hält sich an die Timeout-Beschränkungen und wartet nicht auf Metadaten. Wenn du einen bestehenden Consumer-Code hast, der poll(0) als Methode verwendet, um Kafka zu zwingen, die Metadaten zu holen, ohne irgendwelche Datensätze zu konsumieren (ein ziemlich üblicher Hack), kannst du ihn nicht einfach in poll(Duration.ofMillis(0)) ändern und das gleiche Verhalten erwarten. Du musst dir einen neuen Weg überlegen, um dein Ziel zu erreichen. Oft besteht die Lösung darin, die Logik in der Methode rebalanceListener.onPartitionAssignment() zu platzieren, die garantiert aufgerufen wird, nachdem du Metadaten für die zugewiesenen Partitionen hast, aber bevor die Datensätze ankommen. Eine andere Lösung wurde von Jesse Anderson in seinem Blogbeitrag "Kafka's Got a Brand-New Poll" dokumentiert .

Ein anderer Ansatz besteht darin, dass ein Verbraucher eine Warteschlange mit Ereignissen füllt und mehrere Worker-Threads die Arbeit aus dieser Warteschlange ausführen. Ein Beispiel für dieses Muster findest du in einem Blogbeitrag von Igor Buzatović.

Konfigurieren von Verbrauchern

Bisher haben wir uns auf das Erlernen der Consumer-API konzentriert, aber haben wir uns nur ein paar der Konfigurationseigenschaften angesehen - nur die obligatorischen bootstrap.servers, group.id, key.deserializer und value.deserializer. Die gesamte Consumer-Konfiguration ist in der Apache Kafka-Dokumentation dokumentiert. Die meisten Parameter haben vernünftige Standardwerte und müssen nicht geändert werden, aber einige haben Auswirkungen auf die Leistung und Verfügbarkeit der Verbraucher. Werfen wir einen Blick auf einige der wichtigsten Eigenschaften.

fetch.min.bytes

Mit dieser Eigenschaft kann ein Verbraucher die Mindestmenge an Daten angeben, die er vom Broker erhalten möchte, wenn er Datensätze abruft, standardmäßig ein Byte. Wenn ein Broker eine Anfrage für Datensätze von einem Verbraucher erhält, die neuen Datensätze aber weniger Bytes umfassen als fetch.min.bytes, wartet der Broker, bis mehr Nachrichten verfügbar sind, bevor er die Datensätze an den Verbraucher zurückschickt. Dadurch werden sowohl der Verbraucher als auch der Broker entlastet, da sie weniger Hin- und Her-Nachrichten verarbeiten müssen, wenn die Themen nicht viel neue Aktivität aufweisen (oder in den Stunden mit geringerer Aktivität am Tag). Du solltest diesen Parameter höher als den Standardwert setzen, wenn der Verbraucher zu viel CPU-Leistung beansprucht, wenn nicht viele Daten verfügbar sind, oder die Belastung des Brokers verringern, wenn du eine große Anzahl von Verbrauchern hast.

fetch.max.wait.ms

Mit der Einstellung fetch.min.bytes sagst du Kafka, dass es warten soll, bis es genug Daten zum Senden hat, bevor es dem Verbraucher antwortet. fetch.max.wait.ms lässt dich kontrollieren, wie lange es warten soll. In der Standardeinstellung wartet Kafka bis zu 500 ms. Dies führt zu einer zusätzlichen Latenz von bis zu 500 ms, falls nicht genügend Daten in das Kafka-Topic fließen, um die Mindestmenge an Daten zu liefern. Wenn du die potenzielle Latenz einschränken möchtest (in der Regel aufgrund von SLAs, die die maximale Latenz der Anwendung regeln), kannst du fetch.max.wait.ms auf einen niedrigeren Wert setzen. Wenn du fetch.max.wait.ms auf 100 ms und fetch.min.bytes auf 1 MB setzt, erhält Kafka eine Fetch-Anfrage vom Konsumenten und antwortet mit Daten, wenn es entweder 1 MB Daten zurückgeben kann oder nach 100 ms, je nachdem, was zuerst eintritt.

fetch.max.bytes

Mit dieser Eigenschaft kannst du die maximalen Bytes angeben, die Kafka zurückgibt, wenn der Verbraucher einen Broker abfragt (standardmäßig 50 MB). Sie wird verwendet, um die Größe des Speichers zu begrenzen, den der Verbraucher zum Speichern der vom Server zurückgegebenen Daten verwendet, unabhängig davon, wie viele Partitionen oder Nachrichten zurückgegeben wurden. Beachte, dass die Datensätze in Stapeln an den Kunden gesendet werden. Wenn der erste Datensatz-Stapel, den der Broker senden muss, diese Größe überschreitet, wird der Stapel gesendet und das Limit ignoriert. So wird sichergestellt, dass der Verbraucher weiterhin Fortschritte machen kann. Es ist erwähnenswert, dass es eine passende Broker-Konfiguration gibt, mit der der Kafka-Administrator auch die maximale Abrufgröße begrenzen kann. Die Maklerkonfiguration kann nützlich sein, da Anfragen nach großen Datenmengen zu großen Lesevorgängen von der Festplatte und langen Sendevorgängen über das Netzwerk führen können, was zu Konflikten führen und die Belastung des Maklers erhöhen kann.

max.poll.records

Diese Eigenschaft bestimmt die maximale Anzahl der Datensätze, die ein einzelner Aufruf von poll() zurückgibt. Damit kannst du die Datenmenge (aber nicht die Größe der Daten) steuern, die deine Anwendung in einer Iteration der Abrufschleife verarbeiten muss.

max.partition.fetch.bytes

Diese Eigenschaft bestimmt die maximale Anzahl von Bytes, die der Server pro Partition zurückgibt (standardmäßig 1 MB). Wenn KafkaConsumer.poll() ConsumerRecords zurückgibt, verwendet das Datensatzobjekt höchstens max.partition.fetch.bytes pro Partition, die dem Verbraucher zugewiesen ist. Beachte, dass die Steuerung der Speichernutzung mit dieser Konfiguration ziemlich komplex sein kann, da du keine Kontrolle darüber hast, wie viele Partitionen in der Broker-Antwort enthalten sind. Wir empfehlen daher dringend, stattdessen fetch.max.bytes zu verwenden, es sei denn, du hast besondere Gründe, ähnliche Datenmengen von jeder Partition zu verarbeiten.

session.timeout.ms und heartbeat.interval.ms

Die Zeitspanne, in der ein Verbraucher keinen Kontakt zu den Brokern hat und trotzdem als lebendig gilt, beträgt standardmäßig 10 Sekunden. Wenn mehr als session.timeout.ms vergeht, ohne dass der Verbraucher einen Heartbeat an den Gruppenkoordinator sendet, gilt er als tot und der Gruppenkoordinator löst eine Neuverteilung der Verbrauchergruppe aus, um die Partitionen des toten Verbrauchers den anderen Verbrauchern in der Gruppe zuzuweisen. Diese Eigenschaft steht in engem Zusammenhang mit der Eigenschaft heartbeat.interval.ms, die festlegt, wie oft der Kafka-Konsument einen Heartbeat an den Gruppenkoordinator sendet, während ses⁠sion.timeout.ms festlegt, wie lange ein Konsument keinen Heartbeat senden darf. Daher werden diese beiden Eigenschaften in der Regel gemeinsam geändert -heartbeat.​interval.ms muss niedriger sein als session.timeout.ms und wird normalerweise auf ein Drittel des Timeout-Wertes gesetzt. Wenn session.timeout.ms also 3 Sekunden beträgt, sollte heartbeat.​inter⁠val.ms auf 1 Sekunde eingestellt werden. Wenn du session.timeout.ms niedriger als den Standardwert einstellst, können die Verbrauchergruppen einen Ausfall früher erkennen und sich davon erholen, aber es kann auch zu unerwünschten Rebalancen kommen. Wenn du session.timeout.ms höher einstellst, verringert sich die Wahrscheinlichkeit eines ungewollten Ausgleichs, aber es dauert auch länger, bis ein echterAusfall erkannt wird.

max.abfrage.intervall.ms

Mit dieser Eigenschaft kannst du die Zeitspanne festlegen, die ein Verbraucher ohne Abfrage vergehen kann, bevor er als tot gilt. Wie bereits erwähnt, sind Heartbeats und Sitzungs-Timeouts die wichtigsten Mechanismen, mit denen Kafka tote Verbraucher erkennt und ihre Partitionen entfernt. Wir haben jedoch auch erwähnt, dass die Heartbeats von einem Hintergrund-Thread gesendet werden. Es besteht die Möglichkeit, dass der Haupt-Thread, der Kafka konsumiert, blockiert ist, der Hintergrund-Thread aber weiterhin Heartbeats sendet. Das bedeutet, dass Datensätze aus Partitionen, die diesem Konsumenten gehören, nicht verarbeitet werden. Der einfachste Weg, um festzustellen, ob der Konsument noch Datensätze verarbeitet, ist zu prüfen, ob er weitere Datensätze anfordert. Die Intervalle zwischen den Anfragen nach weiteren Datensätzen sind jedoch schwer vorherzusagen und hängen von der Menge der verfügbaren Daten, der Art der Verarbeitung durch den Verbraucher und manchmal auch von der Latenzzeit zusätzlicher Dienste ab. In Anwendungen, die jeden zurückgegebenen Datensatz zeitaufwändig verarbeiten müssen, wird max.poll.records verwendet, um die Menge der zurückgegebenen Daten zu begrenzen und somit die Dauer zu begrenzen, bis die Anwendung wieder für poll() verfügbar ist. Selbst wenn max.poll.records definiert ist, lässt sich das Intervall zwischen den Aufrufen von poll() nur schwer vorhersagen, und max.poll.interval.ms wird als Fail-Safe oder Backstop verwendet. Das Intervall muss so groß sein, dass es nur sehr selten von einem gesunden Verbraucher erreicht wird, aber niedrig genug, um erhebliche Auswirkungen durch einen hängenden Verbraucher zu vermeiden. Der Standardwert ist 5 Minuten. Wenn die Zeitüberschreitung erreicht ist, sendet der Hintergrund-Thread eine Anfrage zum Verlassen der Gruppe, um dem Broker mitzuteilen, dass der Verbraucher tot ist und die Gruppe wieder ins Gleichgewicht kommen muss, und hört dann auf, Heartbeats zu senden.

default.api.timeout.ms

Dies ist die Zeitüberschreitung, die für (fast) alle API -Aufrufe des Verbrauchers gilt, wenn du beim Aufruf der API keine explizite Zeitüberschreitung angibst. Der Standardwert ist 1 Minute, und da er höher ist als der Standardwert für das Anfrage-Timeout, wird bei Bedarf ein erneuter Versuch unternommen. Die bemerkenswerte Ausnahme von APIs, die diesen Standard verwenden, ist die Methode poll(), die immer eine explizite Zeitüberschreitung erfordert.

request.timeout.ms

Dies ist die maximale Zeitspanne, die der Verbraucher auf eine Antwort des Brokers warten wird. Wenn der Broker innerhalb dieser Zeit nicht antwortet, geht der Kunde davon aus, dass der Broker überhaupt nicht antwortet, schließt die Verbindung und versucht, sich erneut zu verbinden. Der Standardwert für diese Konfiguration ist 30 Sekunden, und es wird empfohlen, ihn nicht zu verringern. Es ist wichtig, dem Broker genug Zeit zu lassen, um die Anfrage zu bearbeiten, bevor er aufgibt - es bringt wenig, Anfragen an einen bereits überlasteten Broker zu senden, und das Trennen und Wiederherstellen der Verbindung führt zu noch mehr Overhead.

auto.offset.reset

Diese Eigenschaft steuert das Verhalten des Verbrauchers, wenn er mit dem Lesen einer Partition beginnt, für die er keinen Offset hat, oder wenn der Offset, den er hat, ungültig ist (in der Regel, weil der Verbraucher so lange außer Betrieb war, dass der Datensatz mit diesem Offset bereits aus dem Broker veraltet ist). Die Standardeinstellung ist "latest", was bedeutet, dass der Verbraucher in Ermangelung eines gültigen Offsets mit dem Lesen der neuesten Datensätze beginnt (Datensätze, die geschrieben wurden, nachdem der Verbraucher gestartet wurde). Die Alternative ist "earliest", was bedeutet, dass der Verbraucher ohne einen gültigen Offset alle Daten in der Partition von Anfang an lesen wird. Wenn du auto.offset.reset auf none setzt, wird eine Ausnahme ausgelöst, wenn du versuchst, mit einem ungültigen Offset zu konsumieren.

enable.auto.commit

Dieser Parameter legt fest, ob der Verbraucher die Versätze automatisch festlegt und ist standardmäßig auf true eingestellt. Setze ihn auf false, wenn du kontrollieren möchtest, wann die Versätze festgelegt werden, was notwendig ist, um Duplikate zu minimieren und fehlende Daten zu vermeiden. Wenn du enable.auto.commit auf true setzt, kannst du mit auto.commit.interval.ms auch festlegen, wie oft die Versätze übertragen werden. Wir werden die verschiedenen Optionen für das Übertragen von Offsets später in diesem Kapitel ausführlicher besprechen.

partition.zuweisung.strategie

Wir haben gelernt, dass Partitionen den Verbrauchern in einer Verbrauchergruppe zugewiesen werden. A PartitionAssignor ist eine Klasse, die anhand der Verbraucher und der Themen, die sie abonniert haben, entscheidet, welche Partitionen welchem Verbraucher zugewiesen werden. Standardmäßig hat Kafka die folgenden Zuweisungsstrategien:

Reichweite

Weist jedem Verbraucher eine fortlaufende Teilmenge von Partitionen aus jedem Thema zu, das er abonniert hat. Wenn also die Verbraucher C1 und C2 die beiden Themen T1 und T2 abonniert haben und jedes der Themen drei Partitionen hat, werden C1 die Partitionen 0 und 1 aus den Themen T1 und T2 zugewiesen, während C2 die Partition 2 aus diesen Themen erhält. Da jedes Thema eine ungerade Anzahl von Partitionen hat und die Zuweisung für jedes Thema unabhängig voneinander erfolgt, hat der erste Verbraucher am Ende mehr Partitionen als der zweite. Das passiert immer dann, wenn die Range-Zuweisung verwendet wird und die Anzahl der Verbraucher die Anzahl der Partitionen in jedem Thema nicht sauber aufteilt.

RoundRobin

Nimmt alle Partitionen aus allen abonnierten Themen und weist sie den Verbrauchern der Reihe nach zu, eine nach der anderen. Wenn C1 und C2 die zuvor beschriebene RoundRobin-Zuweisung verwenden würden, hätte C1 die Partitionen 0 und 2 aus dem Thema T1 und die Partition 1 aus dem Thema T2. C2 würde die Partition 1 aus dem Thema T1 und die Partitionen 0 und 2 aus dem Thema T2 erhalten. Wenn alle Verbraucher die gleichen Themen abonniert haben (ein sehr häufiges Szenario), führt die RoundRobin-Zuweisung dazu, dass alle Verbraucher die gleiche Anzahl von Partitionen haben (oder höchstens eine Partitionsdifferenz).

Klebrig

Der Sticky Assignor verfolgt zwei Ziele: Erstens soll die Zuweisung so ausgewogen wie möglich sein, und zweitens lässt er im Falle einer Neuverteilung so viele Zuweisungen wie möglich bestehen und minimiert so den Aufwand, der mit dem Verschieben von Teilungszuweisungen von einem Verbraucher zum anderen verbunden ist. Wenn alle Verbraucher dasselbe Thema abonniert haben, ist die erste Zuweisung des Sticky Assignors genauso ausgeglichen wie die des RoundRobin Assignors. Spätere Zuweisungen sind genauso ausgewogen, verringern aber die Anzahl der Partitionsbewegungen. In Fällen, in denen Verbraucher in derselben Gruppe verschiedene Themen abonnieren, ist die Zuweisung durch den Sticky Assignor ausgeglichener als die des RoundRobin Assignors.

Kooperativ klebrig

Diese Zuweisungsstrategie ist identisch mit der des Sticky Assignor, unterstützt aber kooperative Rebalances, bei denen die Verbraucher weiterhin von den Partitionen konsumieren können, die nicht neu zugewiesen wurden. Unter "Verbrauchergruppen und Partitionsausgleich" erfährst du mehr über den kooperativen Ausgleich. Wenn du von einer älteren Version als 2.3 aktualisierst, musst du einen bestimmten Upgrade-Pfad befolgen, um die kooperative Sticky-Zuweisungsstrategie zu aktivieren, also achte besonders auf die Upgrade-Anleitung.

Unter partition.assignment.strategy kannst du eine Strategie für die Partitionszuweisung auswählen. Die Standardeinstellung ist org.apache.kafka.clients.consumer.RangeAssignor, die die oben beschriebene Bereichsstrategie implementiert. Du kannst sie durch org.apache.kafka.clients.consumer.RoundRobinAssignor, org.apache.kafka.​clients.consumer.StickyAssignor oder org.apache.kafka.clients.consumer.​CooperativeStickyAssignor ersetzen. Eine fortgeschrittenere Option ist es, deine eigene Zuweisungsstrategie zu implementieren. partition.assignment.strategy sollte auf den Namen deiner Klasse verweisen.

client.id

Dies kann eine beliebige Zeichenkette sein und wird von den Brokern verwendet, um vom Client gesendete Anfragen zu identifizieren, z. B. Fetch-Anfragen. Sie wird in der Protokollierung und in den Metriken sowie für Quoten verwendet.

client.rack

Standardmäßig rufen die Verbraucher Nachrichten vom führenden Replikat jeder Partition ab. Wenn sich der Cluster jedoch über mehrere Rechenzentren oder Cloud-Verfügbarkeitszonen erstreckt, bietet es sowohl Leistungs- als auch Kostenvorteile, wenn die Nachrichten von einem Replikat abgerufen werden, das sich in derselben Zone wie der Verbraucher befindet. Um den Abruf vom nächstgelegenen Replikat zu ermöglichen, musst du die Konfiguration client.rack einstellen und die Zone bestimmen, in der sich der Client befindet. Dann kannst du die Broker so konfigurieren, dass sie den Standard replica.selector.class durch org.apache.kafka.common.replica.RackAwareReplicaSelector ersetzen.

Du kannst auch deine eigene replica.selector.class mit einer benutzerdefinierten Logik für die Auswahl des besten Replikats implementieren, das du auf der Grundlage von Client- und Partitions-Metadaten konsumieren möchtest.

group.instance.id

Dies kann eine beliebige eindeutige Zeichenfolge sein und wird verwendet, um einem Verbraucher eine statische Gruppenzugehörigkeit zu geben.

receive.buffer.bytes und send.buffer.bytes

Dies sind die Größen der TCP-Sende- und Empfangspuffer, die von den Sockets beim Schreiben und Lesen von Daten verwendet werden. Wenn diese Werte auf -1 gesetzt sind, werden die Standardwerte des Betriebssystems verwendet. Es kann sinnvoll sein, diese Werte zu erhöhen, wenn Produzenten oder Konsumenten mit Brokern in einem anderen Rechenzentrum kommunizieren, da diese Netzwerkverbindungen in der Regel eine höhere Latenz und eine geringere Bandbreite haben.

Verrechnungen.aufbewahrung.minuten

Dies ist eine Broker-Konfiguration, aber es ist wichtig, sie zu kennen, da sie sich auf das Verhalten der Konsumenten auswirkt. Solange eine Verbrauchergruppe aktive Mitglieder hat (d.h. Mitglieder, die ihre Mitgliedschaft in der Gruppe aktiv aufrechterhalten, indem sie Heartbeats senden), wird der letzte von der Gruppe für jede Partition zugesagte Offset von Kafka aufbewahrt, damit er im Falle einer Neuzuweisung oder eines Neustarts abgerufen werden kann. Sobald eine Gruppe jedoch leer ist, behält Kafka die zugesagten Offsets nur für die in dieser Konfiguration festgelegte Dauer - standardmäßig 7 Tage. Sobald die Offsets gelöscht sind, verhält sich die Gruppe, wenn sie wieder aktiv wird, wie eine brandneue Verbrauchergruppe, die sich an nichts mehr erinnert, was sie in der Vergangenheit konsumiert hat. Beachte, dass sich dieses Verhalten einige Male geändert hat. Wenn du alsoeine ältereVersion als 2.1.0 verwendest, solltest du dich in der Dokumentation deiner Version über das erwartete Verhalten informieren.

Zusagen und Aufrechnungen

Wenn wir poll() aufrufen, werden die Datensätze zurückgegeben, die in Kafka geschrieben wurden und die die Verbraucher in unserer Gruppe noch nicht gelesen haben. Das bedeutet, dass wir eine Möglichkeit haben, zu verfolgen, welche Datensätze von einem Verbraucher der Gruppe gelesen wurden. Wie bereits erwähnt, ist eine der einzigartigen Eigenschaften von Kafka, dass es keine Bestätigungen von Verbrauchern verfolgt, wie es viele JMS-Warteschlangen tun. Stattdessen können die Verbraucher mit Kafka ihre Position (Offset) in jeder Partition verfolgen.

Wir nennen die Aktualisierung der aktuellen Position in der Partition offsetcommit. Im Gegensatz zu traditionellen Warteschlangen werden bei Kafka die Datensätze nicht einzeln übertragen. Stattdessen übertragen die Konsumenten die letzte erfolgreich verarbeitete Nachricht aus einer Partition und gehen implizit davon aus, dass jede Nachricht vor der letzten ebenfalls erfolgreich verarbeitet wurde.

Wie bestätigt ein Verbraucher einen Offset? Er sendet eine Nachricht an Kafka, das ein spezielles __consumer_offsets Topic mit dem bestätigten Offset für jede Partition aktualisiert. Solange alle deine Konsumenten aktiv sind und arbeiten, hat dies keine Auswirkungen. Wenn jedoch ein Verbraucher abstürzt oder ein neuer Verbraucher der Verbrauchergruppe beitritt, löst dies einen Rebalance aus. Nach einem Rebalancing werden jedem Verbraucher möglicherweise andere Partitionen zugewiesen als die, die er zuvor bearbeitet hat. Um zu wissen, wo er seine Arbeit fortsetzen soll, liest der Verbraucher den letzten Commit-Offset jeder Partition und fährt von dort aus fort.

Wenn der bestätigte Offset kleiner ist als der Offset der letzten Nachricht, die der Client verarbeitet hat, werden die Nachrichten zwischen dem letzten verarbeiteten Offset und dem bestätigten Offset zweimal verarbeitet. Siehe Abbildung 4-8.

kdg2 0408
Abbildung 4-8. Aufbereitete Nachrichten

Wenn der bestätigte Offset größer ist als der Offset der letzten Nachricht, die der Client tatsächlich verarbeitet hat, werden alle Nachrichten zwischen dem letzten verarbeiteten Offset und dem bestätigten Offset von der Verbrauchergruppe verpasst. Siehe Abbildung 4-9.

kdg2 0409
Abbildung 4-9. Verpasste Nachrichten zwischen Offsets

Es ist klar, dass die Verwaltung von Offsets einen großen Einfluss auf die Kundenanwendung hat. Die KafkaConsumer API bietet mehrere Möglichkeiten, Offsets zu binden.

Welcher Offset ist gebunden?

Wenn du Offsets automatisch oder ohne Angabe der gewünschten Offsets festlegst, wird standardmäßig der Offset nach dem letzten Offset, der von poll() zurückgegeben wurde, festgelegt. Das ist wichtig zu wissen, wenn du versuchst, bestimmte Offsets manuell festzulegen, oder wenn du versuchst, bestimmte Offsets festzulegen. Es ist aber auch mühsam, immer wieder zu lesen: "Übertrage den Offset, der um eins größer ist als der letzte Offset, den der Client von poll() erhalten hat", und in 99% der Fälle spielt das keine Rolle. Deshalb schreiben wir "Commit the last offset", wenn wir uns auf das Standardverhalten beziehen, und wenn du Offsets manuell manipulieren musst, behalte diesen Hinweis bitte im Hinterkopf.

Automatisches Commit

Der einfachste Weg, Offsets zu übermitteln, ist, den Verbraucher dies für dich tun zu lassen. Wenn du enable.auto.commit=true konfigurierst, überträgt der Verbraucher alle fünf Sekunden den letzten Versatz, den dein Client von poll() erhalten hat. Das Fünf-Sekunden-Intervall ist die Standardeinstellung und wird durch die Einstellung auto.commit.interval.ms gesteuert. Wie alles andere im Consumer werden auch die automatischen Commits von der Poll-Schleife gesteuert. Jedes Mal, wenn du einen Poll durchführst, prüft der Verbraucher, ob es Zeit für eine Übergabe ist.

Bevor du diese bequeme Option nutzt, solltest du dir jedoch über dieKonsequenzen im Klaren sein.

Stell dir vor, dass automatische Commits standardmäßig alle fünf Sekunden stattfinden. Nehmen wir an, dass drei Sekunden nach dem letzten Commit unser Verbraucher abgestürzt ist. Nach dem Rebalancing beginnen die überlebenden Konsumenten damit, die Partitionen zu konsumieren, die zuvor im Besitz des abgestürzten Brokers waren. Aber sie beginnen mit dem letzten Offset, der übertragen wurde. In diesem Fall ist der Offset drei Sekunden alt, so dass alle Ereignisse, die in diesen drei Sekunden eingetroffen sind, doppelt verarbeitet werden. Es ist möglich, das Übermittlungsintervall so zu konfigurieren, dass die Übermittlung häufiger erfolgt und das Fenster, in dem Datensätze dupliziert werden, verringert wird, aber es ist nicht möglich, sie vollständig zu eliminieren.

Wenn die Funktion "Autocommit" aktiviert ist, wird bei der nächsten Abfrage der letzte von der vorherigen Abfrage zurückgegebene Offset übertragen. Er weiß nicht, welche Ereignisse tatsächlich verarbeitet wurden. Deshalb ist es wichtig, dass immer alle von poll() zurückgegebenen Ereignisse verarbeitet werden, bevor poll() erneut aufgerufen wird. (Genau wie poll() überträgt auch close() die Offsets automatisch.) Normalerweise ist das kein Problem, aber pass auf, wenn du Ausnahmen behandelst oder die Poll-Schleife vorzeitig verlässt.

Automatische Commits sind praktisch, aber sie geben Entwicklern nicht genug Kontrolle, um doppelte Nachrichten zu vermeiden.

Commit Current Offset

Die meisten Entwickler haben mehr Kontrolle über den Zeitpunkt, zu dem die Offsetsübertragen werden- sowohl um die Möglichkeit verpasster Nachrichten auszuschließen als auch um die Anzahl der Nachrichten zu reduzieren, die beim Rebalancing dupliziert werden. Die Verbraucher-API bietet die Möglichkeit, den aktuellen Offset zu einem Zeitpunkt zu übermitteln, der für den Anwendungsentwickler sinnvoll ist und nicht auf einem Timer basiert.

Wenn du enable.auto.commit=false einstellst, werden Offsets nur dann übertragen, wenn die Anwendung dies ausdrücklich wünscht. Die einfachste und zuverlässigste der Commit-APIs ist commitSync(). Diese API überträgt den letzten von poll() zurückgegebenen Versatz und kehrt zurück, sobald der Versatz übertragen wurde. Wenn die Übertragung aus irgendeinem Grund fehlschlägt, wird eine Ausnahme ausgelöst.

Es ist wichtig, sich daran zu erinnern, dass commitSync() den letzten von poll() zurückgegebenen Offset festschreibt. Wenn du also commitSync() aufrufst, bevor du mit der Verarbeitung aller Datensätze in der Sammlung fertig bist, riskierst du, dass du die Nachrichten verpasst, die zwar festgeschrieben, aber nicht verarbeitet wurden, falls die Anwendung abstürzt. Wenn die Anwendung abstürzt, während sie noch Datensätze in der Sammlung verarbeitet, werden alle Nachrichten vom Beginn des letzten Stapels bis zum Zeitpunkt des Neuabgleichs zweimal verarbeitet - das kann besser oder schlechter sein als fehlende Nachrichten.

So würden wir commitSync verwenden, um die Offsets zu übertragen, nachdem wir den letzten Stapel von Nachrichten verarbeitet haben:

Duration timeout = Duration.ofMillis(100);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset =
            %d, customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value()); 1
    }
    try {
        consumer.commitSync(); 2
    } catch (CommitFailedException e) {
        log.error("commit failed", e) 3
    }
}
1

Gehen wir davon aus, dass wir mit dem Drucken des Inhalts eines Datensatzes dessen Verarbeitung abgeschlossen haben. Deine Anwendung wird wahrscheinlich noch viel mehr mit den Datensätzen machen - sie verändern, anreichern, aggregieren, auf einem Dashboard anzeigen oder die Nutzer über wichtige Ereignisse informieren. Wann du mit einem Datensatz "fertig" bist, solltest du anhand deines Anwendungsfalls festlegen.

2

Sobald wir alle Datensätze im aktuellen Stapel "verarbeitet" haben, rufen wircommitSync auf, um den letzten Offset im Stapel zu bestätigen, bevor wir nach weiterenNachrichten fragen.

3

commitSync Wiederholungsversuche, solange kein Fehler auftritt, der nicht behoben werden kann. Wenn das passiert, können wir nicht viel tun, außer einen Fehler zu protokollieren.

Asynchrones Commit

Ein Nachteil des manuellen Commits ist, dass die Anwendung blockiert wird, bis der Broker auf die Commit-Anfrage antwortet. Dadurch wird der Durchsatz der Anwendung eingeschränkt. Der Durchsatz kann verbessert werden, wenn die Übertragungen seltener erfolgen, aber dann erhöht sich die Anzahl der potenziellen Duplikate, die durch einen Rebalance entstehen können.

Eine weitere Option ist die asynchrone Commit-API. Anstatt darauf zu warten, dass der Broker auf eine Übergabe antwortet, senden wir einfach die Anfrage und fahren fort:

Duration timeout = Duration.ofMillis(100);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
            offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync(); 1
}
1

Übernimm den letzten Offset und mach weiter.

Der Nachteil ist, dass commitSync() die Übertragung so lange wiederholt, bis sie entweder erfolgreich ist oder ein nicht rückgängig zu machender Fehler auftritt, während commitAsync() die Übertragung nicht wiederholt. Der Grund dafür, dass es nicht erneut versucht, ist, dass zu dem Zeitpunkt, an dem commitAsync() eine Antwort vom Server erhält, bereits eine spätere Übertragung erfolgreich war. Stell dir vor, wir haben eine Anfrage zur Übergabe des Offsets 2000 gesendet. Es gibt ein vorübergehendes Kommunikationsproblem, so dass der Broker die Anfrage nie erhält und daher auch nicht antwortet. In der Zwischenzeit haben wir einen anderen Stapel verarbeitet und den Offset 3000 erfolgreich übertragen. Wenn commit​Async() nun die zuvor fehlgeschlagene Übermittlung wiederholt, könnte es gelingen, den Versatz 2000 zu übermitteln , nachdem der Versatz 3000 bereits verarbeitet und übermittelt wurde. Im Falle eines Rebalancing führt dies zu weiteren Duplikaten.

Wir erwähnen diese Komplikation und die Bedeutung der korrekten Reihenfolge der Commits, weil commitAsync() dir auch die Möglichkeit gibt, einen Callback zu übergeben, der ausgelöst wird, wenn der Broker antwortet. Es ist üblich, den Callback zu verwenden, um Commit-Fehler zu protokollieren oder sie in einer Metrik zu zählen, aber wenn du den Callback für Wiederholungsversuche verwenden willst, musst du dir des Problems mit der Commit-Reihenfolge bewusst sein:

Duration timeout = Duration.ofMillis(100);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
        OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    }); 1
}
1

Wir senden den Commit und machen weiter, aber wenn der Commit fehlschlägt, werden der Fehler und die Offsets protokolliert.

Wiederholung von Async Commits

Ein einfaches Muster für die richtige Reihenfolge der Übertragungen bei asynchronen Wiederholungen ist die Verwendung einer monoton steigenden Sequenznummer. Erhöhe die Sequenznummer bei jedem Commit und füge die Sequenznummer zum Zeitpunkt des Commits dem commitAsync Callback hinzu. Wenn du dich bereit machst, einen Wiederholungsversuch zu senden, überprüfe, ob die Commit-Sequenznummer, die der Callback erhalten hat, gleich der Instanzvariablen ist; wenn das der Fall ist, gab es keinen neueren Commit und es ist sicher, es erneut zu versuchen. Wenn die Sequenznummer der Instanz höher ist, darfst du es nicht erneut versuchen, da bereits ein neuerer Commit gesendet wurde.

Kombination von synchronen und asynchronen Commits

Normalerweise sind gelegentliche Fehler bei der Übertragung ohne erneuten Versuch kein großes Problem, denn wenn das Problem nur vorübergehend ist, wird die nächste Übertragung erfolgreich sein. Wenn wir aber wissen, dass dies der letzte Commit ist, bevor wir den Konsumenten schließen oder bevor wir einen Rebalance durchführen, wollen wir besonders sichergehen, dass der Commit erfolgreich ist.

Daher ist es ein gängiges Muster, commitAsync() mit commitSync() kurz vor dem Herunterfahren zu kombinieren. So funktioniert es (wir werden im Abschnitt über die Rebalance-Listener besprechen, wie man kurz vor dem Rebalance festlegt):

Duration timeout = Duration.ofMillis(100);

try {
    while (!closing) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
                customer = %s, country = %s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(); 1
    }
    consumer.commitSync(); 2
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
        consumer.close();
}
1

Obwohl alles in Ordnung ist, verwenden wir commitAsync. Es ist schneller, und wenn ein Commit fehlschlägt, dient der nächste Commit als Wiederholungsversuch.

2

Aber wenn wir schließen, gibt es keinen "nächsten Commit". Wir rufen commitSync() auf, weil es so lange versucht wird, bis es erfolgreich ist oder ein nicht behebbarer Fehler auftritt.

Festlegen eines bestimmten Versatzes

Mit dem Commit des letzten Offsets kannst du nur so oft committen, wie du Batches bearbeitest. Aber was ist, wenn du noch häufiger committen willst? Was ist, wenn poll() einen großen Stapel zurückgibt und du die Offsets in der Mitte des Stapels festschreiben willst, um zu vermeiden, dass du all diese Zeilen erneut verarbeiten musst, wenn ein Ausgleich stattfindet? Du kannst nicht einfach commitSync() oder commitAsync()aufrufen - dann wird der letzte zurückgegebene Offset übertragen, den du noch nicht verarbeiten konntest.

Zum Glück kannst du mit der Verbraucher-API commitSync() und commitAsync() aufrufen und eine Zuordnung von Partitionen und Offsets übergeben, die du festschreiben möchtest. Wenn du gerade einen Stapel Datensätze verarbeitest und die letzte Nachricht, die du von Partition 3 im Thema "Kunden" erhalten hast, den Offset 5000 hat, kannst du commitSync() aufrufen, um den Offset 5001 für Partition 3 im Thema "Kunden" zu übertragen. Da dein Konsument möglicherweise mehr als eine Partition nutzt, musst du die Offsets für alle Partitionen verfolgen, was deinen Code noch komplexer macht.

So sieht ein Commit von bestimmten Offsets aus:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>(); 1
int count = 0;

....
Duration timeout = Duration.ofMillis(100);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d,
            customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value()); 2
        currentOffsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset()+1, "no metadata")); 3
        if (count % 1000 == 0)   4
            consumer.commitAsync(currentOffsets, null); 5
        count++;
    }
}
1

Dies ist die Karte, die wir verwenden werden, um Versätze manuell zu verfolgen.

2

Erinnere dich: println steht für die Verarbeitung der Datensätze, die du konsumierst.

3

Nachdem wir jeden Datensatz gelesen haben, aktualisieren wir die Offset-Map mit dem Offset der nächsten Nachricht, die wir voraussichtlich verarbeiten werden. Der Offset sollte immer der Offset der nächsten Nachricht sein, die deine Anwendung lesen wird. An dieser Stelle beginnen wir beim nächsten Mal mit dem Lesen.

4

Hier entscheiden wir uns dafür, die aktuellen Offsets alle 1.000 Datensätze zu übertragen. In deiner Anwendung kannst du die Übergabe nach der Zeit oder vielleicht nach dem Inhalt der Datensätze vornehmen.

5

Ich habe mich für den Aufruf von commitAsync() entschieden (ohne Callback, daher ist der zweite Parameter null), aber commitSync() ist auch hier völlig gültig. Wenn du bestimmte Offsets festlegst, musst du natürlich trotzdem alle Fehlerbehandlungen durchführen, die wir in den vorherigen Abschnitten gesehen haben.

Hörer wieder ins Gleichgewicht bringen

Wie wir bereits im vorherigen Abschnitt über das Übertragen von Offsets erwähnt haben, wird ein Verbraucher vor dem Beenden und vor dem Neuausgleich der Partitionen einige Aufräumarbeiten durchführen wollen.

Wenn du weißt, dass dein Konsument den Besitz einer Partition verlieren wird, willst du die Offsets des letzten Ereignisses, das du verarbeitet hast, übertragen. Vielleicht musst du auch Datei-Handles, Datenbankverbindungen und Ähnliches schließen.

Die Verbraucher-API ermöglicht es dir, deinen eigenen Code auszuführen, wenn Partitionen zum Verbraucher hinzugefügt oder von ihm entfernt werden. Dazu übergibst du eine ConsumerRebalanceListener, wenn du die Methode subscribe() aufrufst, die wir zuvor besprochen haben. ConsumerRebalanceListener hat drei Methoden, die du implementieren kannst:

public void onPartitionsAssigned(Collection<TopicPartition> partitions)

Wird aufgerufen, nachdem die Partitionen dem Consumer neu zugewiesen wurden, aber bevor der Consumer beginnt, Nachrichten zu konsumieren. Hier kannst du alle Zustände vorbereiten oder laden, die du mit der Partition verwenden willst, die richtigen Offsets suchen, falls nötig, oder ähnliches. Alle Vorbereitungen, die hier getroffen werden, sollten garantiert innerhalb von max.poll.timeout.ms zurückgegeben werden, damit der Konsument der Gruppe erfolgreich beitreten kann.

public void onPartitionsRevoked(Collection<TopicPartition> partitions)

Wird aufgerufen, wenn der Verbraucher Partitionen aufgeben muss, die er zuvor besaß - entweder als Ergebnis eines Rebalancings oder wenn der Verbraucher geschlossen wird. Im Normalfall, wenn ein eifriger Rebalancing-Algorithmus verwendet wird, wird diese Methode aufgerufen, bevor das Rebalancing beginnt und nachdem der Verbraucher keine Nachrichten mehr konsumiert. Wenn ein kooperativer Rebalancing-Algorithmus verwendet wird, wird diese Methode am Ende des Rebalancings aufgerufen, und zwar nur mit der Teilmenge der Partitionen, die der Verbraucher aufgeben muss. Hier willst du die Offsets festlegen, damit derjenige, der diese Partition als Nächstes bekommt, weiß, wo er anfangen muss.

public void onPartitionsLost(Collection<TopicPartition> partitions)

Wird nur aufgerufen, wenn ein kooperativer Rebalancing-Algorithmus verwendet wird, und nur in Ausnahmefällen, wenn die Partitionen anderen Verbrauchern zugewiesen wurden, ohne dass sie zuvor vom Rebalancing-Algorithmus widerrufen wurden (in normalen Fällen wird onPartitions​Revoked() aufgerufen). An dieser Stelle bereinigst du alle Zustände oder Ressourcen, die mit diesen Partitionen verwendet werden. Beachte, dass dies mit Bedacht geschehen muss - der neue Eigentümer der Partitionen hat möglicherweise bereits seinen eigenen Status gespeichert, und du musst Konflikte vermeiden. Wenn du diese Methode nicht implementierst, wird stattdessen onPartitions​Revoked() aufgerufen.

Tipp

Wenn du einen kooperativen Rebalancing-Algorithmus verwendest, beachte, dass:

  • onPartitionsAssigned() wird bei jedem Rebalancing aufgerufen, um den Verbraucher darüber zu informieren, dass ein Rebalancing stattgefunden hat. Wenn dem Verbraucher jedoch keine neuen Partitionen zugewiesen wurden, wird er mit einer leeren Sammlung aufgerufen.

  • onPartitionsRevoked() wird unter normalen Rebalancing-Bedingungen aufgerufen, aber nur, wenn der Verbraucher den Besitz von Partitionen aufgegeben hat. Er wird nicht mit einer leeren Sammlung aufgerufen.

  • onPartitionsLost() wird nur in Ausnahmefällen aufgerufen und die Partitionen in der Sammlung haben bereits neue Eigentümer, wenn die Methode aufgerufen wird.

Wenn du alle drei Methoden implementiert hast, ist gewährleistet, dass onPartitionsAssigned() bei einem normalen Rebalancing vom neuen Eigentümer der Partitionen aufgerufen wird, die erst dann neu zugewiesen werden, wenn der vorherige Eigentümer onPartitionsRevoked() abgeschlossen und seinen Besitz aufgegeben hat.

In diesem Beispiel zeigt, wie man onPartitionsRevoked() verwendet, um Offsets zu übertragen, bevor man den Besitz einer Partition verliert:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();
Duration timeout = Duration.ofMillis(100);

private class HandleRebalance implements ConsumerRebalanceListener { 1
    public void onPartitionsAssigned(Collection<TopicPartition>
        partitions) { 2
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 3
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); 4

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
                 customer = %s, country = %s\n",
                 record.topic(), record.partition(), record.offset(),
                 record.key(), record.value());
             currentOffsets.put(
                 new TopicPartition(record.topic(), record.partition()),
                 new OffsetAndMetadata(record.offset()+1, null));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}
1

Wir beginnen mit der Implementierung einer ConsumerRebalanceListener.

2

In diesem Beispiel müssen wir nichts tun, wenn wir eine neue Partition erhalten; wir fangen einfach an, Nachrichten zu konsumieren.

3

Wenn wir jedoch im Begriff sind, eine Partition aufgrund eines Rebalancings zu verlieren, müssen wir Offsets festlegen. Wir übertragen die Versätze für alle Partitionen, nicht nur für die Partitionen, die wir verlieren werden - da die Versätze für Ereignisse gelten, die bereits verarbeitet wurden, ist das nicht weiter schlimm. Und wir nutzen commitSync(), um sicherzustellen, dass die Versätze übertragen werden, bevor der Rebalancing-Prozess beginnt.

4

Der wichtigste Teil: Übergib die ConsumerRebalanceListener an diesubscribe() Methode, damit sie vom Verbraucher aufgerufen wird.

Datensätze mit bestimmten Offsets verbrauchen

Bisher haben wir gesehen, wie man poll() verwendet, um mit dem Lesen von Nachrichten ab dem letzten Commit-Offset in jeder Partition zu beginnen und alle Nachrichten nacheinander zu verarbeiten. Manchmal möchtest du jedoch mit dem Lesen an einem anderen Offset beginnen. Kafka bietet eine Reihe von Methoden, die bewirken, dass die nächste poll() mit dem Konsumieren an einem anderen Offset beginnt.

Wenn du mit dem Lesen aller Nachrichten am Anfang der Partition beginnen möchtest, oder wenn du den ganzen Weg bis zum Ende der Partition überspringen und nur neue Nachrichten lesen möchtest, gibt es spezielle APIs dafür: seekToBeginning(Collection<TopicPartition> tp) und seekToEnd(Collection<TopicPartition> tp).

Mit der Kafka-API kannst du auch nach einem bestimmten Offset suchen. Diese Fähigkeit kann auf verschiedene Weise genutzt werden: Eine zeitkritische Anwendung kann zum Beispiel einige Datensätze überspringen, wenn sie in Verzug gerät, oder ein Verbraucher, der Daten in eine Datei schreibt, kann auf einen bestimmten Zeitpunkt zurückgesetzt werden, um Daten wiederherzustellen, wenn die Datei verloren gegangen ist.

Hier ist ein kurzes Beispiel, wie du den aktuellen Versatz auf allen Partitionen auf Datensätze setzen kannst, die zu einem bestimmten Zeitpunkt produziert wurden:

Long oneHourEarlier = Instant.now().atZone(ZoneId.systemDefault())
          .minusHours(1).toEpochSecond();
Map<TopicPartition, Long> partitionTimestampMap = consumer.assignment()
        .stream()
        .collect(Collectors.toMap(tp -> tp, tp -> oneHourEarlier)); 1
Map<TopicPartition, OffsetAndTimestamp> offsetMap
        = consumer.offsetsForTimes(partitionTimestampMap); 2

for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry: offsetMap.entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue().offset()); 3
}
1

Wir erstellen eine Map von allen Partitionen, die diesem Verbraucher zugewiesen sind (überconsumer.assignment()) zu dem Zeitstempel, auf den wir die Verbraucher zurücksetzen wollen.

2

Dann erhalten wir die Offsets, die zu diesen Zeitstempeln aktuell waren. Diese Methode sendet eine Anfrage an den Broker, in der ein Zeitstempel-Index verwendet wird, um die relevanten Offsets zurückzugeben.

3

Schließlich setzen wir den Offset jeder Partition auf den Offset zurück, der im vorherigen Schritt zurückgegeben wurde.

Aber wie steigen wir aus?

Früher in diesem Kapitel, als wir die Umfrageschleife besprochen haben, haben wir dir gesagt, dass du dir keine Sorgen darüber machen musst, dass der Verbraucher in einer Endlosschleife abfragt, und dass wir besprechen werden, wie du die Schleife sauber beenden kannst. Lasst uns also besprechen, wie man die Schleife sauber verlässt.

Wenn du dich entscheidest, den Verbraucher zu beenden, und du willst ihn sofort beenden, obwohl der Verbraucher möglicherweise auf eine lange poll() wartet, brauchst du einen anderen Thread, um die consumer.wakeup(). Wenn du die Verbraucherschleife im Haupt-Thread ausführst, kann dies von ShutdownHook aus geschehen. Beachte, dass consumer.wakeup() die einzige Verbrauchermethode ist, die sicher von einem anderen Thread aus aufgerufen werden kann.Ein Aufruf von wakeup führt dazu, dass poll() mit WakeupException beendet wird, oder wenn consumer.wakeup() aufgerufen wurde, während nicht auf Poll wartete, wird die Ausnahme bei der nächsten Iteration ausgelöst, wenn poll() aufgerufen wird. WakeupException muss nicht behandelt werden, aber bevor du den Thread beendest, musst du consumer.close() aufrufen. Durch das Schließen des Verbrauchers werden die Offsets bei Bedarf übertragen und der Gruppenkoordinator erhält eine Nachricht, dass der Verbraucher die Gruppe verlässt. Der Verbraucherkoordinator löst sofort einen Neuausgleich aus und du musst nicht warten, bis die Sitzung einen Timeout hat, bevor die Partitionen des Verbrauchers, den du schließt, einem anderen Verbraucher in der Gruppe zugewiesen werden.

So sieht der Exit-Code aus, wenn der Consumer im Hauptanwendungsthread läuft. Dieses Beispiel ist etwas verkürzt, aber du kannst dir das vollständige Beispiel auf GitHub ansehen:

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup(); 1
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

...
Duration timeout = Duration.ofMillis(10000); 2

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records =
            movingAvg.consumer.poll(timeout);
        System.out.println(System.currentTimeMillis() +
            "--  waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" +
                consumer.position(tp));
        movingAvg.consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown 3
} finally {
    consumer.close(); 4
    System.out.println("Closed consumer and we are done");
}
1

ShutdownHook läuft in einem separaten Thread. Die einzige sichere Maßnahme, die du ergreifen kannst, ist der Aufruf von wakeup, um aus der poll Schleife auszubrechen.

2

Ein besonders langes Poll-Timeout. Wenn die Abfrageschleife kurz genug ist und es dir nichts ausmacht, ein wenig zu warten, bevor du sie beendest, brauchst du wakeupnicht aufzurufen - es reicht aus, bei jeder Iteration einen atomaren Booleschen Wert zu überprüfen. Lange Poll-Timeouts sind nützlich, wenn du Themen mit geringem Durchsatz konsumierst. Auf diese Weise verbraucht der Client weniger CPU, weil er ständig in einer Schleife läuft, während der Broker keine neuen Daten zurückgibt.

3

Ein anderer Thread, der wakeup aufruft, führt dazu, dass poll eine WakeupException auslöst. Du solltest die Ausnahme abfangen, um sicherzustellen, dass deine Anwendung nicht unerwartet beendet wird, aber es ist nicht nötig, etwas damit zu tun.

4

Bevor du den Verbraucher verlässt, solltest du ihn sauber schließen.

Deserialisierer

Wie im vorherigen Kapitel beschrieben, benötigen Kafka-Produzenten Serialisierer, um Objekte in Byte-Arrays umzuwandeln, die dann an Kafka gesendet werden. Ebenso benötigen Kafka-Konsumenten Deserialisierer, um von Kafka empfangene Byte-Arrays in Java-Objekte zu konvertieren. In den vorangegangenen Beispielen haben wir einfach angenommen, dass sowohl der Schlüssel als auch der Wert jeder Nachricht Strings sind, und wir haben die Standardkonfiguration StringDeserializer in derKonsumentenkonfiguration verwendet.

In Kapitel 3 über den Kafka-Producer haben wir gesehen, wie man benutzerdefinierte Typen serialisiert und wie man Avro und AvroSerializers verwendet, um Avro-Objekte aus Schemadefinitionen zu erzeugen und sie dann zu serialisieren, wenn man Nachrichten an Kafka produziert. Jetzt schauen wir uns an, wie du benutzerdefinierte Deserialisierer für deine eigenen Objekte erstellst und wie du Avro und seine Deserialisierer nutzen kannst.

Es sollte klar sein, dass der Serialisierer, der zur Erzeugung von Ereignissen für Kafka verwendet wird, mit dem Deserialisierer übereinstimmen muss, der verwendet wird, wenn die Ereignisse konsumiert werden. Wenn du mit IntSerializer serialisierst und dann mit StringDeserializer deserialisierst, wird das nicht gut ausgehen. Das bedeutet, dass du als Entwickler den Überblick darüber behalten musst, welche Serializer verwendet wurden, um in jedes Topic zu schreiben, und sicherstellen musst, dass jedes Topic nur Daten enthält, die die von dir verwendeten Deserializer interpretieren können. Das ist einer der Vorteile der Verwendung von Avro und der Schema Registry für die Serialisierung und Deserialisierung - AvroSerializer kann sicherstellen, dass alle Daten, die in ein bestimmtes Topic geschrieben werden, mit dem Schema des Topics kompatibel sind, was bedeutet, dass sie mit dem passenden Deserializer und Schema deserialisiert werden können. Kompatibilitätsfehler - sowohl auf der Erzeuger- als auch auf der Verbraucherseite - werden einfach mit einer entsprechenden Fehlermeldung abgefangen, so dass du nicht versuchen musst, Byte-Arrays auf Serialisierungsfehler zu untersuchen.

Wir zeigen zunächst, wie man einen benutzerdefinierten Deserialisierer schreibt, auch wenn dies die weniger verbreitete Methode ist, und gehen dann zu einem Beispiel über, wie man Avro verwendet, um Nachrichtenschlüssel und -werte zu deserialisieren.

Benutzerdefinierte Deserialisierer

Nehmen wir dasselbe benutzerdefinierte Objekt, das wir in Kapitel 3 serialisiert haben, und schreiben wir einen Deserialisierer dafür:

public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

Der benutzerdefinierte Deserializer sieht wie folgt aus:

import org.apache.kafka.common.errors.SerializationException;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> { 1

    @Override
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;

        try {
            if (data == null)
                return null;
            if (data.length < 8)
                throw new SerializationException("Size of data received " +
                    "by deserializer is shorter than expected");

            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();

            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, "UTF-8");

            return new Customer(id, name); 2

        } catch (Exception e) {
  	        throw new SerializationException("Error when deserializing " +   	        "byte[] to Customer " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
1

Der Verbraucher benötigt auch die Implementierung der Klasse Customer, und sowohl die Klasse als auch der Serialisierer müssen in den produzierenden und konsumierenden Anwendungen übereinstimmen. In einer großen Organisation mit vielen Konsumenten und Produzenten, die sich den Zugang zu den Daten teilen, kann dies zu einer Herausforderung werden.

2

Wir kehren hier einfach die Logik des Serializers um - wir holen die Kunden-ID und den Namen aus dem Byte-Array und verwenden sie, um das benötigte Objekt zu erstellen.

Der Konsumentencode, der diesen Deserialisierer verwendet, sieht ähnlich aus wie dieses Beispiel:

Duration timeout = Duration.ofMillis(100);
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    CustomerDeserializer.class.getName());


KafkaConsumer<String, Customer> consumer =
    new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("customerCountries"))

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(timeout);
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("current customer Id: " +
            record.value().getID() + " and
            current customer name: " +  record.value().getName());
    }
    consumer.commitSync();
}

Auch hier ist es wichtig zu beachten, dass die Implementierung eines eigenen Serializers und Deserializers nicht empfohlen wird. Er koppelt Produzenten und Konsumenten eng aneinander und ist anfällig und fehleranfällig. Eine bessere Lösung wäre die Verwendung eines Standardnachrichtenformats wie JSON, Thrift, Protobuf oder Avro. Wir werden jetzt sehen, wie man Avro-Deserialisierer mit dem Kafka-Konsumenten verwendet. Hintergrundinformationen zu Apache Avro, seinen Schemata und Schemakompatibilitätsfunktionen findest du in Kapitel 3.

Avro Deserialisierung mit Kafka Consumer verwenden

Nehmen wir an, dass wir die Implementierung der Klasse Customer in Avro verwenden, die in Kapitel 3 gezeigt wurde. Um diese Objekte aus Kafka zu konsumieren, willst du eine konsumierende Anwendung ähnlich der folgenden implementieren:

Duration timeout = Duration.ofMillis(100);
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "io.confluent.kafka.serializers.KafkaAvroDeserializer"); 1
props.put("specific.avro.reader","true");
props.put("schema.registry.url", schemaUrl); 2
String topic = "customerContacts"

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(timeout); 3

    for (ConsumerRecord<String, Customer> record: records) {
        System.out.println("Current customer name is: " +
            record.value().getName()); 4
    }
    consumer.commitSync();
}
1

Wir verwenden KafkaAvroDeserializer, um die Avro-Nachrichten zu deserialisieren.

2

schema.registry.url ist ein neuer Parameter. Dieser verweist einfach darauf, wo wir die Schemata speichern. Auf diese Weise kann der Verbraucher das Schema verwenden, das vom Produzenten registriert wurde, um die Nachricht zu deserialisieren.

3

Wir geben die erzeugte Klasse Customer als Typ für den Datensatzwert an.

4

record.value() ist eine Customer Instanz und wir können sie entsprechend verwenden.

Eigenständiger Verbraucher: Warum und wie man einen Verbraucher ohne Gruppe verwendet

Bisher haben wir über Verbrauchergruppen gesprochen, bei denen die Partitionen automatisch den Verbrauchern zugewiesen werden und automatisch ausgeglichen werden, wenn Verbraucher hinzugefügt oder aus der Gruppe entfernt werden. Normalerweise ist dieses Verhalten genau das, was du willst, aber in manchen Fällen willst du etwas viel Einfacheres. Manchmal weißt du, dass du einen einzelnen Verbraucher hast, der immer Daten aus allen Partitionen eines Themas oder aus einer bestimmten Partition eines Themas lesen muss. In diesem Fall gibt es keinen Grund für Gruppen oder Rebalances - weise einfach das verbraucherspezifische Topic und/oder die Partitionen zu, konsumiere die Nachrichten und übertrage die Offsets bei Gelegenheit (obwohl du group.id immer noch so konfigurieren musst, dass die Offsets übertragen werden, denn ohne den Aufruf von subscribe wird der Verbraucher keiner Gruppe beitreten).

Wenn du genau weißt, welche Fächer der Verbraucher lesen soll, abonnierst du kein Thema, sondern weist dir selbst ein paar Fächer zu. Ein Verbraucher kann entweder Themen abonnieren (und Teil einer Verbrauchergruppe sein) oder sich selbst Fächer zuweisen, aber nicht beides gleichzeitig.

Hier ist ein Beispiel dafür, wie ein Verbraucher sich alle Partitionen eines bestimmten Themas zuweisen und daraus konsumieren kann:

Duration timeout = Duration.ofMillis(100);
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic"); 1

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(),
            partition.partition()));
    consumer.assign(partitions); 2

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);

        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
                customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
        consumer.commitSync();
    }
}
1

Wir beginnen damit, den Cluster nach den im Thema verfügbaren Partitionen zu fragen. Wenn du nur eine bestimmte Partition nutzen willst, kannst du diesen Teil überspringen.

2

Sobald wir wissen, welche Partitionen wir wollen, rufen wir assign() mit der Liste auf.

Abgesehen von der fehlenden Neugewichtung und der Notwendigkeit, die Partitionen manuell zu finden, ist alles andere wie gewohnt. Bedenke, dass der Verbraucher nicht benachrichtigt wird, wenn jemand neue Partitionen zum Thema hinzufügt. Du musst dies in den Griff bekommen, indem du consumer.partitionsFor() regelmäßig überprüfst oder die Anwendung einfach bouncen lässt, wenn Partitionen hinzugefügt werden.

Zusammenfassung

Wir begannen dieses Kapitel mit einer ausführlichen Erläuterung der Kafka-Konsumentengruppen und der Art und Weise, wie sie es mehreren Konsumenten ermöglichen, sich die Arbeit des Lesens von Ereignissen aus Topics zu teilen. Im Anschluss an die theoretische Diskussion haben wir ein praktisches Beispiel für einen Konsumenten angeführt, der ein Topic abonniert und fortlaufend Ereignisse liest. Anschließend haben wir uns die wichtigsten Konfigurationsparameter für Konsumenten angesehen und erläutert, wie sie sich auf das Verhalten der Konsumenten auswirken. Einen großen Teil des Kapitels haben wir der Diskussion von Offsets gewidmet und wie Konsumenten diese verfolgen. Zu verstehen, wie Verbraucher Offsets festhalten, ist entscheidend für das Schreiben zuverlässiger Verbraucher, deshalb haben wir uns die Zeit genommen, die verschiedenen Möglichkeiten zu erläutern. Anschließend haben wir weitere Teile der Verbraucher-APIs, den Umgang mit Rebalances und das Schließen des Verbrauchers besprochen.

Abschließend haben wir die Deserialisierer besprochen, die von den Konsumenten verwendet werden, um die in Kafka gespeicherten Bytes in Java-Objekte umzuwandeln, die von den Anwendungen verarbeitet werden können. Wir haben die Avro-Deserialisierer ausführlich besprochen, obwohl sie nur eine Art von Deserialisierer sind, die du verwenden kannst, weil sie am häufigsten mit Kafka verwendet werden.

1 Diagramme von Sophie Blee-Goldman, aus ihrem Blogbeitrag vom Mai 2020, "From Eager to Smarter in Apache Kafka Consumer Rebalances".

Get Kafka: The Definitive Guide, 2. Auflage now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.