Kapitel 4. Streaming Data Produkte

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

In , einem Streaming Data Mesh, besitzen die Domains ihre Daten. So entsteht eine dezentrale Datenplattform, die dabei hilft, die Probleme mit der Agilität und Skalierbarkeit von Data Lake und Warehouses zu lösen. Domains müssen nun anderen Domains ihre Daten zur Verfügung stellen. Deshalb ist es wichtig, dass sie ihre Daten als Produkte mit hoher Qualität und Vertrauen behandeln.

Derzeit sind Dateningenieure sehr an die Vorstellung gewöhnt, dass sich alle ihre Daten in einem zentralen Datenspeicher wie einem Data Lake oder Warehouse befinden. Sie sind es gewohnt, bei der Arbeit mit Daten Wege zu finden, den "Ozean" (in diesem Fall den See) zum Kochen zu bringen. Ein Streaming Data Mesh ermöglicht es uns, diese Vorstellung zu verdrängen. In diesem Kapitel werden wir die Anforderungen an Streaming-Data-Produkte erläutern.

In unserer Laufbahn als Dateningenieure haben wir viele Wrapper für Apache Spark geschrieben, eine weit verbreitete Analyse-Engine für die Verarbeitung großer Datenmengen. Erst in den letzten Jahren haben wir verstanden, warum Unternehmen uns darum gebeten haben.

Big-Data-Tools wie Apache Spark, Apache Flink und Apache Kafka Streams waren für viele Ingenieure, die mit der Lösung von Big-Data-Problemen betraut waren, unzugänglich. Um auf Kapitel 1 zurückzukommen: Die Auflockerung der monolithischen Rolle des Dateningenieurs ist ein Nebeneffekt des Data Mesh.

Dieser ist ein sehr wichtiger Punkt, denn ein zweiter Nebeneffekt ist, dass komplexe Data-Engineering-Tools wie Spark, Flink und Kafka Streams für Generalisten zugänglicher werden, damit sie ihre Big-Data-Probleme lösen können. Das ist der Grund, warum uns diese Unternehmen gebeten haben, große Datenverarbeitungsmaschinen zu verpacken. Tabelle 4-1 zeigt eine Liste von Projekten, an denen wir beteiligt waren, um bestimmten Ingenieuren bei der Abfrage von Big Data in Data Lakes zu helfen.

Tabelle 4-1. Apache Spark Wrapper-Projekte und die Ingenieure, die jedes Projekt unterstützt
Das Projekt Der Ingenieur

Big Data Integrator - ein Apache Spark UI Wrapper mit Drag-and-Drop-Funktionen

Integrationsingenieure und Geschäftsanalysten, die Daten verarbeiten müssen, um Berichte in einem Business Intelligence Tool zu erstellen.

Sparknado-ein Apache Spark-Wrapper, der die Airflow-Syntax zum Aufbau von Spark-DAGs verwendet

Airflow-Ingenieure entwickeln Spark-Anwendungen, um Daten in Snowflake zu übertragen.

Apache Envelope - eine YAML-basierte Spark-Konfiguration

Ingenieure, die eine Spark DAG definieren wollen, ohne in Python oder Scala programmieren zu können.

Splunk SPL to Spark - eine Pipe-Syntax, die der Splunk Search Language (SPL) ähnelt

Sicherheitsbedrohungsjäger, die mit Splunk vertraut sind, können in den Netzwerkprotokollen, die in einem Data Lake gespeichert sind, nach Bedrohungen suchen.

Diese Erfahrung bestätigt die Notwendigkeit, Datentools so zu vereinfachen, dass sie für Generalisten und manchmal auch für Spezialisten leichter zugänglich sind, und nicht so sehr, um die Arbeitsbelastung der Dateningenieure zu reduzieren. So können sie schneller auf Unregelmäßigkeiten reagieren und den Kunden sofort Ergebnisse liefern.

Bei der Entwicklung von Datenprodukten ist es wichtig, dies zu verstehen. Indem wir den Bereichen Zugang zu Datenwerkzeugen ermöglichen, können sie komplexe Datenprobleme lösen. Diese Fähigkeit war für sie bisher unerreichbar.

In den folgenden Abschnitten gehen wir auf die Anforderungen bei der Definition und Erstellung von Datenprodukten ein, die in einem Streaming Data Mesh veröffentlicht werden sollen. Wir werden versuchen, den Umfang der Details auf die Erstellung von Streaming-Datenprodukten zu beschränken und verweisen auf Details zu Self-Services und Data Governance, die sich auf Streaming-Datenprodukte beziehen, in den entsprechenden Kapiteln.

Anforderungen an Datenprodukte definieren

Dieser Abschnitt ist eine Zusammenfassung der in Tabelle 4-2 aufgeführten Anforderungen an Datenprodukte. Das Ziel dieser Anforderungen ist es, den Verbrauchern und Domain-Konsumenten eine reibungslose und angenehme Erfahrung mit dem Datennetz zu ermöglichen. Der Zustand des Datennetzes wird durch die Erfahrungen der Verbraucher mit den Datenprodukten bestimmt. Diese Anforderungen helfen dabei, ein Datenprodukt zu entwerfen und zu implementieren, das die Bedürfnisse der Verbraucher erfüllt.

Tabelle 4-2. Vorschläge für Anforderungen an Datenprodukte
Anforderungen Überlegungen zur Umsetzung

Die Datenprodukte sollten von hoher Qualität sein.

  • Die Daten sollten in die von allen Bereichen des Datennetzes definierten Standards transformiert werden .

  • Schlechte Datensätze sollten bei Bedarf herausgefiltert werden.

Datenprodukte müssen sicher und vertrauenswürdig sein.

  • Alle personenbezogenen Daten (PII) oder persönlichen Gesundheitsdaten (PHI) sollten entweder durch Tokenisierung oder Verschlüsselung verschleiert werden.

  • Autorisierung und Authentifizierung werden als ACLs oder RBACs-Regeln definiert.

Die Datenprodukte müssen interoperabel sein.

  • Datenprodukte sollten Schemata enthalten, die das endgültige Datenprodukt darstellen. Schemata dienen als Vertrag zwischen der produzierenden und der konsumierenden Domäne.

  • Datenprodukte sollten in einem Format vorliegen, das Verbraucher unterstützt, die in mehreren Programmiersprachen geschrieben sind (polyglott). Einige Beispiele sind JSON, Apache Avro und protobuf.

Datenprodukte müssen leicht konsumierbar sein .

  • Das Datenprodukt muss mit einfachen technischen Mitteln schnell und leicht zugänglich sein.

  • Das Datenprodukt muss für das Lesen und Servieren optimiert werden. Einige Möglichkeiten, Daten zu optimieren, sind Partitionierung, Indizierung, Bucketing und Komprimierung.

Datenprodukte sollten die Abstammung bewahren.

  • Die Datenabfolge muss aus den Metadaten rekursiv durch die anderen Domänen abgeleitet werden. Wenn das Datenprodukt zum Beispiel das Datenprodukt einer anderen Domäne verwendet, bleibt die Abstammung von dem abgeleiteten Datenprodukt erhalten und kann geparst werden.

Datenprodukte sollten leicht durchsuchbar und selbstbeschreibend sein .

  • Das Datenprodukt sollte mit einer Suchmaschine auffindbar sein und so aussagekräftig sein, dass die Verbraucher ihm vertrauen.

Datenprodukte sollten historische Daten enthalten.

  • Das Datenprodukt sollte die Bereitstellung historischer Daten unterstützen, nicht nur die neuesten Änderungen.

Das nachträgliche Hinzufügen von Anforderungen an Datenprodukte könnte sich als kostspielige technische Schuld erweisen. So ist zum Beispiel die Abstammung höchstwahrscheinlich eine Anforderung, die ohne die dafür benötigten referenzierbaren Metadaten nur schwer im Nachhinein hinzugefügt werden kann. Daher ist es wichtig, diese komplexen Anforderungen frühzeitig zu berücksichtigen, um kostspielige technische Schulden zu vermeiden.

Wenn andere Bereiche Datenprodukte anfordern, stelle sicher, dass sie diese Anforderungen einhalten. Behalte sie im Hinterkopf, wenn du die Quellen auswählst, die du für die Erstellung deiner Datenprodukte benötigst.

Warnung

Einige haben Datenprodukte mit Masterdaten verwechselt. Datenmastering ist ein Prozess, bei dem eine unumstrittene Master-Version eines Datensatzes erstellt wird, die golden record genannt wird. Stammdaten sind ein Merkmal eines Datenprodukts und nicht unbedingt eine Anforderung an ein Datenprodukt. Wenn dein Datenprodukt z. B. Stammdaten für Mitarbeiter bereitstellen soll, ist ein geeignetes Stammdatenmanagement-Tool (MDM) in der Domäne erforderlich, um diese Aufgabe zu erfüllen.

Identifizierung von Datenproduktderivaten

Die Datenprodukte werden aus den Datenquellen innerhalb eines Bereichs abgeleitet. Sie können auch durch Datenprodukte aus anderen Bereichen angereichert werden. Erinnere dich daran, dass die Datenprodukte Eigentum der Fachexperten sind, die sie erstellt haben. Wenn das Datenprodukt, das du erstellst, mit Daten aus einer anderen Domäne angereichert werden muss, musst du diese Daten in deine eigene Domäne einbringen, um deine eigenen Datenprodukte anzureichern. Unter Datenderivaten verstehen wir sowohl Daten innerhalb der Domäne als auch Daten, die aus anderen Domänen stammen. Wenn du diese Derivate identifizierst und ihre Integrationspunkte verstehst, kannst du Lösungen finden, um sie in die Streaming-Plattform einzubringen.

Es gibt zwei Arten von Daten: in Ruhe und in Bewegung. Wir müssen mit dem Ingesting von Datenderivaten beginnen, was bedeutet, dass wir die ruhenden Daten in bewegte Daten verwandeln müssen. Wir wollen auch dafür sorgen, dass die Daten, die bereits in Bewegung sind, in Bewegung bleiben. Es ist wichtig, schon früh im Ingestion-Prozess an die Optimierung der Daten zu denken, damit alle nachgelagerten Komponenten von dieser Optimierung profitieren können. Beginne mit der Partitionierung der Daten in den Quellthemen der Streaming-Plattform. Eine ausreichende Anzahl von Partitionen sorgt für eine effiziente Bereitstellung von Datenprodukten für die Verbraucher und für eine ausgewogene Verarbeitung in der Datenpipeline.

Derivate aus anderen Bereichen

Ableitungen, die als Datenprodukte aus anderen Bereichen stammen, müssen referenzierbar sein, damit ein vollständiges Stammbild erstellt werden kann. Dies könnte mehrere Domänen umfassen, die rekursiv durch das Netz durchlaufen werden. Die Aufbewahrung einer Momentaufnahme der aktuellen Abstammung eines Datenprodukts wird mit der Zeit veraltet sein, da sich die Qualität, die Skalierbarkeit und die Struktur des Datenprodukts weiterentwickelt haben, wie z. B. bei Schemata. In späteren Kapiteln werden wir die Data Governance und die Schema-Evolution als zentrale Komponente im Datennetz diskutieren. Techniken zur Erhaltung der Abstammung werden in Kapitel 6 näher erläutert.

Andere Datenprodukte aus anderen Bereichen zu konsumieren und die eigenen anzureichern, ist die wahre Essenz der Arbeit in einem Datennetz. Dazu musst du den Zugriff auf das Datenprodukt anfordern und dann den Echtzeit-Stream des Datenprodukts abonnieren. Danach sollte ein Thema in der Streaming-Plattform erscheinen, das das Echtzeit-Streaming-Datenprodukt aus einem anderen Bereich darstellt. Das Abonnement des Datenprodukts bezieht sich nicht nur auf die Daten, sondern auch auf die Metadaten. Diese Metadaten ermöglichen eine domänenübergreifende Abstammung.

Ingesting von Datenproduktderivaten mit Kafka Connect

Nachdem die Datenproduktderivate für unsere Streaming-Datenprodukte identifiziert hat, müssen wir sie auf eine Streaming-Plattform bringen. Der einfachste Weg, Daten in oder aus einer Kafka-kompatiblen Streaming-Plattform zu bekommen, ist die Nutzung von Kafka Konnektoren. (Andere Plattformen wie Spark oder Flink verfügen ebenfalls über eigene Konnektoren.) Kafka Connect ist ein Framework, das die Implementierung von Daten aus einer Datenquelle in eine Streaming-Plattform ermöglicht. Umgekehrt ermöglicht es auch das Schreiben von Daten in eine Datensenke der Streaming-Plattform. Siehe Abbildung 4-1.

Kafka Connect
Abbildung 4-1. Eine Datenpipeline, die zeigt, wie ein Kafka-Quellkonnektor in Kafka schreibt und ein Kafka-Sinkkonnektor aus Kafka liest

Kafka Connect bietet außerdem eine Low-Code-Erfahrung, was bedeutet, dass für Domain-Ingenieure keine Codierung erforderlich ist. Wir werden Kafka Connect verwenden, um die Quelldaten in die Streaming-Plattform einzuspeisen. Es gibt auch andere Möglichkeiten, Daten in eine Streaming-Plattform einzuspeisen, aber nur wenige unterstützen die change data capture (CDC)-Konnektoren, die wir brauchen. Wir werden über CDC in "Debezium Connectors" sprechen .

Das Open-Source-Framework Kafka Connect ermöglicht einen einfachen und skalierbaren Ingress und Egress von Daten. Das ist einer der Gründe, warum wir diese Lösung für unsere Diskussion ausgewählt haben. Die Konnektoren können sich mit bestimmten Datenquellen und einer Kafka-kompatiblen Streaming-Plattform wie Redpanda verbinden, um Daten zu streamen. Andere Produkte wie Apache Pulsar ermöglichen es Kafka-Clients, Nachrichten aus Kafka in ihrer Plattform zu produzieren und zu konsumieren.

Kafka-Konnektoren laufen nicht allein. Sie laufen in einem Kafka-Connect-Cluster, der es ihnen ermöglicht, verteilbar und hochverfügbar zu sein. Jeder Knoten im Connect-Cluster wird als Connect Worker bezeichnet. Jeder Connector enthält eine Konfigurationseigenschaft namens tasks.max. Tasks sind die Hauptprozesse, die Daten in einem Connector streamen. Wenn mehrere Tasks konfiguriert sind, kann der Connect-Cluster sie auf seine Worker verteilen, damit sie parallel laufen und so skalierbar sind. Wenn einer der Connect-Worker im Cluster ausfällt, werden die Daten auf die verbleibenden Worker umverteilt (siehe Abbildung 4-2). Die Eigenschaft task.max legt die maximale Anzahl der Aufgaben fest, die für diesen Connector in einem Connect-Cluster erstellt werden sollen. Der Konnektor kann weniger Aufgaben erstellen, wenn er diesen Grad der Parallelität nicht erreichen kann.

Kafka Connect Tasks
Abbildung 4-2. Ein einzelner Source-Connector, dessen drei Aufgaben auf drei Connect Worker in einem Kafka-Connect-Cluster verteilt sind; Connect Worker können für hohe Verfügbarkeit und Skalierbarkeit auf separaten Maschinen eingesetzt werden

Durch die Verwendung von Kafka Connect und den vielen vorgefertigten Konnektoren wird der Aufbau des Streaming Data Mesh viel schneller und überlässt die Entwicklung und den Support den Drittanbietern. Konnektoren für beliebte Datenbanken und Cloud-Datenspeicher wurden bereits erstellt. Viele von ihnen werden von den Anbietern selbst entwickelt und sind mit Supportplänen ausgestattet. Wenn ein Konnektor nicht verfügbar ist, sollte das zentrale Data Mesh-Team in der Lage sein, einen solchen zu erstellen und ihn den Domänen im Data Mesh zur Verfügung zu stellen.

Ein weiterer Grund, warum Kafka Connect eine gute Lösung ist, besteht darin, dass es die Ingestion standardisiert und damit die Bereitstellung und Verwaltung von Konnektoren vereinfacht. Der Kafka-Connect-Cluster bietet eine RESTful-Schnittstelle und lässt sich leicht in eine CI/CD-Pipeline wie Jenkins, Kubernetes, Ansible oder Terraform integrieren.

Kafka Connect verfügt über einige Transformationsfunktionen, die single message transforms (SMTs) genannt werden, aber diese sind auf einfache Transformationen beschränkt. Das heißt, keine Joins oder Aggregationen. Die Datenumwandlung wird am besten in den Streaming-Prozessen konsolidiert, wo sowohl einfache als auch komplexe Umwandlungen unterstützt werden. Transformationen werden in "Umwandlung von Datenderivaten in Datenprodukte" behandelt .

Tipp

Es ist die bewährte Methode, Transformationen in einem Stream-Prozessor durchzuführen, der Zustände speichern kann, und nicht in den Konnektoren, die keine Zustände speichern können. Außerdem könnte es schwierig sein, Transformationen in einem Konnektor zu erfassen, um den Verlauf zu erhalten. Stream-Prozessoren, die gerichtete azyklische Graphen (DAGs) erstellen, die den Verarbeitungsprozess darstellen, können ihre DAGs hingegen serialisieren, um den Verlauf zu erhalten.

Beim Ingesting der Datenprodukte müssen wir uns frühzeitig Gedanken über das spätere Streaming-Datenprodukt machen und darüber, wie wir seine einfache Nutzung sicherstellen können. Denk auch daran, dass die Frage, ob die Daten asynchron oder synchron eingespeist werden, einen Einfluss darauf hat, wie die Domäne dein Streaming-Datenprodukt konsumieren und nutzen wird.

Verbrauchbarkeit

Konsumierbarkeit ist eine sehr wichtige Anforderung, da sie sich direkt auf die Erfahrungen auswirkt, die die Verbraucher in einem Streaming-Datennetz machen werden. Wenn andere Bereiche Streaming-Datenprodukte nicht einfach konsumieren können, entscheiden sie sich möglicherweise gegen das Streaming-Datennetz und bauen ihre eigenen Integrationen von Hand auf, um Probleme mit dem Datennetz zu umgehen. Einige Faktoren, die beim Ingesting von Datenderivaten zu beachten sind und die sich auf die Nutzbarkeit durch andere Bereiche auswirken, sind folgende

  • Mangelnde Skalierbarkeit

  • Mangel an Interoperabilität

Skalierbarkeit

Wenn über das Ingesting von Datenderivaten in die Streaming-Plattform nachdenkt, ist es wichtig zu wissen, in welchem Umfang du die Daten ingestieren musst. In der Streaming-Plattform musst du sicherstellen, dass die Anzahl der Partitionen dem erwarteten Datendurchsatz (oder der Rate) entspricht, mit der die Daten gestreamt werden sollen. Je mehr Partitionen ein Kafka-Topic hat, desto schneller kann es die Daten streamen. Durch Partitionen ermöglicht Kafka die Parallelisierung. Bei anderen Streaming-Plattformen musst du die entsprechenden Topics auf ähnliche Weise konfigurieren.

Eine grobe Formel, die von Jun Rao (einem ursprünglichen Entwickler von Apache Kafka) vorgeschlagen wurde, um die Partitionsanzahl zu bestimmen, basiert auf dem Durchsatz. Diese Formel ermittelt die Partitionsanzahl, indem sie den Höchstwert zwischen diesen beiden Werten ermittelt:

  • Der gewünschte Durchsatz(t) geteilt durch den Durchsatz der Datenproduktion in der Partition, die du erreichen kannst(p)

  • Der gewünschte Durchsatz(t) geteilt durch den Durchsatz, der Daten aus der Partition verbraucht, die du erreichen kannst(c)

Die Formel lautet wie folgt:

  • max(t/p, t/c)

Das folgende Beispiel zeigt den gewünschten Durchsatz von 3 MBps (Megabyte pro Sekunde). Der Produzent kann 1 MBps produzieren. Nehmen wir an, dass 3 Verbraucher die Daten abonnieren wollen, was 3 MBps zu je 1 MBps bedeutet. Das Ergebnis sind 3 Partitionen. In diesem Beispiel ist die Anzahl für die meisten Anwendungsfälle von Kafka tatsächlich gering. Vielleicht möchtest du dich auf zukünftige Durchsatzsteigerungen vorbereiten, indem du die Anzahl auf 5 oder 6 erhöhst:

  • max(3 MBps/1 MBps, 3 MBps/3 MBps) = max(3, 1)= 3 Partitionen

Andere Faktoren können helfen, den gewünschten Durchsatz zu erreichen, liegen aber außerhalb des Rahmens dieses Buches.

Nachdem die Anzahl der Partitionen bestimmt wurde, ist es wichtig zu verstehen, wie die Daten gleichmäßig auf alle Partitionen verteilt werden können, um eine ausgewogene Parallelität zu erreichen. In Apache Kafka werden die Daten als Schlüssel- und Wertepaar gesendet. Der Schlüssel wird gehasht, um zu bestimmen, welcher Partition er zugewiesen wird. Diese Formel veranschaulicht, wie das funktioniert:

  • Schlüssel % p = Partitionszuweisung

Dieser Hash-Algorithmus funktioniert gut, wenn der Schlüssel gleichmäßig über alle Datensätze in den an das Kafka-Topic gesendeten Daten verteilt ist. Mit anderen Worten: Der Schlüssel sollte eine hohe Kardinalität und eine gleichmäßige Verteilung im gesamten Datensatz haben. Eine hohe Kardinalität und eine gleichmäßige Verteilung schaffen eine gute Datenbalance zwischen allen Topic-Partitionen. Eine niedrige Kardinalität oder eine ungleichmäßige Verteilung schafft unausgewogene Parallelität. Eine unausgewogene Parallelität beim Ingestion erzeugt ein Ungleichgewicht für alle nachgelagerten Komponenten in der Datenpipeline. In den Kafka-Topics äußert sich dies als Hotspotting, bei dem nur eine oder wenige Partitionen die meiste Arbeit erledigen, was dazu führt, dass deine gesamte Datenpipeline langsam läuft. Es wäre von Vorteil, ein Profil deiner Daten zu erstellen, um ein Gefühl für die Kardinalität und die Verteilung der Schlüssel in den Daten zu bekommen. Die Definition von Schlüsseln mit hoher Kardinalität und gleichmäßiger Verteilung ist ein wichtiger Schritt bei der verteilten Verarbeitung, da die meisten verteilten Systeme die Arbeitslast nach Schlüsseln auf ihre Arbeiter/innen verteilen.

Eine weitere Möglichkeit, die Skalierbarkeit zu verbessern, ist die Verwendung eines geeigneten Daten-Serialisierungsformats, das wir im Folgenden besprechen.

Interoperabilität und Datenserialisierung

Die Skalierbarkeit und Interoperabilität von ist von entscheidender Bedeutung, wenn wir darüber sprechen, wie wir die Daten in eine Streaming-Plattform einspeisen. Interoperabilität ist die Fähigkeit, Informationen auszutauschen oder nahtlos mit anderen Systemen zusammenzuarbeiten. Bei einem Streaming-Datennetz kann dies durch die Erstellung von Schemata erreicht werden, die die Domänenobjekte oder -modelle definieren, und durch die Wahl eines geeigneten Daten-Serialisierungsformats. Die Schemata helfen den Domänen im Streaming Data Mesh, Informationen auszutauschen und nahtlos mit anderen Domänen zusammenzuarbeiten, und das Format der Datenserialisierung ermöglicht den Informationsaustausch zwischen Systemen, die normalerweise nicht miteinander kompatibel sind.

Interoperabilität und Datenserialisierung sind Anforderungen an das Datennetz, die unter die Säule Data Governance fallen, die in Kapitel 5 besprochen wird, daher gehen wir dort näher darauf ein. Es ist jedoch wichtig, dass wir bereits bei der Dateneingabe darüber nachdenken, denn dies wirkt sich auf alle nachgelagerten Systeme und Komponenten der Datenpipeline aus. Zunächst müssen wir das Schema definieren, das die Datenableitung darstellt. Ein Schema legt fest, wie die Daten strukturiert werden sollen, wenn sie die Streaming Data Pipeline durchlaufen. Nehmen wir zum Beispiel an, du nimmst globale COVID-19-Statistiken auf. Die Form deiner Daten könnte wie Tabelle 4-3 aussehen, die nur zwei Länder enthält. (Beachte, dass Tabelle 4-4 eine Fortsetzung der Daten in Tabelle 4-3 ist).

Tabelle 4-3. COVID-19-Spieldaten im Tabellenformat
Land CountryCode Datum ID

Vereinigte Staaten von Amerika

USA

2022-04-07T19:47:08.275Z

2291245c-5bf8-460f-af77-05ce78cc60c9

Philippinen

PH

2022-04-07T19:47:08.275Z

82aa04f7-05a1-4caa-b309-0e7cfbfae5ea

Tabelle 4-4. COVID-19-Spieldaten im Tabellenformat (Fortsetzung)
NewConfirmed NeueTodesfälle NewRecovered TotalConfirmed TotalTodesfälle TotalRecovered

40176

1241

0

80248986

983817

0

0

0

0

3680244

59422

0

Die COVID-19-Datenquelle, die wir lesen, wird in Form von JSON-Datensätzen bereitgestellt, also würden wir das Format der Datenserialisierung als JSON erstellen. In Beispiel 4-1 zeigen wir eine JSON-Schemadefinition, die der Struktur von Tabelle 4-3 entspricht.

Beispiel 4-1. JSON-Schema, das die Tabellenstruktur in Tabelle 4-3 definiert
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": { 1
    "ID": {
      "type": "string"
    },
    "Country": {
      "type": "string"
    },
    "CountryCode": {
      "type": "string"
    },
    "NewConfirmed": {
      "type": "integer"
    },
    "TotalConfirmed": {
      "type": "integer"
    },
    "NewDeaths": {
      "type": "integer"
    },
    "TotalDeaths": {
      "type": "integer"
    },
    "NewRecovered": {
      "type": "integer"
    },
    "TotalRecovered": {
      "type": "integer"
    },
    "Date": {
      "type": "string"
    }
  },
  "required": [ 2
    "ID",
    "Country",
    "CountryCode",
    "NewConfirmed",
    "TotalConfirmed",
    "NewDeaths",
    "TotalDeaths",
    "NewRecovered",
    "TotalRecovered",
    "Date"
  ]
}
1

Listet die Felder und ihre Datentypen auf

2

Listet die Felder auf, die erforderlich sind

Tabelle 4-5 zeigt einige Daten-Serialisierungsformate, die für das Streaming von Daten geeignet sind. Es gibt noch viele andere Serialisierungsformate wie Parquet und ORC, die aber nicht für Streaming-Daten geeignet sind. Sie sind eher für ruhende Daten in Data Lakes geeignet.

Tabelle 4-5. Unterstützte Daten-Serialisierungsformate im Streaming
Name Maintainer Binär Für Menschen lesbar

JSON

Douglas Crockford

Nein

Ja

Apache Avro

Apache Software Foundation

Ja

Teilweise

Protokollpuffer (protobuf)

Google

Ja

Teilweise

Viele Daten-Serialisierungsformate wie Parquet und ORC helfen dabei, Abfragen in einem Data Lake effizienter zu gestalten. Andere, wie Avro und Protobuf, verbessern die Leistung in einem Service Mesh (Microservice Interkommunikation). JSON ist wahrscheinlich das am häufigsten verwendete Format zur Datenserialisierung, da es einfach zu handhaben und für Menschen lesbar ist.

Alle Optionen in Tabelle 4-5 können durch ein Schema definiert werden. Wie diese Schemata Verträge zwischen produzierenden und konsumierenden Domänen schaffen und welche Rolle sie bei der Unterstützung der Data Governance im Streaming Data Mesh spielen, werden wir in Kapitel 5 näher erläutern.

Synchrone Datenquellen

Auf haben wir am Anfang dieses Abschnitts darüber gesprochen, wie Domänen Streaming-Datenprodukte konsumieren - synchron oder asynchron. Beschreiben wir zunächst den synchronen Datenkonsum in Form von Client und Server, wobei der Client Daten anfordert und der Server die Daten bereitstellt.

Synchrone (auch als Batch bezeichnete) Datennutzung bedeutet, dass der Verbraucher (der Client) der Daten einer Anfrage-Antwort-Beziehung mit dem Server folgt. Der Kunde fordert Daten von einem Datendienst an, und der Dienst gibt schnell eine Momentaufnahme des Ergebnisses von der Quelle zurück.

Wenn diese Seiten abruft, werden die Ergebnisse mit Hilfe von Frameworks wie ODBC oder JDBC abgefragt, um eine Verbindung herzustellen und diese Batches von einer Datenquelle abzurufen. Dieser Ansatz zwingt dich dazu, Snapshots von Daten zu erfassen, die der Batch-Semantik folgen. Jeder Snapshot wird als Batch betrachtet.

Abbildung 4-3 zeigt den Client, der den ersten Snapshot der Daten aus der Datenbank anfordert. Wenn der Kunde einen weiteren Snapshot anfordert, musst du den zweiten vom ersten Snapshot subtrahieren, um nur die inkrementellen Änderungen zu finden. Oder du kannst den ersten Snapshot mit dem zweiten Snapshot überschreiben, aber dann gehen die Änderungen verloren. Wenn Datenbankänderungen eine Aktion auslösen sollen, musst du die inkrementellen Änderungen finden.

Paginating table
Abbildung 4-3. Ermitteln inkrementeller Änderungen aus einem Datenbank-Snapshot

Wenn zwischen den Client-Snapshots Änderungen auftreten, verpasst du außerdem Änderungen, die zu ihrem ursprünglichen Wert zurückkehren. In Tabelle 4-6 gehen die Ereignisse verloren, die von Robert zu Bob und dann wieder zurück zu Robert geändert wurden.

Tabelle 4-6. Zwischen Snapshots verlorene Änderungen
Zeit Veranstaltungen

12:00

Erster Schnappschuss

12:01

Name geändert von Robert zu Bob

12:03

Name wieder von Bob in Robert geändert

12:05

Zweiter Schnappschuss

Dieses Beispiel zeigt, wie synchrone Daten-APIs Clients, die Daten anfordern, zu einer Batching-Semantik zwingen, die zu einem gewissen Datenverlust führen kann, wie bei inkrementellen Snapshots. Einige ereignisgesteuerte Anwendungssysteme versuchen, synchrone APIs zu nutzen und sie an Echtzeit-Datenströme anzupassen. Das kommt häufig vor, weil viele Systeme keine Unterstützung für Streaming bieten, so dass wir einen Datenstrom emulieren müssen, der von diesen Datenquellen ausgeht.

Die Nutzer dieser Daten müssen wissen, dass die Ergebnisse mit Hilfe von Schnappschüssen erstellt werden und dass sie mit einem Datenverlust rechnen müssen.

Asynchrone Datenquellen und Änderungsdatenerfassung

Asynchrone Datenquellen verfolgen einen anderen Ansatz: Die Clients abonnieren Änderungen an den Daten, anstatt Snapshots zu erstellen. Bei jeder Änderung wird ein Eintrag in das Änderungsprotokoll gemacht, und die abonnierten Clients werden benachrichtigt und können auf die Änderung reagieren. Diese wird Change Data Capture(CDC) genannt.

Kafka-Konnektoren, die CDC unterstützen, lesen das Datenbank-Commit-Log und erfassen die Änderungen an einer Datenbanktabelle. Diese Änderungen sind Echtzeitströme von Datenbanktabellenänderungen, einschließlich Einfügungen, Aktualisierungen und Löschungen. Das bedeutet, dass du die in Tabelle 4-6 beschriebenen Änderungen nicht verlierst.

Tipp

Wenn möglich, nimm Datenderivate nach CDC und nicht nach Snapshots auf, um sicherzustellen, dass alle Transaktionen erfasst werden.

Um auf den "Data Divide" zurückzukommen : Das Ziel war es, Daten aus den operativen Datenbanken in die analytische Ebene zu verschieben. Diese operativen Datenbanken enthalten die Transaktionen, die das Geschäft antreiben. Der Verlust von Transaktionen zwischen Snapshots könnte in vielen Anwendungsfällen ein kritisches Problem darstellen. Am besten ist es, einen Kafka-Konnektor zu verwenden, der CDC aus der operativen Datenbank durchführen und an die Streaming-Plattform weiterleiten kann.

Debezium Steckverbinder

Eine Reihe von CDC-Konnektoren, die sogenannten Debezium-Konnektoren, erfassen Änderungen in einer Datenbank aus einem Änderungsprotokoll und senden sie an eine Streaming-Plattform. Die am häufigsten verwendeten Konnektoren sind die folgenden:

  • MySQL CDC

  • MongoDB

  • PostgreSQL

  • Oracle

  • SQL Server

  • IBM Db2

  • Apache Cassandra

  • Vitess

Für, die keine Kafka-Streaming-Plattformen sind, kann ein Debezium-Server als Alternative zum Kafka-Connect-Cluster verwendet werden. Abbildung 4-4 zeigt, wie der Debezium-Server den Kafka-Connect-Cluster ersetzen kann, um an andere Streaming-Plattformen zu senden. Änderungsereignisse können in verschiedene Formate wie JSON oder Apache Avro serialisiert und dann an eines der zahlreichen Messaging-Systeme gesendet werden.

Debezium Server
Abbildung 4-4. Ein Debezium-Server, der einen Kafka-Connect-Cluster ersetzt, um Konnektoren für alternative Streaming-Plattformen bereitzustellen: Redpanda und Apache Pulsar

Der Debezium-Server kann auch andere Streaming-Plattformen bedienen, die kein Commit-Log haben. Diese Plattformen erscheinen nicht in Abbildung 4-4. Es ist wichtig, sich daran zu erinnern, dass nur Streaming-Plattformen, die Daten in einem Commit-Log speichern, Kappa-Architekturen und letztendlich ein Streaming-Datennetz am besten unterstützen können, wie in Kapitel 2 erwähnt.

Wenn du dir frühzeitig Gedanken über die Konsumierbarkeit machst, ersparst du dir spätere Beschwerden der Verbraucher. Wenn du klarstellst, wie die Streaming-Datenprodukte abgerufen werden (synchron oder asynchron), wissen sie, was sie erwarten können.

Das Ergebnis der Dateneingabe in eine Streaming-Plattform ist ein Thema für jedes Datenderivat. Wir werden diese Derivate transformieren und anreichern, um das endgültige Streaming-Datenprodukt zu erstellen.

Umwandlung von Datenderivaten in Datenprodukte

Unter haben wir im vorherigen Abschnitt die Datenderivate aufgenommen, damit unsere Streaming-Datenprodukte konsumierbar sind, wobei wir uns auf Skalierbarkeit und Interoperabilität konzentriert haben. In diesem Abschnitt geht es um die Umwandlung von Datenderivaten, um sicherzustellen, dass unsere Streaming-Datenprodukte von hoher Qualität und Sicherheit sind.

Wir werden auch einige einfache Tools für die Umwandlung von Datenderivaten in Streaming-Datenprodukte vorstellen. Diese Tools nutzen SQL, um Daten umzuwandeln. Die Verwendung von SQL ist eine allgegenwärtige Fähigkeit für viele Ingenieure, auch für Fachbereichsingenieure. Es ist die bevorzugte Methode, um Fachingenieure in die Lage zu versetzen, Streaming-Data-Produkte zu erstellen.

Datenstandardisierung

ist eine bewährte Methode, um Formatstandards für den Austausch von Daten mit anderen Bereichen festzulegen. Diese Standards sind Teil einer Reihe von Data-Governance-Richtlinien, die für alle Bereiche gelten (wir werden Data-Governance in Kapitel 5 behandeln). Die Standardisierung von Daten schafft Konsistenz in allen Bereichen und macht die Daten für sie einfach nutzbar. Andernfalls müssen alle Bereiche wissen, wie sie verschiedene Formate für jeden Bereich umwandeln können, der sich nicht an den Standard hält. Bei der Umwandlung von Datenderivaten müssen wir die Daten also so umwandeln, dass sie diesen Standards entsprechen. So kann es zum Beispiel sein, dass die Data-Governance-Richtlinien vorschreiben, dass alle Telefonnummern in einem Standardformat wie 1-234-567-8900 vorliegen müssen. In den ursprünglichen Quelldaten sind die Nummern aber möglicherweise nicht in diesem Format vorhanden. Wir müssen sicherstellen, dass alle Formatierungsstandards auf die Daten angewendet werden, bevor sie als Streaming-Datenprodukt veröffentlicht werden.

Schutz sensibler Informationen

Du musst außerdem sicherstellen, dass sensible Daten durch Tokenisierung,Verschlüsselung oder Weglassen verschleiert werden. Zum Beispiel gelten geschützte Gesundheitsinformationen (PHI) und persönlich identifizierbare Informationen (PII) als sensible Daten. PHI- und PII-Daten unterliegen gesetzlichen Bestimmungen wie dem Health Insurance Portability and Accountability Act (HIPAA) und der General Data Protection Regulation (GDPR). Vorschriften, die über den HIPAA und die GDPR hinausgehen, werden in diesem Buch nicht behandelt.

Die HIPAA-Datenschutzrichtlinie schützt alle individuell identifizierbaren Gesundheitsdaten, die von einer betroffenen Einrichtung oder ihrem Geschäftspartner in jeglicher Form oder auf jeglichem Medium, sei es elektronisch, auf Papier oder mündlich, gespeichert oder übermittelt werden. In den Datenschutzbestimmungen werden diese Informationen als "geschützte Gesundheitsinformationen" (PHI) bezeichnet. Individuell identifizierbare Gesundheitsinformationen sind Informationen, einschließlich demografischer Daten, die sich auf einen der folgenden Punkte beziehen:1

  • Die vergangene, gegenwärtige oder zukünftige körperliche oder geistige Gesundheit oder Verfassung der Person

  • Die Bereitstellung von Gesundheitsversorgung für den Einzelnen

  • Die vergangene, gegenwärtige oder zukünftige Zahlung für die Bereitstellung von Gesundheitsleistungen für diePerson

Die DSGVO verpflichtet Unternehmen in der EU, die Daten ihrer Beschäftigten, Kunden und Drittanbieter zu schützen und zu sichern. Die Unternehmen sind nun gesetzlich verpflichtet, diese personenbezogenen Daten sicher aufzubewahren und zu schützen.2

Tabelle 4-7 zeigt einige Methoden zur Verschleierung von sensiblen Daten wie PHI und PII, um die Vorschriften zu deren Schutz einzuhalten.

Tabelle 4-7. Beispiele für die Verschleierung sensibler Daten
Methode Zweck

Tokenisierung

Ersetzt die Daten durch ein Token. Später kann das Token nachgeschlagen werden, um den ursprünglichen Wert zu erhalten, solange das System, das das Token nachschlägt, die Erlaubnis hat, es abzurufen. Oft wird bei der Tokenisierung von Daten das ursprüngliche Format beibehalten und ein Teil des Wertes offengelegt. Eine Kreditkartennummer kann zum Beispiel die letzten vier Ziffern enthalten: xxxx-xxxx-xxxx-1234

Verschlüsselung

Ersetzt den Wert durch einen verschlüsselten Wert. Der Wert kann mit einem Schlüssel entschlüsselt werden. Ein System kann die Entschlüsselung der Daten anfordern, um den ursprünglichen Wert zu erhalten, solange es den Schlüssel hat. Das Format wird bei dieser Methode nicht beibehalten. Hier ist ein Beispiel für eine Kreditkarte: 1234-5678-9012-3456 verschlüsselt zu MTIzNDEyMzQxMjM0MTIzNAo=

Filtern

Bei dieser Methode werden die sensiblen Informationen komplett weggelassen.

Die SQL-Sprache kann mit benutzerdefinierten Funktionen erweitert werden, um diese Verschleierungsmethoden zu ermöglichen, auf die wir im nächsten Abschnitt näher eingehen werden.

SQL

Wie bereits erwähnt hat, ist SQL die Sprache der Wahl für die Umwandlung von Streaming-Datenprodukten, weil sie für viele Fachleute zugänglich ist. Deshalb müssen wir eine Streaming-Data-Verarbeitungsplattform wählen, die SQL unterstützt. Zum Zeitpunkt der Erstellung dieses Artikels gibt es noch nicht viele Streaming Data Processing Engines. Wir werden uns zwei Optionen ansehen: einen SaaS-Stream-Prozessor und ksqlDB.

SaaS Stream Prozessor

Ein SaaS-Stream-Prozessor ist ein Cloud-SaaS-Produkt, das SQL verwendet, um Daten zu transformieren, die von einer Streaming-Plattform wie Kafka abgerufen werden. Sie werden in der Regel auf Apache Flink implementiert (das echtes natives Streaming bietet), im Gegensatz zum strukturierten Streaming von Apache Spark (das Mikrobatches mit niedriger Latenz bietet, die Streaming nur emulieren). Apache Flink verarbeitet Ereignisse in Echtzeit mit geringerer Latenz als Apache Spark Structured Streaming.

Apache Flink wird in den Benutzeroberflächen nicht angezeigt. Stattdessen wird dem Nutzer eine SQL-Schnittstelle angezeigt. Datenproduktingenieure können von einer Streaming-Plattform wie Kafka konsumieren, zustandsabhängige Transformationen durchführen und dann die Ausgabe in eine Senke oder eine andere Streaming-Plattform schreiben, wie in Abbildung 4-5 gezeigt.

Das Modell von Flink für die Stream-Verarbeitung umfasst eine Komponente namens connectors, die als Datenquelle und -senke fungiert (ähnlich wie Kafka-Konnektoren). Die Komponente stream repräsentiert Streaming-Daten, die Datenströme enthalten. Die Komponente pipeline schließlich kann Datenströme mithilfe von SQL zusammenführen und aggregieren, um neue Datenströme zu erstellen. Zusammen bilden diese Komponenten ein einfaches und benutzerfreundliches Datenverarbeitungstool zum Aufbau von Streaming Data Pipelines.

Debezium Server
Abbildung 4-5. SaaS-Stream-Prozessor, der Datenderivate zusammenführt/anreichert, von denen eines ein Datenprodukt aus einer anderen Domäne ist, und zurück ins Netz veröffentlicht

Apache Flink ermöglicht es den Domänen auch, Daten aus der produzierenden Domäne in ihre eigene zu replizieren und so das Mesh aufzubauen, wie in Abbildung 4-6 dargestellt.

Debezium Server
Abbildung 4-6. Apache Flink repliziert Daten von der produzierenden zur konsumierenden Domäne

Jede stream in Flink repräsentiert Streaming-Daten, die als neues Streaming-Datenprodukt behandelt und von vielen anderen Domänen konsumiert werden können.

Ein großer Vorteil von Flink ist, dass es von vielen Kafka-Clustern konsumieren kann. Es kann auch von alternativen Streaming-Plattformen wie Redpanda und Apache Pulsar konsumieren und sie in einer einzigen Streaming-Pipeline zusammenführen. Außerdem kann Flink Streaming-Plattformen (wie RabbitMQ oder Kinesis) mischen, die kein Commit-Log verwenden, und bietet so eine völlig anpassungsfähige Lösung für die Stream-Verarbeitung (siehe Abbildung 4-7).

Apache Flink kann auch Self-Services anbieten, um Streaming-Data-Pipelines einfach bereitzustellen und zu erstellen. Die Bereitstellung der Infrastruktur und das Schreiben von Code, für die spezielle Kenntnisse erforderlich sind, entfallen damit.

Agnostic
Abbildung 4-7. Eine als SQL implementierte Streaming-Pipeline, die mehrere Streaming-Plattformen verbinden kann, auch solche, die kein Commit-Log verwenden, um Event-Sourcing zu ermöglichen

ksqlDB

ksqlDB ist ein weiteres Stream-Processing-Tool, das eine SQL-Schnittstelle für den Aufbau von Datenpipelines auf Kafka bietet. Die SQL-ähnliche Sprache von ksqlDB abstrahiert die eigentliche Implementierung von Kafka-Streams. ksqlDB erstellt aus Kafka-Themen tabellenartige Strukturen. Es kann Daten zwischen Kafka-Themen verbinden und aggregieren und bietet Funktionen für die Kappa-Architektur.

ksqlDB folgt dem anerkannten SQL-92-Standard, der dritten Revision des Standards. Dieser wird vom American National Standards Institute (ANSI) definiert, das für die Pflege dieses Standards für SQL-Spezifikationen verantwortlich ist.

Ein ksqlDB-Einsatz ist auf einen einzigen Kafka-Cluster beschränkt. Er kann nicht mehrere Streaming-Plattformen kombinieren. Es bietet jedoch eine Möglichkeit, Konnektoren bereitzustellen. Sie ermöglicht es Domain-Ingenieuren, bei der Erstellung von Streaming-Data-Pipelines vollständig in einem einzigen Tool zu bleiben.

Bereitstellen von Konnektoren in ksqlDB

ksqlDB führt nicht nur zustandsbehaftete Transformationen durch, sondern hat auch die einzigartige Fähigkeit, Kafka-Konnektoren für den Ingress und Egress von Daten zu erstellen. Domain-Ingenieure müssen nicht länger ein Tool verlassen, um Daten zu importieren und zu exportieren. Über eine einzige ksqlDB-Befehlszeilenschnittstelle (CLI) können Domäneningenieure eine gesamte Datenpipeline von den Quellen bis zu den veröffentlichten Datenprodukten erstellen, die vollständig dem Kappa-Architekturmuster entspricht.

Beispiel 4-2 zeigt, wie du einen Kafka-Quellkonnektor erstellst. In diesem Beispiel wird ein Debezium CDC-Quellkonnektor eine Verbindung zu einer MySQL-Datenbank herstellen. Der Konnektor liest alle Operationen in der Reihenfolge, in der sie in die Datenbank übertragen werden: Einfügungen, Aktualisierungen und Löschungen.

Beispiel 4-2. ksqlDB-Anweisung, die einen Kafka-Quellkonnektor erstellt, um Daten in Kafka zu bringen; aus dem resultierenden Topic könnte dann ein Stream/eine Tabelle erstellt werden
/* creates a connector that reads from MySQL */
CREATE SOURCE CONNECTOR app_data WITH ( 1
    'connector.class': 'io.debezium.connector.mysql.MySqlConnector', 2
    'tasks.max': '1',  3
    'database.hostname': 'mysql',  4
    'database.port': '3306',
    'database.user': 'debezium',
    'database.password': 'dbz',
    'database.server.id': '184054',
    'database.server.name': 'dbserver1',
    'database.include.list': 'inventory',
    'database.history.kafka.bootstrap.servers': 'kafka:9092',  5
    'database.history.kafka.topic': 'schema-changes.inventory'
);
1

SOURCE Schlüsselwort für Quellkonnektoren.

2

io.debezium.connector.mysql.MySqlConnector ist der Klassenname des Verbinders. Wenn du deinen eigenen Konnektor erstellst, wird sein Klassenname hier eingetragen.

3

Die Anzahl der Aufgaben, die der Connector erstellen wird.

4

Der Hostname der Datenbank.

5

Der Kafka-Bootstrap-Server.

Ähnlich kannst du in Beispiel 4-3 einen Sink-Connector erstellen, um Daten aus der Streaming-Plattform zu holen. In diesem Fall erstellt die Anweisung einen Sink-Connector, der Daten aus einem Kafka-Topic in eine Amazon S3-Senke schreibt.

Beispiel 4-3. ksqlDB-Anweisung zum Erstellen eines Kafka-Sink-Connectors, der aus einem Topic liest und in ein Ziel schreibt
/* creates a connector that writes to a data lake */
CREATE SINK CONNECTOR training WITH ( 1
    'connector.class': 'S3_SINK', 2
    'tasks.max': '1',
    'aws.access.key.id': '$AWS_ACCESS_KEY_ID', 3
    'aws.secret.access.key': '$AWS_SECRET_ACCESS_KEY', 4
    's3.bucket.name': '$S3_BUCKET', 5
    'time.interval' : 'HOURLY', 6
    'data.format': 'BYTES',
    'topics': '$KAFKA_TOPIC_NAME_OUT2'
);
1

SINK Schlüsselwort für Senkverbinder.

2

S3_SINK ist ein Alias für den Klassennamen des S3-Sink-Connectors.

3

Der AWS-Zugangsschlüssel.

4

Das AWS-Geheimnis.

5

Der S3-Eimer.

6

Legt fest, wie deine Nachrichten im S3-Bucket gruppiert werden. Gültige Einträge sind DAILY oder HOURLY.

Du kannst Transformationen zwischen den Source- und Sink-Anweisungen definieren, um eine Streaming-Datenpipeline zu erstellen. Du kannst die SQL-Anweisungen auch in einer Datei speichern, damit sie auf einmal ausgeführt werden können. Auf diese Weise kannst du deine Streaming Data Pipeline von der Entwicklung über das Staging bis hin zur Produktion vorantreiben.

Benutzerdefinierte Funktionen in ksqlDB

In ksqlDB bist du bei der Definition von Streaming-Datentransformationen auf die SQL-Sprache beschränkt. Sie hat nicht die Fähigkeit, komplexe Logik darzustellen, wie es imperative Programmiersprachen wie C++, Java, Python usw. können. Um eine komplexere Logik auszuführen, könnte eine benutzerdefinierte Funktion (UDF) in Java geschrieben werden, die die komplexe Logik enthält, die in SQL nicht dargestellt werden kann. Diese UDF kann dann in ksqlDB aufgerufen werden, ohne dass die SQL-Grammatik, die ksqlDB verwendet, verletzt wird.

Beispiel 4-4 zeigt ein Beispiel für eine ksqlDB UDF, die zwei Zahlen miteinander multipliziert. Die auf den Java-Quelltext angewendeten Annotationen ermöglichen es den ksqlDB-Klassenladern, diese UDF als verfügbare Funktion zu registrieren und zu verwenden.

Beispiel 4-4. Eine ksqlDB benutzerdefinierte Funktion, die von ksqlDB geladen wird und als Funktion in einer ksqlDB-Anweisung verwendet werden kann
package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

import java.util.Map;

@UdfDescription(name = "Mul", 1
                author = "example user",
                version = "1.0.2",
                description = "Multiplies 2 numbers together")
public class MulUdf {

    @Udf(description = "Multiplies 2 integers together.") 2
    public long formula(@UdfParameter int v1, @UdfParameter int v2) { 3
        return (v1 * v2);
    }

    @Udf(description = "Multiplies 2 doubles together")
    public long formula(@UdfParameter double v1, @UdfParameter double v2) {
        return ((int) (Math.ceil(v1) * Math.ceil(v2)));
    }

}
1

@UdfDescription liefert eine Beschreibung der Klasse.

2

@Udf identifiziert den UDF.

3

@UdfParameter identifiziert die Parameter in der Funktion.

Diese Funktion ermöglicht das Überladen von Funktionen, um mehrere Datentypen zu berücksichtigen. Beispiel 4-5 zeigt, wie das gemacht werden kann.

Beispiel 4-5. Ein Beispiel für die Verwendung der UDF in einer ksqlDB-Anweisung
select formula(col1, col2) as product 1
from mytable
emit changes; 2
1

Aufrufen der Funktion formula.

2

Gibt eine Push-Abfrage mit einer kontinuierlichen Ausgabe an. Push-Abfragen sind unbegrenzt, d. h., sie enden nicht, weil die Ergebnisse kontinuierlich an den Verbraucher weitergeleitet werden. Umgekehrt sind Pull-Abfragen begrenzt oder enden irgendwann.

Hinweis

Die Erstellung von UDFs in Java ist für Fachbereichsingenieure möglicherweise nicht möglich. In diesem Fall sollte das zentrale Data-Mesh-Team, das über die notwendigen Fähigkeiten verfügt, die UDFs programmieren. Die Dateningenieure, die bisher die Datenpipelines im Data Lake entwickelt und gewartet haben, werden nun Teil des zentralen Data-Mesh-Teams.

In diesem Abschnitt haben wir zwei Lösungen für die Umwandlung von Datenderivaten vorgestellt. Beide verwenden SQL, um diese Umwandlungen durchzuführen. Wir transformieren Daten, um sicherzustellen, dass unsere Streaming-Datenprodukte von hoher Qualität und vertrauenswürdig sind. Ein weiterer Grund für die Umwandlung von Daten ist es, dem Datenprodukt mehr Informationen hinzuzufügen, damit es für die Nutzerinnen und Nutzer der Domäne nützlicher ist. Im nächsten Abschnitt werden wir darüber sprechen, wie wir Daten mit Data-Warehousing-Konzepten anreichern können.

Extrahieren, Transformieren und Laden

Sprechen wir nun über die Umwandlung von Daten in einem Extraktions-, Transformations- und Ladeprozess(ETL). Traditionell werden bei diesem Prozess Daten aus einer Datenquelle extrahiert, in ein Format umgewandelt, das von den Verbrauchern genutzt werden kann, und dann in ein System geladen, in dem diese Verbraucher die Daten lesen können (z. B. ein Data Lake oder DataWarehouse). ETL ist ein Muster, das typischerweise verwendet wird, um Daten aus mehreren unterschiedlichen Systemen in einem einzigen zentralen System wie einem Data Warehouse zusammenzuführen (siehe Abbildung 4-8).

ETL Warehouse
Abbildung 4-8. Eine ETL-Datenpipeline, die Daten aus verschiedenen Quellen extrahiert, in einen Data Lake schreibt, um sie dort bereitzustellen und umzuwandeln, und sie dann in ein Data Warehouse lädt, um sie zu nutzen

Der Staging-Bereich in Abbildung 4-8 ist der Ort, an dem die Transformationen durchgeführt werden. Der Staging-Bereich ist in der Regel ein Data Lake, weil er eine große Menge an Daten aufnehmen kann. Die Transformation wird von Massively Parallel Processing (MPP)-Anwendungen und Batch-Jobs ausgeführt, die die Daten dann an ein Data Warehouse senden.

Die operative Datenbank ist die Informationsquelle für das Data Warehouse. Sie enthält detaillierte Informationen, die für das Tagesgeschäft des Unternehmens benötigt werden. Die Daten in der operativen Datenbank ändern sich häufig, da Aktualisierungen vorgenommen werden, um den aktuellen Wert der letzten Geschäftsvorgänge widerzuspiegeln. Um dies zu ermöglichen, werden OLTP-Datenbanken (Online Transaction Processing) verwendet, um die sich ändernden Daten in Echtzeit zu verwalten.

Data-Warehouse-Systeme dienen als Datenspeicher, der es Nutzern und Entscheidungsträgern ermöglicht, geschäftliche Entscheidungen zu treffen. Diese Systeme können Informationen in bestimmten Formaten organisieren und präsentieren, um den unterschiedlichen Bedürfnissen der verschiedenen Nutzer/innen gerecht zu werden. Diese Systeme werden oft auch als OLAP-Systeme (Online Analytical Processing) bezeichnet.

Operative Datenbanken und Data Warehouses sind beides relationale Datenbanken, die jedoch unterschiedlichen Zwecken dienen. Operative Datenbanksysteme verarbeiten sowohl operative als auch transaktionale Daten. Operative Daten sind die Daten, die in den Abläufen eines bestimmten Systems oder Geschäftsbereichs enthalten sind. Im Einzelhandel oder E-Tail zum Beispiel verwaltet eine operative Datenbank diskrete Käufe von Artikeln in Echtzeit und behält den Überblick über den Artikelbestand. Ein Data Warehouse hingegen speichert die historischen Aufzeichnungen von Transaktionen, die über einen langen Zeitraum hinweg stattfinden. Wenn z. B. ein großer Online-Händler die Leistung einer bestimmten Marke oder eines bestimmten Artikels über die letzten 10 Jahre verfolgen möchte, kann er dies durch Abfragen des Data Warehouse erreichen. Dies kann nützliche Informationen wie die Leistung von Werbekampagnen liefern, saisonale Trends im Kaufverhalten aufzeigen oder sogar helfen, den Marktanteil einer Marke insgesamt besser zu verstehen.

Der Hauptunterschied zwischen einer operativen Datenbank und einem Data Warehouse besteht darin, dass eine operative Datenbank flüchtig ist und ständig aktualisiert wird, während das Data Warehouse eine historische Momentaufnahme des aktuellen Stands einer Transaktion darstellt.

Nicht alle Daten in einem Data Warehouse ändern sich im gleichen Rhythmus. Um beim Beispiel des Einzelhandels zu bleiben: Der Gesamtwarenkorb eines Kunden kann sich viele Male ändern, und der Status des Warenkorbs ändert sich mit jedem hinzugefügten oder entfernten Artikel, bis der Kunde sich entscheidet, zur Kasse zu gehen und den Kauf abzuschließen. Produkteigenschaften wie Marke, Größe, Farbe, Geschmack oder Gewicht ändern sich viel langsamer als eine Reihe von Verkaufstransaktionen. Informationen über den Kunden wie Standort, Alter, Interessen und andere demografische und firmenbezogene Daten ändern sich möglicherweise überhaupt nicht oder nur dann, wenn der Kunde uns eine solche Änderung mitteilt oder wenn wir von einer Drittquelle einen Feed mit abgestimmten demografischen Daten erhalten.

Pflege von Data-Warehouse-Konzepten

Auch wenn eine Data-Mesh-Architektur die analytische Ebene dekomponieren und dezentralisieren soll, sollten die Konzepte, die diese Systeme erfolgreich machen, nicht verloren gehen oder beeinträchtigt werden. Das Konzept eines Sternschemas (ein Modell, das einem Stern ähnelt, indem es Fakten von dimensionalen Daten trennt), die Art und Weise, wie Transformationen definiert werden, und die Struktur der Daten machen das Datenmodell einfach zu verstehen und umzusetzen. Dieselben Konzepte können auch außerhalb des Data Warehouse verwendet werden, um Streaming-ETL-Datenpipelines zu entwerfen und besser nutzbare Datenprodukte zu erstellen.

Wie wir in Kapitel 1 eingeführt haben, sind die Daten in einem Unternehmen in eine operative und eine analytische Ebene unterteilt. Bereiche, die in der analytischen Ebene angesiedelt sind, unterscheiden sich stark von Bereichen, die in der operativen Ebene angesiedelt sind. Bei den Daten, die als Datenprodukte aus der operativen Ebene veröffentlicht werden, handelt es sich in der Regel um unveränderliche, mit Zeitstempeln versehene Schnappschüsse der operativen Daten im Zeitverlauf. Historisch gesehen entwickeln sich Änderungen an Datenprodukten in der analytischen Ebene langsamer als ihre operativen Gegenstücke. Aus diesem Grund ist eine Datendomäne in der analytischen Ebene für die effiziente Bereitstellung und den Zugang zu großen Datenbeständen für die Verbraucher verantwortlich. Datendomänen bieten der Außenwelt eine Sicht auf die Daten - Sichtweisen, die als Standards für den Datenzugriff veröffentlicht werden. Hinter den Kulissen ablaufende Prozesse, wie z. B. ETL, die zur Erstellung der Domäne verwendet werden, sind für nachgelagerte Verbraucher nicht sichtbar.

Data Warehousing Grundlagen

Im heutigen Geschäftsklima benötigen Unternehmen zuverlässige Berichte und Analysen großer Datenmengen. Die Daten müssen für verschiedene Berichtszwecke konsolidiert und auf verschiedenen Ebenen integriert werden, ohne die betrieblichen Systeme des Unternehmens zu beeinträchtigen. Das Data Warehouse macht dies möglich, indem es einen Speicher für die elektronisch gespeicherten Daten eines Unternehmens schafft, die aus den operativen Systemen extrahiert wurden, und sie für Ad-hoc-Abfragen und geplante Berichte durch ETL-Prozesse verfügbar macht.

Es gibt viele Ansätze zum Aufbau eines Data Warehouses, jeder mit seinen eigenen Vor- und Nachteilen. Dieses Buch konzentriert sich auf den Star-Schema-Ansatz für Data Warehousing und seine Anwendungen auf Data Mesh und Streaming Data Mesh. Wir kennen zwar andere Data-Warehouse-Ansätze und ihre mögliche Anwendung auf Data Mesh, wie z. B. Data Vault 2.0, aber dieses Buch geht nicht näher auf diese Ansätze ein.

Das Sternschema ist die einfachste Form eines Dimensionsmodells, das in Business Intelligence und Data Warehousing verwendet wird. Das Sternschema besteht aus einer oder mehreren Faktentabellen, die auf eine beliebige Anzahl von Dimensionstabellen verweisen. Wie der Name schon sagt, ist das physische Modell des Sternschemas sternförmig, mit einer Faktentabelle in der Mitte und Dimensionstabellen drum herum, die die Spitzen des Sterns darstellen.

Eine Faktentabelle enthält alle Primärschlüssel der einzelnen Dimensionen sowie Fakten oder Kennzahlen, die mit diesen Dimensionen verbunden sind. Dimensionstabellen bieten beschreibende Informationen für alle in der Faktentabelle erfassten Messungen. Da Dimensionen informativ sind und sich viel langsamer ändern als Faktentabellen, sind Dimensionen im Vergleich zur Faktentabelle relativ klein. Häufig verwendete Dimensionen sind Personen, Produkte, Ort (oder Geografie) und vor allem die Zeit (siehe Abbildung 4-9).

Die Trennung von Fakten- und Dimensionsdaten ist extrem wichtig, um ein Data Warehouse zu skalieren. Diese Trennung ermöglicht es, dass sich Attribute zu Fakten im Laufe der Zeit ändern, ohne dass die gesamte Faktentabelle neu eingegeben werden muss. Nehmen wir zum Beispiel an, wir verfolgen die Verkäufe eines Produkts und der Eigentümer einer Marke ändert sich im Laufe der Zeit. In einem Einzeltabellenmodell (Modelle, die Fakten- und Dimensionsdaten in einer einzigen Tabelle zusammenfassen) wären teure Aktualisierungen oder Trunkierungen/Ladungen erforderlich, um die gesamte Tabelle zu aktualisieren, damit sie den richtigen Markeneigentümer wiedergibt. In einem Dimensionsmodell muss nur das Attribut "Markeninhaber" der Produktdimension geändert werden. Diese Änderung wird dann ohne viel Aufhebens in nachgelagerten Berichts- und Analyseanwendungen berücksichtigt.

Simple Star Schema
Abbildung 4-9. Ein einfaches Sternschema ohne langsam wechselnde Dimensionen

Bei der Umwandlung operativer Daten in das Sternschema muss sorgfältig abgewogen werden, ob es sich um Fakten- oder um Dimensionsdaten handelt. Überlegungen zum Design der Datenbank haben direkte Auswirkungen auf die ETL, die das Data Warehouse speist. Daten können beispielsweise auf Transaktionsebene in ein Data Warehouse eingefügt werden, oder sie können im Batch importiert, umgewandelt und eingefügt werden. Alle diese Ansätze erfordern ETL und eine angemessene Denormalisierung, um die Anforderungen des Data Warehouse zu erfüllen - die Fähigkeit, Daten schnell abzufragen und Datenanforderungen zu erfüllen.

Dimensionsdaten versus Faktendaten in einem Streaming-Kontext

Sowohl Fakten- als auch Dimensionsdaten ändern ihren Zustand im Laufe der Zeit, allerdings in unterschiedlichem Tempo. Um das Verhalten eines Kunden während eines Online-Einkaufs vollständig zu verstehen, muss das Data Warehouse in der Lage sein, jede Interaktion des Kunden mit den Produkten zu verfolgen, vielleicht sogar zu verfolgen, was der Kunde in seinen Warenkorb gelegt hat, was er entfernt hat und in welcher Reihenfolge. Für ein herkömmliches Data-Warehouse-System stellt dies eine Herausforderung dar, da jede Batch-Insertion von Daten in das Warehouse nur den Zeitpunkt berücksichtigt, an dem ein Snapshot gemacht wurde. Mit der Einführung der Kappa-Architektur und der Verwendung von Tiered Storage wird die Bereitstellung von Daten in Echtzeit jedoch einfacher und einfacher. Einblicke können nun nahezu in Echtzeit geliefert werden, und das Data Warehouse kann sich dies bei der Datenaufnahme zunutze machen.

Wie bereits erwähnt, ändern sich dimensionale Daten auch im Laufe der Zeit. Dies ist ein wichtiges Thema, das beim Aufbau eines Sternschema-Datenmodells oft übersehen wird. Diese so genannten sich langsam verändernden Dimensionen erfordern ebenfalls Aufmerksamkeit, denn für genaue Analysen ist es wichtig, Kunden, Produkte und Standorte so zu sehen, wie sie zum Zeitpunkt einer Transaktion oder einer Reihe von Transaktionen bekannt waren. Es ist wichtig, die Eigenschaften eines bestimmten Produkts heute zu kennen, aber es ist auch wichtig zu wissen, was dieses Produkt vor sechs Monaten oder sogar vor einem Jahr war. Das demografische Profil eines Kunden zum Zeitpunkt des Kaufs zu kennen, ist wichtig, um zu verstehen und vorherzusagen, wie sich andere Kunden verhalten könnten.

Auch langsam wechselnde Dimensionen haben unter der historischen Art der Data Warehousing-Ingestion gelitten, die wir in der Vergangenheit gesehen haben. Die Kappa-Architektur vereinfacht die Definition von sich langsam verändernden Dimensionen. Anstatt Informationen über die Dimensionen in bestimmten Intervallen zu erhalten, muss man nur einen bestimmten Zeitpunkt in einem Datenstrom betrachten und seine Merkmale bestimmen, um zu verstehen, wie eine Dimension zu einem bestimmten Zeitpunkt aussieht. In einem Streaming Data Mesh werden die Dimensionen eines Datenmodells zu Datenprodukten, zusammen mit den Faktdaten. Dies ermöglicht es dem Datenproduktentwickler, eine standardisierte Schnittstelle zu den Dimensionsdaten zu veröffentlichen, die eine zeitpunktbezogene Suche ermöglicht. Anstatt Ansichten auf der Grundlage eines SCD-Typ-6-Setups zu erstellen und diese Ansicht abzufragen, wird die Geschäftslogik zur Erstellung von zeitpunktbezogenen Abfragen nun im Datenprodukt selbst gekapselt.

Materialisierte Ansichten in Streams

In sind materialisierte Ansichten in ihrer einfachsten Form vorverarbeitete Abfrageergebnisse, die auf der Festplatte gespeichert werden. Die Idee ist, dass die Vorverarbeitung der Abfrage immer läuft, so dass ein Nutzer jederzeit die materialisierte Ansicht abfragen kann und die neuesten Ergebnisse erhält. Im Gegensatz dazu ist eine traditionelle Ansicht eine Abfrage, die nicht vorverarbeitet ist und zum Zeitpunkt der Abfrage der Ansicht ausgeführt wird. Die Ergebnisse einer traditionellen Ansicht werden nicht auf der Festplatte gespeichert.

Sowohl die materialisierte als auch die traditionelle Ansicht liefern dasselbe Ergebnis, nur dass die materialisierte Ansicht schneller läuft, da die Ergebnisse bereits vorberechnet sind, während die traditionelle Ansicht die Ansicht erst verarbeiten muss, bevor sie das Ergebnis liefert. Da die Vorverarbeitung in einer materialisierten Ansicht im Hintergrund stattfindet, hat sie asynchrone Eigenschaften. Da eine herkömmliche Ansicht die Abfrage nur auf Anfrage verarbeitet und das Ergebnis zurückgibt, ist sie synchron.

Lass uns diese Konzepte weiter ausbauen. In "Ingesting von Datenproduktderivaten mit Kafka Connect" haben wir die Unterschiede zwischen synchronen und asynchronen Datenquellen besprochen. Der Hauptunterschied besteht darin, dass synchrone Datenquellen der Batching-Semantik folgen, während asynchrone Datenquellen der Streaming-Semantik folgen.

Diese Beschreibung der materialisierten Ansichten wurde im Zusammenhang mit einer einzelnen Datenbank erklärt. Materialisierte Ansichten gibt es aber nicht nur in einer einzigen Datenbank. Die Semantik der materialisierten Ansicht zur Vorverarbeitung von Daten existiert, wenn Daten von einer aktiven Instanz einer Datenbank in eine passive Instanz repliziert werden, wie in Abbildung 4-10 zu sehen ist.

In dieser Abbildung einer Disaster-Recovery-Lösung verwendet die Anwendung die "aktive" Datenbank und schlägt auf die "passive" Datenbank fehl, falls die aktive Datenbank abstürzt. Die Daten werden über das Write-Ahead-Log (WAL) übertragen und "materialisieren" sich in der passiven Datenbank. Jede Transaktion, die in der aktiven Datenbank stattgefunden hat, wird im WAL aufgezeichnet und in der passiven Datenbank im Hintergrund ausgeführt. Diese Replikation der Daten erfolgt also asynchron und ist ein Beispiel für eine materialisierte Ansicht, an der zwei Datenbanken beteiligt sind.

Database Replication
Abbildung 4-10. Datenbankreplikation mit einem Write-Ahead-Log, das eine materialisierte Ansicht in der passiven Datenbank erstellt

Der Debezium-Konnektor, den wir bereits besprochen haben, liest die WAL der von ihm unterstützten Datenbanken, um Änderungen zu erfassen, aber anstatt sie an eine andere Datenbank derselben Instanz zu senden, schickt er sie an Kafka (siehe Abbildung 4-11).

Database Replication
Abbildung 4-11. Datenbankreplikation unter Verwendung eines WAL, um Kafka mitCDC-Transaktionen zu füllen

Von diesem Punkt aus kannst du mehrere materialisierte Ansichten erstellen. Wie in Abbildung 4-12 kannst du eine materialisierte Ansicht in ksqlDB oder in einer anderen passiven Datenbank mit Flink erstellen.

Multiple materialized views.
Abbildung 4-12. Aufbau mehrerer materialisierter Ansichten aus einem einzigen Debezium CDC-Connector, der aus dem WAL liest

CDC Use Cases werden eigentlich für Modelle oder Entitäten verwendet, die das Ergebnis von DDD sind. Diese Entitäten ändern sich nicht oft, also ändern sie sich langsam. In ETL-Pipelines sind diese Entitäten die Dimensionsdaten, die zur Anreicherung von Faktendaten verwendet werden.

Streaming ETL mit domänengesteuertem Design

Unter können wir diese Informationen nun mit Streaming ETL verknüpfen. Zusammenfassend lässt sich sagen, dass die Dimensionsdaten von materialisierten Ansichten stammen, die wiederum von CDC-Streams stammen, die wiederum von den WALs der ursprünglichen Quelldatenbanken stammen. Wir haben jetzt eine vollständige Streaming-Datenpipeline für Dimensionsdaten, die zur Anreicherung von Faktendaten verwendet wird. Das ist auch unsere Lösung für einen vollständigen Streaming-ETL, bei dem sowohl die Dimensionsdaten als auch die Faktendaten durch Streams unterstützt werden.

Das ist das Ziel, das wir verfolgen: Streaming ETL in allen Bereichen zu ermöglichen. Um dies zu erreichen, brauchen wir zwei Arten von Datenprodukten: Dimensionsdaten und Faktendaten. Wenn wir sie zu Streaming-Datenprodukten machen würden, wären es unterschiedliche Arten von Streaming-Datenprodukten - ein CDC-Stream für Dimensionsdaten und ein reiner Append-Stream für Faktendaten. CDC-Streams enthalten nur Änderungen, die von den WALs in einer operativen (transaktionalen) Quelldatenbank erfasst wurden, während Append-Only-Streams Faktendaten enthalten.

In DDD definiert das Domänenmodell die Entitäten, ihre Beziehungen zueinander und ihre Interaktionen auf der Grundlage der Geschäftslogik. Faktdaten sind diese Interaktionsereignisse zwischen Entitäten, die an Zeit und Zustand gebunden sind. Dimensionsdaten sind die Erstellungs-, Aktualisierungs- und Löschereignisse in Bezug auf Entitäten und ihre Beziehungen untereinander.

Ein Beispiel: Ein Besucher einer Website ändert seinen Namen nicht oft, also ist dies eine sich langsam verändernde Dimension. Aber er klickt vielleicht viele Dinge auf der Website an, z. B. Artikel in den Warenkorb legen und wieder entfernen. Das sind Faktdaten, die schnell ankommen und mit der Zeit verbunden sind. Die Verknüpfung der Faktendaten mit den Dimensionsdaten ist ein ETL-Prozess, der die Faktendaten mit Dimensionsdaten anreichert, so dass die Analysten wissen, wer geklickt hat, und daraus ableiten können, warum, wann und wo der Nutzer geklickt hat, um sein Erlebnis zu verbessern. Die Modellschulung erfordert die Erfassung des Dimensionsstatus nicht aus Dimensionstabellen, sondern aus angereicherten Faktdaten (angereichert aus Dimensionen), damit der aktuelle Status einer Dimension zusammen mit der Zeit mit dem Klickereignis erfasst werden kann.

Datenprodukte mit AsyncAPI veröffentlichen

Wir haben die Anforderungen an das Streaming-Datenprodukt auf der Grundlage der von anderen Bereichen bereitgestellten Anforderungen definiert. Wir haben auch die Derivate des Datenprodukts identifiziert (und bei Bedarf andere Domänenprodukte abonniert). Dann haben wir die Daten extrahiert und umgewandelt, um ein neues Datenprodukt zu erstellen. Jetzt ist das Datenprodukt bereit für die Veröffentlichung im Datennetz. Wir werden weiterhin die AsyncAPI verwenden, um den Verbrauchspunkt des Datenproduktinhalts zu definieren.

AsyncAPI ist ein Open-Source-Projekt, das die Definition von Streaming-Datenquellen vereinfachen und standardisieren soll. AsyncAPI ist eine Schnittstellendefinitionssprache (IDL), mit der Anwendungen, die in einer Sprache geschrieben wurden, mit Anwendungen, die in anderen Sprachen geschrieben wurden, verbunden werden können. In diesem Fall ist die AsyncAPI eine IDL, die eine asynchrone API definiert. Sie ermöglicht es anderen Anwendungen, Integrationen mit dem Streaming-Datenprodukt zu erstellen, unabhängig von der Programmiersprache. Um ein Datenprodukt zu veröffentlichen, muss ein AsyncAPI-YAML-Dokument erstellt und beim Streaming Data Mesh registriert werden.

Registrierung des Streaming-Datenprodukts

Die AsyncAPI-Dokumente sind in YAML geschrieben, einem maschinenlesbaren Dokumentformat, das von einem Domain Engineer leicht bearbeitet werden kann, so dass es auch einigermaßen für Menschen lesbar ist. Wenn wir ein Datenprodukt registrieren, erstellen wir ein AsyncAPI YAML-Dokument und registrieren es in einem Streaming-Datenkatalog, auf den wir in Kapitel 5 näher eingehen werden. Der Streaming Data Catalog enthält zunächst alle Datenprodukte im Streaming Data Mesh, damit die Käufer von Datenprodukten an einem einzigen Ort nach Streaming Data Products suchen und sie abonnieren können.

AsyncAPI erweitert OpenAPI, das offiziell als Swagger bekannt ist (siehe Abbildung 4-13 für Details). OpenAPI ist ebenfalls eine YAML-basierte IDL, die synchrone APIs beschreibt. Heutzutage werden synchrone APIs in API-Gateways wie Kong und Apigee registriert, wo API-Käufer nach bestimmten APIs suchen können, die auf ihren Anwendungsfällen basieren. Das Ziel von AsyncAPI ist es, diesen einfachen Ansatz auch auf asynchrone Datenquellen anzuwenden. AsyncAPI bietet uns eine einfache Möglichkeit, Self-Services zu aktivieren, die für eine gute Erfahrung für alle Nutzer/innen des Streaming-Datennetzes notwendig sind.

Das AsyncAPI YAML-Dokument ermöglicht es uns, genau zu definieren, wie Anwendungen das Streaming Data-Produkt konsumieren können, damit wir Self-Services erstellen können, um nahtlose Integrationen zwischen Domänen zu schaffen. So entsteht schließlich das Netz im Streaming Data Mesh. Mit dem AsyncAPI YAML-Dokument können wir auch Suchvorgänge im Streaming Data-Katalog durchführen, die wir in Kapitel 5 behandeln werden.

Die YAML-Dokumente der AsyncAPI werden von Anwendungen geparst, um Client-Consumer-Code für Domains in beliebigen Programmiersprachen zu erzeugen. Diese Anwendungen können auch andere Dinge tun, z. B. HTML-Seiten erzeugen, die von einem Streaming-Datenkatalog geliefert werden können. Wir werden dies in Kapitel 5 demonstrieren. In Kapitel 6 zeigen wir, wie ein AsyncAPI-YAML-Dienst eine REST-API aufrufen kann, die einen Kafka-Connector bereitstellt, der aus dem Kafka-Topic liest und in Amazon S3 schreibt.

OpenAPI and AsyncAPI
Abbildung 4-13. Unterschiede zwischen OpenAPI und AsyncAPI

Erstellung eines AsyncAPI YAML-Dokuments

Das Erstellen des AsyncAPI YAML-Dokuments ist der letzte Schritt bei der Veröffentlichung eines Datenprodukts in einem Streaming Data Mesh. Beispiel 4-6 zeigt ein AsyncAPI YAML-Skelettdokument; wir haben die Details entfernt. Wir werden dieses YAML-Dokument mit den Metadaten füllen, die für die Definition eines Streaming-Datenprodukts benötigt werden, das im Datengitter veröffentlicht werden soll. In YAML werden alle Felder als Objekte bezeichnet. In Beispiel 4-6 werden zum Beispiel asyncapi, externalDocs, info usw. als Objekte bezeichnet, ebenso wie die Unterobjekte covid, messages, schemas usw. Wir werden sie als Objekte bezeichnen, wenn wir im Zusammenhang mit YAML sprechen.

Beispiel 4-6. Ein AsyncAPI YAML-Dokumentenskelett
asyncapi: '2.2.0'
externalDocs:
info:
tags:
servers:
defaultContentType:
channels:
  covid:
components:
  messages:
    covidapi:
  schemas:
    covidapi:
  securitySchemes:
    user-password:
      type: userPassword
  messageTraits:
  operationTraits:
Hinweis

Das AsyncAPI-Beispiel nutzt Confluent Cloud als Streaming-Plattform. Confluent Cloud bietet eine vollständig verwaltete Apache Kafka- und Schema-Registry, auf die wir in späteren Abschnitten dieses Kapitels näher eingehen werden.

Objekte asyncapi, externalDocs, info und tags

Erstellen wir nun ein AsyncAPI YAML-Dokument, das ein Streaming-Datenprodukt definiert, das COVID-19 globale Statistiken liefert. In Beispiel 4-7 füllen wir alle beschreibenden Informationen für das Datenprodukt aus. Dies umfasst die ersten vier Abschnitte des YAML-Dokuments in Beispiel 4-6.

Beispiel 4-7. AsyncAPI YAML Informationsabschnitte
asyncapi: '2.2.0' 1
externalDocs: 2
  description: The source of the COVID-19 global statistics that is provided
  as a real-time data stream.
  url: https://covid19api.com/
info: 3
  title: COVID-19 Global Statistics AsyncAPI Spec
  version: '0.0.1'
  description: |
    This AsyncAPI provides pub/sub information for clients to pub/sub COVID
    data to Kafka

  license:
    name: Apache 2.0
    url: https://www.apache.org/licenses/LICENSE-2.0
  contact:
    name: API Support
    url: http://www.asyncapi.com/support
    email: info@asyncapi.io
  x-twitter: '@AsyncAPISpec'

tags: 4
  - name: root-tag1
    externalDocs:
      description: External docs description 1
      url: https://www.asyncapi.com/
  - name: root-tag2
    description: Description 2
    externalDocs:
      url: "https://www.asyncapi.com/"
  - name: root-tag3
  - name: root-tag4
    description: Description 4
  - name: root-tag5
    externalDocs:
      url: "https://www.asyncapi.com/"
1

Die AsyncAPI Version 2.2.0.

2

externalDocs enthält eine Beschreibung des Datenprodukts und eine URL, unter der die Nutzer weitere Informationen über das Datenprodukt finden können.

3

Der Abschnitt info der YAML enthält zusätzliche Informationen über das Datenprodukt, einschließlich Versions- und Lizenzinformationen.

4

Der optionale Abschnitt tags enthält Hashtags, die sich auf das Datenprodukt beziehen können.

Beachte, dass wir in Beispiel 4-7 an mehreren Stellen URLs platzieren könnten, damit die Nutzerinnen und Nutzer, die einkaufen, weitere Nachforschungen über die Datenprodukte anstellen können, so dass es von Vorteil ist, sie hinzuzufügen, um alle zum Verständnis notwendigen Informationen bereitzustellen.

Die Tags in diesem Abschnitt können verwendet werden, um Datenprodukte miteinander zu verknüpfen. Dies wird noch nützlicher, wenn wir in Kapitel 5 über Wissensgraphen sprechen. Wissensgraphen ermöglichen den Aufbau semantischer, mehrdimensionaler Beziehungen zwischen Daten und Metadaten und machen die Daten für die Nutzer wertvoller.

Abschnitt Server und Sicherheit

In Beispiel 4-8 fügen wir der YAML einen wichtigen Block hinzu, der Verbindungs- und Sicherheitsinformationen enthält. In diesem Fall wird das Datenprodukt in einem Kafka-Topic veröffentlicht. Da die AsyncAPI jede Art von Streaming-Plattform unterstützt, muss sie mit protocol konfiguriert werden, damit die Parser verstehen, wie sie die Integration zwischen dem Streaming-Datenprodukt und der Komponente, die es abonnieren soll, aufbauen können.

Beispiel 4-8. Eine AsyncAPI, die ein Datenprodukt für das Datennetz definiert
servers: 1
  kafka-aws-useast2: 2
    url: https://kafka.us-east-2.aws.confluent.cloud:9092 3
    protocol: kafka 4
    description: Kafka cluster Confluent cloud AWS US-EAST-2
    security:
      - user-password: [] 5

defaultContentType: application/json 6
1

Liste der Server, die die Verbindung zum Datenprodukt definieren.

2

kafka-aws-useast2 ist eine benutzerdefinierte Eigenschaft, die einen bestimmten Kafka-Server und seine Verbindungs- und Sicherheitsanforderungen identifiziert.

3

Die URL, mit der du dich mit der Streaming-Plattform - in diesem Fall Apache Kafka - verbinden kannst.

4

protocol gibt die Art der Streaming-Plattform an, über die das Datenprodukt bereitgestellt wird. Dieses Feld informiert die Anwendung, die diese YAML liest, darüber, dass sie bestimmte Bibliotheken einbinden muss, um die Verbindung zur Streaming-Plattform zu ermöglichen - in diesem Fall Apache Kafka.

5

user-password verweist auf die securitySchema, die der Anwendung, die diese YAML liest, mitteilt, dass der zu verwendende Sicherheitsmechanismus SASL_SSL ist, der sicherstellt, dass die Kommunikation mit SASL/PLAIN verschlüsselt und authentifiziert wird.

6

Die Eigenschaft defaultContentType informiert die lesende Anwendung darüber, dass der Inhalt des Datenprodukts JSON ist. Alternative Typen könnten Apache Avro oder protobuf sein.

In Beispiel 4-8 haben wir mehrere Optionen für die Sicherheit. Einige davon sind wie folgt:

  • Benutzer und Passwort

  • Bescheinigungen

  • API-Schlüssel

  • OAuth 2

  • OpenID

Der Abschnitt security kann mehrere Sicherheitsoptionen enthalten, aber in diesem Fall gibt es nur user-password. Die meisten Sicherheitskonfigurationen verwenden Benutzer/Passwort oder Zertifikate. In der AsyncAPI erweitert die Sicherheit die OpenAPI um andere Sicherheitsmechanismen wie OAuth 2 und OpenID, die von Streaming-Plattformen unterstützt werden. Wir werden nicht auf jede Implementierung im Detail eingehen, weil das den Rahmen dieses Buches sprengen würde (das wäre ein ganz anderes Buch). Für die Zwecke dieses Buches werden wir user-password als Sicherheitsmechanismus verwenden. Später in diesem Kapitel werden wir zeigen, wie wir diese Sicherheitskonfiguration im Detail umsetzen.

Kanäle und Themenbereich

Beispiel 4-9 zeigt den Block channels in der AsyncAPI, in dem sich viele Details des Streaming-Datenprodukts befinden. Unter channels befindet sich eine weitere Ebene mit der Bezeichnung covid, die dem Topic-Namen in Apache Kafka entspricht, dem Topic, von dem aus das Streaming-Datenprodukt bereitgestellt wird. In diesem Fall handelt es sich bei dem Streaming-Datenprodukt wiederum um globale COVID-19-Statistiken.

Beispiel 4-9. channels Abschnitt
channels:
  covid: # topic name 1
    x-confluent-cloud-security: 2
      $ref: '#/components/securitySchemes/user-password'
    description: Publishes/Subscribes to the COVID topic for new statistics.

    subscribe: 3
      summary: Subscribe to global COVID-19 Statistics.
      description: |
        The schema that this service follows the https://api.covid19api.com/
      operationId: receiveNewCovidInfo
      tags: 4
        - name: covid19api
          externalDocs:
            description: COVID-19 API
            url: https://api.covid19api.com/
        - name: covid
          description: covid service
      traits: 5
        - $ref: '#/components/operationTraits/covid'
      message: 6
        $ref: '#/components/messages/covidapi'
1

Der Name des Kanals, der dem Namen des Kafka-Topics entspricht.

2

Die Eigenschaft Sicherheitsimplementierung. $ref ist ein Verweis auf einen anderen Teil der AsyncAPI, der die Sicherheitsimplementierung genauer definiert (siehe Abschnitt "Sicherheitsschemata").

3

Gibt an, wie der Kunde das Thema covid abonnieren wird. Dieser Abschnitt ist für Abonnenten, nicht für Produzenten.

4

tags ermöglicht den Aufbau von mehr Beziehungen im Wissensgraphen des Streaming-Datenkatalogs. Wir stellen mehr Metadaten über das Streaming-Datenprodukt bereit und verbessern die Durchsuchbarkeit.

5

traits enthält weitere Informationen für die Selbstkonfiguration des Clients. In diesem Fall verweist die AsyncAPI auf einen anderen Teil des Dokuments, der weitere Informationen darüber enthält, wie sich der Client-Abonnent/Verbraucher selbst konfigurieren muss. Wir werden diese Details im Abschnitt "Traits" besprechen.

6

message ist eine weitere $ref, die auf das Schema des Streaming-Datenprodukts verweist. Der Verweis verweist auf einen anderen Teil des AsyncAPI-Dokuments, in dem genau beschrieben wird, wie die Nachricht strukturiert ist, damit der Client-Consumer sie analysieren und verarbeiten kann.

Der channels Abschnitt der AsyncAPI kann sowohl einen subscribe als auch einen publish Abschnitt enthalten. Wir haben den Abschnitt publish weggelassen, da dieses AsyncAPI-Dokument dazu gedacht ist, Streaming-Data-Produkte zu beschreiben, und andere Domains nicht die Informationen haben sollten, um das Apache Kafka-Topic zu produzieren. Diese AsyncAPI sollte nur Abonnenten haben, die die anderen Domains im Streaming Data Mesh sind.

Abschnitt Komponenten

Das components Objekt enthält fünf Unterabschnitte, die wiederverwendbare Objekte für verschiedene Teile der AsyncAPI-Spezifikation enthalten (siehe Beispiel 4-10). Alle Objekte, die innerhalb des components Objekts definiert sind, haben keine Auswirkungen auf die API, es sei denn, sie werden ausdrücklich von Eigenschaften außerhalb des components Objekts referenziert. In den vorangegangenen Abschnitten wurde in den AsyncAPI-Beispielen auf viele der Unterabschnitte des components Objekts verwiesen. Gehen wir jeden dieser Unterabschnitte im Detail durch.

Beispiel 4-10. components enthält die Nachrichten und Schemadetails
components:
  messages:
  schemas:
  securitySchemes:
  messageTraits:
  operationTraits:

Abschnitt Nachrichten

Du erinnerst dich vielleicht aus Beispiel 4-9, dass der Abschnitt channel/covid/message des AsyncAPI-Dokuments ein Nachrichtenobjekt referenziert, das components/messages/covidapi ist. Dieses Schema ist unter dem Abschnitt components des AsyncAPI-Dokuments definiert. Der Abschnitt components enthält zwei Unterabschnitte: messages und schemas. Der Abschnitt messages beschreibt den Umschlag der Nutzlast (siehe Beispiel 4-11), und schemas beschreibt die Nutzlast selbst.

Beispiel 4-11. messages beschreibt den Umschlag, der die Nutzlast enthält
components:
  messages:
    covidapi: 1
      name: covidapi
      title: covid api
      summary: covidapi from https://api.covid19api.com/
      correlationId: 2
        description: |
          You can correlate / join with other data using the
          CountryCode field.
        location: $message.payload#/CountryCode
      tags: 3
        - name: message-tag1
          externalDocs:
            description: External docs description 1
            url: https://www.asyncapi.com/
        - name: message-tag2
          description: Description 2
          externalDocs:
            url: "https://www.asyncapi.com/"
      headers: 4
        type: object
        properties:
          my-custom-app-header:
            type: string
          correlationId:
            type: string
      payload: 5
        $ref: "#/components/schemas/covidapi"
      bindings: 6
        kafka:
          key:
            type: object
            properties:
              id:
                type: string
              type:
                type: string
          bindingVersion: '0.1.0'
1

Der Name der Komponente message. Dies ist das Element, auf das in Beispiel 4-9 verwiesen wird.

2

Die correlationId verweist auf die (5), um das Feld zu identifizieren, das als Korrelations-ID verwendet werden soll, ein Bezeichner in der Nachrichtenverfolgung. In Kafka wird dies höchstwahrscheinlich der Schlüssel sein, der für die Zuordnung der Partition in einem Topic verwendet wird.

3

Auch hier kann tags genutzt werden, um Beziehungen zwischen anderen Datenprodukten oder Domänen herzustellen.

4

Der Abschnitt headers enthält Informationen im Header der Nachricht von der Streaming-Plattform. Er enthält auch die correlationId, falls sie im Header angegeben ist.

5

payload verweist auf einen anderen Abschnitt der AsyncAPI, der das Schema der Nachricht enthält, die sich in components/schemas/covidapi im selben components Objekt befindet.

6

Eine Freiform-Map, bei der die Schlüssel den Namen des Protokolls (in diesem Fall das Kafka-Protokoll) und die Werte protokollspezifische Definitionen für den Server (Kafka) beschreiben.

Der in Beispiel 4-12 gezeigte Unterabschnitt schemas definiert die Schema-Nutzdaten inline. Inline bedeutet, dass das Schema im AsyncAPI YAML-Dokument definiert ist. Alternativ kannst du das Schema auch außerhalb des AsyncAPI YAML-Dokuments definieren, indem du eine $ref angibst, die einfach eine URL zum Schema ist (siehe Beispiel 4-13).

Beispiel 4-12. Das Schema beschreibt die Nutzlast, die das Streaming-Datenprodukt selbst ist
  schemas:
      covidapi:
        type: object
        required:
          - CountryCode
        properties:
          Country:
            type: string
          CountryCode:
            type: string
            description: correlationId
          Date:
            type: string
          ID:
            type: string
          NewConfirmed:
            type: integer
          NewDeaths:
            type: integer
          NewRecovered:
            type: integer
          Premium:
            type: object
          Slug:
            type: string
          TotalConfirmed:
            type: integer
          TotalDeaths:
            type: integer
          TotalRecovered:
            type: integer

Die Verwendung eines Tools wie einer Schemaregistrierung zur Registrierung und Verwaltung von Schemas ist der bevorzugte Ansatz. Die Schemaregistrierung behält den Überblick über die Schemaversionen und prüft manchmal die Kompatibilität mit früheren Versionen. Schemas sind der "Vertrag" zwischen der produzierenden Domäne und der konsumierenden Domäne für Streaming-Datenprodukte. Dies schützt die Anwendungen vor Änderungen am Schema, die die Datenverarbeitung in der konsumierenden Domäne stören könnten. Außerdem zwingt es die produzierenden Domänen dazu, ihre Streaming-Data-Produkte so weiterzuentwickeln, dass die Kompatibilität mit älteren Versionen nicht unterbrochen wird. Eine Schemaregistrierung fällt unter die föderierte Daten-Governance im Streaming Data Mesh, daher werden wir in Kapitel 5 näher darauf eingehen. Im AsyncAPI YAML-Dokument ist es wichtig zu wissen, dass du dein Schema nicht inline, sondern remote mit einer Schema-Registry definieren kannst (siehe Beispiel 4-13).

Beispiel 4-13. messages beschreibt den Umschlag, der die Nutzlast enthält
messages:
    covidapi:
      name: covidapi
      title: covidapi
      summary: COVID 19 global statistics
      contentType: avro/binary
      schemaFormat: application/vnd.apache.avro+json;version=1.9.0
      payload:
        $ref: 'http://schema-registry:8081/subjects/topic/versions/1/#covidapi' 1
1

Das Schema payload ist ein externer Verweis auf eine Schemaregistrierung.

Abschnitt Sicherheitssysteme

Die AsyncAPI bietet spezifische Sicherheitsinformationen im securitySchemes Objekt des YAML-Dokuments. Beispiel 4-14 zeigt, wie man sich mit der Streaming-Plattform verbindet, die in diesem Fall Kafka ist, wie im Abschnitt servers der AsyncAPI YAML definiert. Das description Objekt hat eine Eigenschaftsbeschreibung, die der konsumierenden Anwendung zur Verfügung gestellt wird. Dadurch erhält der Client des Streaming-Datenprodukts genauere Informationen. Die Autoren des AsyncAPI YAML-Dokuments können den Inhalt des description -Objekts so formatieren, dass er weitere Informationen enthält, die AsyncAPI YAML nicht bereitstellt.

Beispiel 4-14. Abschnitt Sicherheitsschemata von Komponenten, der weitere Details in der description
securitySchemes:
    user-password:
      type: userPassword
      description: |
        Provide your Confluent KEY as the user and SECRET as the password.

        ```prop
        # Kafka
        bootstrap.servers=kafka.us-east-2.aws.confluent.cloud:9092
        security.protocol=SASL_SSL
        sasl.mechanisms=PLAIN
        sasl.username={{ CLUSTER_API_KEY }} 1
        sasl.password={{ CLUSTER_API_SECRET }} 2

        # Best practice for higher availability in librdkafka clients prior to 1.7
        session.timeout.ms=45000

        # Confluent Cloud Schema Registry
        schema.registry.url=https://schema-registry.westus2.azure.confluent.cloud
        basic.auth.credentials.source=USER_INFO
        basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }} 3

        ```

        Copy the above YAML replacing the KEY/SECRETS for both the cluster and
        schema registry and use in your Kafka clients.
1

{{ CLUSTER_API_KEY }} ist der Schlüssel oder Benutzer in user-password, der für die Verbindung zu Kafka in der Confluent Cloud verwendet wird.

2

{{ CLUSTER_API_SECRET }} ist das Geheimnis oder das Passwort user-password für die Verbindung zu Kafka in Confluent Cloud.

3

{{ SR_API_KEY }}:{{ SR_API_SECRET }} ist der Schlüssel/das Geheimnis oder user-password, der/das bei der Abfrage von Schemas aus der Schema Registry in Confluent Cloud verwendet wird.

Wenn Domain-Konsumenten ein Streaming-Datenprodukt wie die in dieser AsyncAPI beschriebenen globalen COVID-19-Statistikdaten konsumieren wollen, müssen sie den Zugriff darauf beantragen. Dann muss der Streaming Data Product Manager die antragstellende Domain genehmigen. Ein Teil dieser Genehmigung kann darin bestehen, die Zugangsdaten an die anfragende Domäne zu senden, damit sie diese Informationen in ihre Konfiguration aufnehmen kann, um das Streaming-Datenprodukt zu nutzen. In Beispiel 4-14 würde dies den Parametern entsprechen, die in (1), (2) und (3) aufgeführt sind. Diese Parameter müssen durch die von der produzierenden Domäne bereitgestellten Anmeldeinformationen ersetzt werden, um Zugriff auf die Streaming-Datenprodukte zu erhalten.

Die Beschreibung in Beispiel 4-14 zeigt, wie man einen Konsumenten so konfiguriert, dass er von Confluent Cloud liest. Der Client muss lediglich die Parameter durch die Anmeldeinformationen ersetzen, diese Konfiguration in die Anwendung einlesen und sie an die Kafka-Client-Bibliotheken weitergeben, um sie für das Lesen aus Kafka zu konfigurieren. In späteren Kapiteln werden wir zeigen, wie man das AsyncAPI YAML-Dokument verwendet, um stattdessen Apache Kafka-Konnektoren zu bauen und Anwendungen zu erstellen, die aus Apache Kafka lesen.

Warnung

Es ist wichtig, sich daran zu erinnern, dass dieses Beispiel die Benutzer/Passwort-Methode für die Sicherheit verwendet. Wie bereits erwähnt, haben andere unterstützte Sicherheitsmethoden ihre eigenen Arten von Anmeldeinformationen und verwenden möglicherweise nicht den Benutzer/Passwort-Ansatz. Diese anderen Sicherheitsmethoden werden in diesem Buch nicht behandelt.

Abschnitt Eigenschaften

In AsyncAPI bietet traits zusätzliche Informationen, die auf ein Objekt im YAML-Dokument angewendet werden können. Traits werden in der Regel nur verwendet, wenn die Anwendung, die das AsyncAPI-YAML-Dokument parst, versucht, Client-Code in einer bestimmten Sprache zu erzeugen. In Beispiel 4-9 gab es ein Objekt namens operationId mit dem Wert receiveNewCovidInfo. Eine Anwendung, die ein AsyncAPI YAML-Dokument namens AsyncAPI Generator liest, kann von der AsyncAPI-Website heruntergeladen werden. Er generiert Java Spring Client Code für Domains, den du kompilieren und einsetzen kannst. Diese Anwendung kann das Streaming-Datenprodukt von der Streaming-Plattform, die im Abschnitt servers in der AsyncAPI YAML definiert ist, konsumieren. In diesem Fall wird es Apache Kafka sein. Der AsyncAPI Generator verwendet den Wert in operationId als Methodennamen im Quellcode. Die Traits in messageTraits und operationTraits in Beispiel 4-15 werden verwendet, um Methoden wie groupID oder clientId Werte zuzuweisen, die bei der Generierung von Client-Code helfen.

Beispiel 4-15. Traits werden von Codegeneratoren verwendet, um Eigenschaften Werte zuzuweisen und Klassen und Methoden im generierten Code zu benennen
  messageTraits:
    commonHeaders:
      headers:
        type: object
        properties: 1
          my-app-header:
            type: integer
            minimum: 0
            maximum: 100
          correlationId:
            type: string

  operationTraits:
    covid:
      bindings:
        kafka: 2
          groupId: my-app-group-id-pub
          clientId: my-app-client-id-pub
          bindingVersion: '0.1.0'
1

Liefert weitere Informationen darüber, welche Header vom Client für die Verarbeitung des Streaming-Datenprodukts verwendet werden können

2

Stellt Kafka Bindungsinformationen zur Verfügung, die helfen, die Verbrauchergruppen zu identifizieren und die Client-Anwendung bei Bedarf elastisch zu skalieren

Zuweisung von Daten-Tags

Daten-Tags sind eine einfache Möglichkeit, den konsumierenden Bereichen mehr Informationen über das Streaming-Datenprodukt zu geben: wie es erstellt wurde und was sie erwarten können, wenn sie es konsumieren. Viele der Eigenschaften von Streaming-Daten sind schwer zu messen, wie z. B. Qualität und Sicherheit, so dass es manchmal schwierig ist, den Nutzern diese wichtigen Informationen zur Verfügung zu stellen. Anstatt eine Zahl oder eine Punktzahl anzugeben, können wir Tags bereitstellen, die die Qualitäts- und Sicherheitsstufen repräsentieren. In diesem Abschnitt werden wir versuchen, den Daten Tags zuzuweisen.

Tags können Informationen über die Qualität oder Sicherheit des Streaming-Datenprodukts liefern. Manchmal wünschen sich die Abnehmerbereiche Streaming-Datenprodukte, die unverändert (in Rohform) aus der ursprünglichen Quelle stammen. Andere Verbraucherbereiche möchten vielleicht, dass dasselbe Streaming-Datenprodukt von höchster Qualität ist und die Formatstandards und Sicherheitsanforderungen erfüllt. Das wären dann zwei verschiedene Streaming-Datenprodukte. Tags bieten eine einfache Möglichkeit, Streaming-Datenprodukte für die konsumierenden Domänen zu präsentieren.

Qualität

Qualität ist ein schwer zu bewertendes Merkmal, aber wir könnten Tags verwenden, wie sie in Tabelle 4-8 definiert sind.

Tabelle 4-8. Mögliche Qualitätskennzeichen
Tags Definition

RAW

Rohdaten aus der Originalquelle

STANDARD

Umwandlung zur Erfüllung von Formatstandards

ANREICHEN

Umwandlung in Formatstandards und Anreicherung

Für die beiden Streaming-Datenprodukte, die aus denselben Quelldaten stammen, aber eine unterschiedliche Qualität aufweisen, könnten wir RAW als Datenqualitäts-Tag für das Streaming-Datenprodukt zuweisen, das Rohdaten liefert. Für das zweite Streaming-Produkt, bei dem die Verbraucher eine Anreicherung erwarten, könnten wir ENRICHED als Datenqualitätskennzeichen vergeben. So könnten die Verbraucher leicht erkennen, auf welches Streaming-Produkt sie zugreifen wollen.

Diese Tags könnten in der AsyncAPI tags zugewiesen werden und die Domain-Konsumenten könnten sie anklicken und die Tag-Definitionen abrufen, wie in Beispiel 4-16.

Beispiel 4-16. Hinzufügen von Qualitätsmerkmalen zur Bereitstellung zusätzlicher Informationen
    covidapi:
      name: covidapi
      title: covid api
      tags:
        - name: quality.RAW 1
          externalDocs:
            description: Provides raw source data
            url: https://somewhere/quality/raw
1

quality.RAW entspricht den Rohdaten. Die URL leitet den Domain-Nutzer zu weiteren Informationen darüber, wie die Qualität umgesetzt wurde.

Sicherheit

Bei der Sicherheit geht es in diesem Zusammenhang um den Schutz sensibler Informationen im Streaming-Datenprodukt. Ähnlich wie bei der Qualität könnten auch die Sicherheits-Tags wie in Tabelle 4-9 definiert werden.

Tabelle 4-9. Mögliche Sicherheits-Tags
Tags Definition

FILTERED

Die sensiblen Daten wurden aus dem Streaming-Datenprodukt herausgefiltert oder ausgewählt, oder es waren keine sensiblen Informationen in der Nutzlast enthalten.

TOKENIZED

Die sensiblen Daten wurden mit Token versehen und können über einen Nachschlagemechanismus abgerufen werden.

VERSCHLÜSSELT

Die sensiblen Daten wurden verschlüsselt und die Entschlüsselung der Daten in ihren ursprünglichen Wert erfordert einen Schlüssel.

Ähnlich wie bei den Tags für die Qualitätsinformationen werden auch die Tags für die Sicherheit hinzugefügt. Du kannst dem AsyncAPI-YAML-Dokument mehrere Tags sowohl für die Qualität als auch für die Sicherheit hinzufügen(Beispiel 4-17).

Beispiel 4-17. Hinzufügen von Sicherheits-Tags zur Bereitstellung zusätzlicher Informationen
    covidapi:
      name: covidapi
      title: covid api
      tags:
        - name: security.FILTERED 1
          externalDocs:
            description: Provides raw source data
            url: https://somewhere/security/filtered
1

security.FILTERED bedeutet, dass sensible Informationen gefiltert oder aus dem endgültigen Streaming-Datenprodukt entfernt wurden.

Beispiel 4-17 zeigt, wie wir der konsumierenden Domäne Informationen darüber geben können, was mit den zu sichernden Daten gemacht wurde. Die url ist eine zusätzliche Ressource, die weitere Informationen darüber liefern kann, was herausgefiltert wurde und warum.

Durchsatz

Die Bereitstellung des Durchsatzes für das Streaming-Datenprodukt liefert wichtige Informationen zur Skalierbarkeit für den Verbraucher. Der Durchsatz kann in Megabytes pro Sekunde (MBps) gemessen werden. Er ist ein Indikator dafür, wie schnell die Daten zu einer konsumierenden Domäne gelangen. Einige Streaming-Datenprodukte können langsam sein, wie z. B. die sich langsam verändernden Dimensionsdaten, die in "Dimensionsdaten versus Faktendaten im Streaming-Kontext" besprochen werden . Andere Streaming-Daten können sehr schnell sein, z. B. Clickstream-Daten aus einer Webanwendung oder ein Twitter-Feed.

Ähnlich wie bei Qualität und Sicherheit könntest du in Beispiel 4-18 den Durchsatz als Beschreibung in einem Durchsatz-Tag und einer URL angeben, die zusätzliche Informationen darüber liefert, wie das Apache Kafka-Topic konfiguriert ist, um diesen Durchsatz zu ermöglichen, z. B. die Anzahl der Partitionen.

Beispiel 4-18. Beschreiben des Durchsatzes in der AsyncAPI
subscribe:
      summary: Subscribe to global COVID 19 Statistics.
      description: |
        The schema that this service follows the https://api.covid19api.com/
      operationId: receiveNewCovidInfo
      tags:
        - name: throughput 1
          externalDocs:
            description: 10/mbps 2
            url: https://localhost/covid119/throughput 3
1

Durchsatz-Tag

2

Die Tag-Beschreibung, die den Durchsatzwert angibt

3

Eine URL, unter der konsumierende Domänen mehr darüber erfahren können, wie der Durchsatz implementiert wird

Versionierung

ist es wichtig, die Version des Streaming-Datenprodukts anzugeben. Auf diese Weise können konsumierende Domänen nachvollziehen, ob das Streaming Data-Produkt für ihre Produktionsworkloads bereit ist oder ob eine größere Versionsänderung stattgefunden hat, von der sie profitieren könnten.

Beispiel 4-19 zeigt, wie die AsyncAPI detaillierte Versionsinformationen für konsumierende Domänen bereitstellen kann, die diese bei der Verwaltung ihrer eigenenAnwendungsentwicklung nutzen können.

Beispiel 4-19. AsyncAPI YAML Informationsabschnitte
info:
  title: COVID 19 Global Statistics AsyncAPI Spec
  version: '0.0.1'
  description: |
    This streaming data product is in preview. DISCLAIMER - this streaming data
    product could implement breaking changes. Do not use this for your production
    applications in your domain.
  contact:
    name: API Support
    url: http://www.asyncapi.com/support

Es kann auch von Vorteil sein, wenn das Änderungsprotokoll zu aktualisierten Versionen entweder in der Beschreibung oder einer URL angegeben wird. Dazu gehören alle Änderungen an der ETL-Datenpipeline, die das Datenprodukt erzeugt, und alle Änderungen an der ursprünglichen Quelle.

Datenderivate, die von anderen Streaming-Datenprodukten aus anderen Bereichen stammen, sollten ebenfalls in Tags oder URLs gekennzeichnet werden, damit die konsumierenden Bereiche die Quellen, aus denen das endgültige Streaming-Datenprodukt besteht, rekursiv aufschlüsseln können. In späteren Kapiteln werden wir uns damit befassen, wie dies möglich ist.

Überwachung

Die Überwachungsinformationen über das Streaming-Datenprodukt sind für die konsumierenden Domänen ebenfalls wichtig. Auch dies könnte als weitere URL oder Tag im AsyncAPI-YAML-Dokument angegeben werden. Tabelle 4-10 zeigt einige der wichtigen Informationen, die konsumierende Domains wissen wollen.

Tabelle 4-10. Informationen, die aus der Überwachung von Streaming-Datenprodukten gewonnen werden können
Informationen Einsicht

Anzahl der Verbraucher

  • Zeigt an, wie beliebt das Streaming-Datenprodukt ist.

  • Um zu sehen, welche anderen Domänen diese Daten nutzen.

Fehlerzahl/SLA

  • Um den aktuellen Status des Streaming-Datenprodukts zu erfahren, falls es in der konsumierenden Domäne zu Problemen kommt. Damit lassen sich Fragen beantworten wie:

    • Ist sie aktiv?

    • Gibt es eine Stromunterbrechung?

    • Wann wird das Daten-Streaming-Produkt voraussichtlich wieder online sein?

  • Um zu sehen, ob das Streaming-Produkt die SLA für die Betriebszeit einhält.

Durchsatz/Bandbreite

  • Um zu sehen, ob das Streaming-Datenprodukt die maximale Kapazität erreicht hat.

Wenn die konsumierende Domäne 99,999 % für ihre eigenen Anwendungen benötigt und das Streaming-Datenprodukt nur 99,9 % bietet, möchte sie vielleicht höhere SLA-Garantien verlangen, was zu einem separaten Streaming-Datenprodukt führen kann.

Die konsumierenden Domains werden Alarme für diese Metriken einrichten wollen, damit sie auf mögliche Probleme reagieren können. Du kannst eine bessere Datenverflechtung erreichen, wenn du alle deine Streaming-Datenprodukte überwachst und sie programmatisch konsumierbar oder für die konsumierenden Domänen alarmierbar machst.

Zusammenfassung

In diesem Kapitel haben wir alle notwendigen Schritte zur Erstellung eines Streaming Data-Produkts beschrieben: die Definition der Anforderungen, die Aufnahme, die Umwandlung und schließlich die Veröffentlichung eines AsyncAPI YAML-Dokuments. All dies haben wir mit den Fähigkeiten von Generalisten gemacht, die wir von Domain Engineers erwarten: JSON, SQL und YAML. Dieses AsyncAPI-Dokument ermöglicht es uns, ein Streaming Data Mesh zu erstellen. In Kapitel 5 erfahren wir, wie wir das AsyncAPI YAML-Dokument nutzen können, um einen Streaming-Datenkatalog zu füllen. Wir werden auch AsyncAPI-Anwendungen (Tools) verwenden, die HTML-Seiten in einer Streaming-Data-Catalog-Anwendung erzeugen, und sehen, wie wir diese erweitern können, um einen Streaming-Data-Mesh-Workflow hinzuzufügen. In späteren Kapiteln werden wir das AsyncAPI-Dokument weiter nutzen, um Self-Services zu erstellen, die Integrationen aufbauen und Metadaten rekursiv abrufen können, z. B. die Datenabfolge.

1 US Department of Health and Human Services, "Summary of the HIPAA Privacy Rule", S. 4.

2 Die EU-DSGVO gilt nur für personenbezogene Daten, also alle Informationen, die sich auf eine identifizierbare Person beziehen. Für jedes Unternehmen, das mit EU-Kunden zu tun hat, ist es wichtig, dieses Konzept zu verstehen, um die GDPR einzuhalten.

Get Streaming Data Mesh now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.