Kapitel 4. Arbeiten mit Daten und Feature Stores

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Beim maschinellen Lernen werden Daten in eine Vorhersagelogik umgewandelt. Daten sind für den Prozess unerlässlich, können aus vielen Quellen stammen und müssen verarbeitet werden, um sie nutzbar zu machen. Daher sind Datenmanagement und -verarbeitung die wichtigsten Komponenten des maschinellen Lernens. Daten können aus verschiedenen Quellen stammen:

Dateien

Daten, die in lokalen oder Cloud-Dateien gespeichert sind

Data Warehouses

Datenbanken mit historischen Datentransaktionen

Online-Datenbanken

SQL-, NoSQL-, Graph- oder Zeitreihendatenbanken mit aktuellen Transaktions- oder Anwendungsdaten

Datenströme

Zwischenspeicherung von Echtzeit-Ereignissen und Nachrichten (für die zuverlässige Weitergabe von Daten zwischen Diensten)

Online-Dienste

Jeder Cloud-Dienst, der wertvolle Daten liefern kann (dazu können soziale, finanzielle, staatliche und Nachrichtendienste gehören)

Eingehende Nachrichten

Asynchrone Nachrichten und Benachrichtigungen, die per E-Mail oder über einen anderen Nachrichtendienst (Slack, WhatsApp, Teams) eingehen können

Die Quelldaten werden verarbeitet und als Merkmale gespeichert, um sie für die Modellschulung und den Modellfluss zu verwenden. In vielen Fällen werden die Merkmale in zwei Speichersystemen gespeichert: eines für den Batch-Zugriff (Training, Batch-Vorhersage usw.) und eines für den Online-Abruf (für Echtzeit- oder Online-Serving). Infolgedessen kann es zwei getrennte Datenverarbeitungspipelines geben, eine mit Stapelverarbeitung und eine mit Echtzeitverarbeitung (Stream).

Die Datenquellen und die Verarbeitungslogik werden sich wahrscheinlich im Laufe der Zeit ändern, was zu Änderungen an den verarbeiteten Merkmalen und dem aus diesen Daten erstellten Modell führt. Daher sind die Versionierung der Daten, die Verarbeitungslogik und die Nachverfolgung der Datenreihenfolge entscheidende Elemente jeder MLOps-Lösung.

Die Bereitstellung genauer und hochwertiger Produktionsmodelle erfordert große Datenmengen und eine hohe Verarbeitungsleistung. Die Verarbeitung von Produktionsdaten kann mit verteilten Analyse-Engines(Apache Spark, Dask, Google BigQuery und anderen), Stream-Processing-Technologien (wie Apache Flink) oder mehrstufigen Datenpipelines skaliert werden.

Einer der Mechanismen zur Automatisierung der Integration mit Datenquellen, der skalierbaren Batch- und Echtzeit-Datenverarbeitung, der Datenversionierung und der Feature-Verwaltung ist die Verwendung eines Feature-Stores. Ein Feature Store ist ein zentraler Knotenpunkt für die Erstellung, den Austausch und die Überwachung von Features. Feature Stores sind in modernen MLOps-Implementierungen unverzichtbar und werden in diesem Kapitel näher beschrieben.

Datenversionierung und Abstammung

Modelle und Datenprodukte werden aus Daten abgeleitet. Das Sammeln von Metadaten und die Rückverfolgung der Herkunft der Daten ermöglichen daher eine bessere Kontrolle und Steuerung der Datenprodukte. Wenn du eine bestimmte Version eines Datenprodukts untersuchen willst, musst du außerdem die ursprünglichen Daten kennen, die zur Erstellung dieses Produkts oder Modells verwendet wurden.

Die Versionierung von Daten und die Verwaltung von Metadaten gehören zu den wichtigsten MLOps-Praktiken, die die folgenden Punkte abdecken:

Datenqualität

Die Rückverfolgung von Daten durch die Systeme einer Organisation und das Sammeln von Metadaten und Informationen über die Herkunft können helfen, Fehler und Unstimmigkeiten zu erkennen. So können Korrekturmaßnahmen ergriffen und die Datenqualität verbessert werden.

Reproduzierbarkeit und Rückverfolgbarkeit von Modellen

Der Zugang zu historischen Daten ermöglicht es uns, die Modellergebnisse zu reproduzieren und kann für die Fehlersuche und das Ausprobieren verschiedener Parametersätze genutzt werden.

Datenmanagement und Prüfbarkeit

Indem sie den Ursprung und die Geschichte der Daten verstehen, können Organisationen sicherstellen, dass die Daten den erwarteten Richtlinien und Vorschriften entsprechen, Fehlerquellen aufspüren und Prüfungen oder Untersuchungen durchführen.

Compliance

Die Datenabfolge kann Organisationen helfen, die Einhaltung von Vorschriften wie GDPR und HIPAA nachzuweisen.

Einfachere Datenverwaltung

Metadaten und Abstammungsinformationen ermöglichen eine bessere Datenerkennung, Mappings, Profilerstellung, Integration und Migrationen.

Bessere Zusammenarbeit

Datenversionierung und -abstammung können die Zusammenarbeit zwischen Datenwissenschaftlern und ML-Ingenieuren erleichtern, indem sie eine klare und konsistente Sicht auf die in ML-Modellen und bei der Handhabung von Upgrades verwendeten Daten ermöglichen.

Verfolgung von Abhängigkeiten

Verstehe, wie jede Daten-, Parameter- oder Codeänderung zu den Ergebnissen beiträgt, und erhalte Einblicke, welche Daten oder Modellobjekte aufgrund von Änderungen an der Datenquelle geändert werden müssen.

Wie es funktioniert

Wie in Abbildung 4-1 dargestellt, kann der Datengenerierungsfluss als eine Reihe von Datenquellen und Parametern abstrahiert werden, die als Input für eine Datenverarbeitungsaufgabe(Berechnung) verwendet werden, die eine Sammlung von Datenprodukten oder Artefakten erzeugt. Die Output-Artefakte können unterschiedlicher Art sein: Dateien, Tabellen, Machine-Learning-Modelle, Diagramme und so weiter.

imle 0401
Abbildung 4-1. Fluss der Datenreihenfolge

Das Datenverfolgungssystem erfasst die Informationen über die Eingaben (Datenquellen und -versionen, Parameter) und Berechnungsaufgaben (Code, Pakete, Ressourcen, ausführender Benutzer und mehr). Dann fügt es sie als Metadaten in die Output-Artefakte ein. Die Metadaten können zusätzliche Informationen enthalten, wie z. B. vom Nutzer bereitgestellte Labels oder Tags, Informationen über die Datenstruktur, das Schema, Statistiken usw. Die Metadaten werden in der Regel nicht in jedes Output-Artefakt kopiert, sondern stattdessen referenziert (durch einen Link), um Datendopplungen zu vermeiden.

Wie in Abbildung 4-2 dargestellt, können die Ergebnisse der ersten Aufgabe (z. B. Datenaufbereitung) als Dateninput für die folgenden Aufgaben (z. B. Modelltraining, Testen) verwendet werden.

imle 0402
Abbildung 4-2. Datenreihenfolge in einer mehrstufigen Pipeline

Wenn wir über eine Benutzeroberfläche oder ein SDK auf ein Datenprodukt zugreifen, können wir anhand der Metadaten die genauen Datenquellen, Parameter und die vollständigen Details der Berechnungsaufgabe sehen. Wir können auch den Fortschritt der in einem mehrstufigen Fluss erzeugten Daten verfolgen und alle zusätzlichen Metadaten untersuchen.

Jedes Mal, wenn die Datenverarbeitungsaufgabe ausgeführt wird, wird eine neue Version der Output-Artefakte erstellt (siehe Abbildung 4-3). Jede Version wird mit einer eindeutigen Versionskennung (Commit-ID) versehen und kann außerdem mit einem aussagekräftigen Versionsnamen versehen werden, z. B. Master, Development, Staging, Production usw. Dies ähnelt dem Git-Ablauf bei der Versionierung von Quellcode.

Nehmen wir an, du führst wiederholt jede Stunde eine bestimmte Aufgabe aus. Sie hat die gleichen Eingaben und Parameter oder du nimmst kleine Änderungen vor, die die Ergebnisse der Ausgabedaten nicht verändern. Das kann zu riesigen Haufen redundanter Daten führen, und mehrere Versionen speichern denselben Inhalt. Viele Lösungen für die Datenversionierung implementieren die Deduplizierung von Inhalten, um dieses Problem zu lösen.

Wenn ein Artefakt erstellt wird, wird ein kryptografischer Hash-Wert des Inhalts berechnet (z. B. mit den Algorithmen MD5 oder SHA1), der die Einzigartigkeit des Inhalts darstellt. Schließlich wird der Hash-Wert mit älteren Versionen verglichen oder als Index in der Speicherung verwendet. Auf diese Weise wird der Inhalt nur einmal gespeichert.

Da Lösungen für die Datenversionierung neben den Quelldaten (Code, Parameter, Benutzer, Ressourcen usw.) auch verschiedene Attribute erfassen, müssen sie gut in das Versionskontrollsystem (Git) und das Job- oder Pipeline-Ausführungsframework integriert sein. Andernfalls muss der Nutzer die Frameworks manuell zusammenfügen und die Referenzmetadaten für die Aufzeichnung zusammen mit den Daten bereitstellen.

imle 0403
Abbildung 4-3. Wie sich Daten-, Parameter- und Codeänderungen auf Artefaktversionen auswirken

Viele Frameworks(MLflow, MLRun usw.) bieten eine Logging-API, bei der der Nutzer eine log_artifact() Methode aufruft, die automatisch die neuen Daten zusammen mit dem Code und den Ausführungsmetadaten aufzeichnet und versioniert. Viele bieten eine auto logging Lösung an, die keine Code-Instrumentierung erfordert. Stattdessen findet sie automatisch heraus, welche Daten und Metadaten gespeichert und versioniert werden müssen, indem sie den Code des Nutzers und die Fähigkeiten des ML-Frameworks versteht.

Gängige ML-Datenversionierungstools

Es gibt eine Reihe von Open-Source- und kommerziellen Frameworks für die Datenversionierung. Dieses Buch konzentriert sich darauf, die Open-Source-Optionen DVC, Pachyderm, MLflow und MLRun zu erläutern und zu vergleichen.

Versionskontrolle der Daten

Data Version Control (DVC) begann als Werkzeug zur Datenversionierung für ML und wurde erweitert, um grundlegende ML-Workflow-Automatisierung und Experimentmanagement zu unterstützen. Es nutzt die Vorteile der Softwareentwicklung, mit der du bereits vertraut bist (Git, deine IDE, CI/CD usw.).

DVC funktioniert genau wie Git (mit ähnlichen Befehlen), aber für große dateibasierte Datensätze und Modellartefakte. Das ist sein Hauptvorteil, aber auch seine Schwäche. DVC speichertdie Dateninhalte in Dateien oder einer Objektspeicherung (AWS S3, GCS, Azure Blob usw.) und hält eine Referenz auf diese Objekte in einer Datei (.dvc) fest, die im Git-Repository gespeichert wird.

Mit dem folgenden Befehl fügst du eine lokale Modelldatei(model.pkl) zum Datenversionierungssystem hinzu:

dvc add model.pkl

DVC kopiert den Inhalt der Datei model.pkl in eine neue Datei mit einem neuen Namen (abgeleitet vom md5-Hashwert des Inhalts) und legt sie im Verzeichnis .dvc/ ab. Außerdem wird eine Datei namens model.pkl.dvc erstellt, die auf diese Inhaltsdatei verweist. Als Nächstes muss die neue Metadatendatei von Git nachverfolgt werden, der Inhalt sollte ignoriert und die Änderungen müssen übertragen werden. Dies geschieht durch die Eingabe der folgenden Befehle:

git add model.pkl.dvc .gitignore
git commit -m "Add raw data"

Wenn du die Daten auf deine entfernte Speicherung hochladen willst, musst du ein entferntes Objekt-Repository einrichten (hier nicht gezeigt) und den DVC-Push-Befehl verwenden:

dvc push

Der Datenfluss ist in Abbildung 4-4 dargestellt.

imle 0404
Abbildung 4-4. DVC-Datenfluss (Quelle: DVC)

Wie du aus dem Beispiel ersehen kannst, bietet DVC eine zuverlässige Synchronisierung zwischen Code- und Dateidatenobjekten, aber es erfordert eine manuelle Konfiguration und speichert keine erweiterten Metadaten über die Ausführung, den Arbeitsablauf, die Parameter und so weiter. Stattdessen verwaltet DVC Parameter und Ergebnismetriken mithilfe von JSON- oder YAML-Dateien, die zusammen mit dem Code gespeichert und versioniert werden.

Benutzer können Workflow-Phasen definieren, die eine ausführbare Datei (z.B. ein Python-Programm) umschließen und angeben, welche Parameter (-p) die Dateieingaben oder Abhängigkeiten (-d) und Ausgaben (-o) für diese ausführbare Datei sind (siehe Beispiel 4-1).

Beispiel 4-1. Hinzufügen eines Workflow-Schrittes in DVC
dvc stage add -n featurize \
              -p featurize.max_features,featurize.ngrams \
              -d src/featurization.py -d data/prepared \
              -o data/features \
              python src/featurization.py data/prepared data/features

Wenn du den Befehl dvc repro ausführst, prüft er, ob sich die Abhängigkeiten geändert haben, führt die erforderlichen Schritte aus und registriert die Ausgaben.

DVC verwendet keine Experimentierdatenbank. Es verwendet Git als Datenbank, und jede Ausführung oder Parameterkombination wird einem eindeutigen Git-Commit zugeordnet. Außerdem ist DVC auf die lokale Entwicklung ausgerichtet. Daher kann der Einsatz im großen Maßstab oder in einer containerisierten oder verteilten Workflow-Umgebung eine Herausforderung darstellen und Skripting und manuelle Integrationen erfordern.

Zusammenfassend lässt sich sagen, dass DVC ein hervorragendes Werkzeug ist, um große Datenartefakte zu versionieren und sie in einer lokalen Entwicklungsumgebung Git-Commits zuzuordnen. Darüber hinaus implementiert DVC eine Datendeduplizierung, um den tatsächlichen Speicherbedarf zu reduzieren. Andererseits ist DVC befehlszeilenorientiert (Git-Flow) und verfügt nur über begrenzte Möglichkeiten, um in der Produktion zu laufen, Pipelines auszuführen und erweiterte Attribute und strukturierte Daten zu verfolgen. Außerdem verfügt es über eine minimale Benutzeroberfläche (Studio).

Dickhäuter

Pachyderm ist ein Datenpipeline- und Versionierungstool, das auf einer containerisierten Infrastruktur aufbaut. Es bietet ein versioniertes Dateisystem und ermöglicht es den Nutzern, mehrstufige Pipelines zu erstellen, bei denen jede Stufe in einem Container läuft, Eingabedaten (als Dateien) akzeptiert und Ausgabedateien erzeugt.

Pachyderm bietet ein versioniertes Daten-Repository, das über eine objektbasierte Speicherung (z. B. AWS S3, Minio, GCS) implementiert und über eine Datei-API oder das SDK/CLI aufgerufen werden kann. Jede Datenübertragung oder Änderung wird ähnlich wie bei Git protokolliert. Die Daten werden dedupliziert, um Speicherplatz zu sparen.

Die Pachyderm-Datenpipeline führt Container aus und bindet einen Ausschnitt des Repositorys in den Container ein (unter dem Verzeichnis /pfs/ ). Der Container liest Dateien, verarbeitet sie und schreibt die Ergebnisse zurück in das Pachyderm-Repository.

Beispiel 4-2 zeigt eine einfache Pipeline-Definition, die alle Daten aus dem Repository data im Zweig master übernimmt, die Logik der Wortzählung (unter Verwendung des angegebenen Container-Images) ausführt und die Ausgabe in das Repository out schreibt.

Beispiel 4-2. Beispiel einer Dickhäuter-Pipeline
pipeline:
    name: 'count'
description: 'Count the number of lines in a csv file'
input:
    pfs:
        repo: 'data'
        branch: 'master'
        glob: '/'
transform:
    image: alpine:3.14.0
    cmd: ['/bin/sh']
    stdin: ['wc -l /pfs/data/iris.csv > /pfs/out/line_count.txt']

Pipelines können jedes Mal ausgelöst werden, wenn sich die Eingabedaten ändern, und die Daten können inkrementell verarbeitet werden (nur neue Dateien werden an den Containerprozess übergeben). Das kann Zeit und Ressourcen sparen.

Pachyderm hat eine schöne Benutzeroberfläche für die Verwaltung von Pipelines und die Erkundung der Daten. Siehe Abbildung 4-5.

imle 0405
Abbildung 4-5. Benutzeroberfläche des Dickhäuters

Pachyderm kann mit großen oder kontinuierlich strukturierten Datenquellen arbeiten, indem er die Daten in kleinere CSV- oder JSON-Dateien aufteilt.

Zusammenfassend lässt sich sagen, dass Pachyderm ein hervorragendes Tool für den Aufbau versionierter Datenpipelines ist, bei denen der Code einfach genug ist, um Dateien zu lesen und zu schreiben. Es beherrscht Deduplizierung und inkrementelle Verarbeitung. Es erfordert jedoch eine separate Nachverfolgung des Quellcodes (führt vorgefertigte Images aus), der Ausführungs- oder Experimentparameter, der Metadaten, Metriken und mehr.

MLflow Tracking

MLflow ist eine Open-Source-Plattform für die Verwaltung des gesamten Lebenszyklus von Machine Learning. Eine der Kernkomponenten ist MLflow Tracking, das eine API und eine Benutzeroberfläche für die Protokollierung von Machine-Learning-Läufen, deren Eingaben und Ausgaben sowie die Visualisierung der Ergebnisse bietet. MLflow Tracking-Läufe sind Ausführungen von Data Science Code. Jeder Lauf zeichnet die folgenden Informationen auf:

Code Version

Der für den Lauf verwendete Git-Commit-Hash.

Start- und Endzeit

Die Start- und Endzeit des Laufs.

Quelle

Der Name der Datei, mit der der Lauf gestartet wird, oder der Projektname und der Einstiegspunkt für den Lauf, wenn er von einem MLflow-Projekt aus ausgeführt wird.

Parameter

Key-value Eingabeparameter deiner Wahl. Sowohl die Schlüssel als auch die Werte sind Strings.

Metriken

Key-value Metriken, wobei der Wert numerisch ist. MLflow zeichnet den gesamten Verlauf der Kennzahl auf und macht ihn sichtbar.

Artefakte

Gib Dateien in einem beliebigen Format aus. Du kannst zum Beispiel Bilder (z. B. PNGs), Modelle (z. B. ein gepickeltes scikit-learn Modell) und Datendateien (z. B. eine Parquet-Datei) als Artefakte aufzeichnen.

MLflow Tracking ist keine vollständige Lösung für die Datenversionierung, da es keine Funktionen wie die Datenabfolge (Aufzeichnung der Dateneingaben und der Daten, die zur Erstellung eines neuen Datenelements verwendet wurden) oder Deduplizierung unterstützt. Es ermöglicht jedoch die Protokollierung und Indizierung der Datenausgaben jedes Laufs zusammen mit dem Quellcode, den Parametern und einigen Ausführungsdetails. MLflow kann manuell mit anderen Tools wie DVC integriert werden, um Daten und Experimente zu verfolgen.

Der Vorteil von MLflow ist es, die Datenausgaben mit zusätzlichen Metadaten über den Code und die Parameter zu verfolgen und sie in einer grafischen Benutzeroberfläche zu visualisieren oder zu vergleichen. Das ist jedoch nicht kostenlos. Der Nutzercode muss mit dem MLflow-Tracking-Code instrumentiert werden.

Beispiel 4-3 zeigt einen Teil des Codes, der einen Lauf über die MLflow-API verfolgt. Zunächst werden die Kommandozeilenargumente manuell geparst und die Eingabedaten als String-URL übergeben, wie jeder andere Parameter auch. Das Laden und Umwandeln der Daten erfolgt dann manuell.

Nachdem die Logik (Datenaufbereitung, Training usw.) ausgeführt wurde, protokolliert der Nutzer die Tags, Eingabeparameter, Ausgabemetriken und Datenartefakte (Datensatz und Modell) mithilfe der MLflow-Protokollbefehle.

Beispiel 4-3. MLflow Tracking Codebeispiel
if __name__ == "__main__":
    # parse the input parameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", help="input data path", type=str)
    parser.add_argument('--dropout',  type=float, default=0.0, help='dropout ratio')
    parser.add_argument("--lr", type=float, default=0.001, help='learning rate')
    args = parser.parse_args()

    # Read the csv file
    try:
        data = pd.read_csv(args.data)
    except Exception as e:
        raise ValueError(f"Unable to read the training CSV, {e}")

    # additional initialization code ...

    with mlflow.start_run():

        # training code ...

        # log experiment tags, parameters and result metrics
        mlflow.set_tag("framework", "sklearn")
        mlflow.log_param("dropout", args.dropout)
        mlflow.log_param("lr", args.lr)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # log data and model artifacts
        mlflow.log_artifacts(out_data_path, "output_data")
        mlflow.sklearn.log_model(model, "model",
                                 registered_model_name="ElasticnetWineModel")

MLflow sendet die Laufinformationen an den Tracking-Server und speichert die Datenelemente in lokalen Dateien oder Remote-Objekten (z. B. in S3). Die Laufinformationen können in der MLflow-Benutzeroberfläche eingesehen oder verglichen werden (siehe Abbildung 4-6).

imle 0406
Abbildung 4-6. MLflow-Benutzeroberfläche

MLflow verwaltet oder versioniert keine Datenobjekte. Die Ausführung ist das wichtigste Element, und du kannst nicht direkt auf Datenobjekte und Artefakte zugreifen oder sie durchsuchen. Außerdem gibt es kein Lineage Tracking, d.h. es wird nicht verfolgt, welche Datenobjekte verwendet wurden, um ein neues Datenobjekt oder Artefakt zu erzeugen. Wenn du eine Pipeline ausführst, kannst du die Artefakte der verschiedenen Schritte nicht an einem Ort sehen oder die Ausgabe eines Schritts mit der Eingabe des nächsten Schritts verketten.

Mit MLflow kann die Speicherung sehr groß werden, da jeder Lauf die Ergebnisse in einem neuen Dateiverzeichnis speichert, auch wenn sich nichts geändert hat. Es gibt keine Datendeduplizierung wie bei den anderen Frameworks.

Zusammenfassend lässt sich sagen, dass MLflow ein hervorragendes Werkzeug ist, um ML-Experimentierergebnisse in einer Entwicklungsumgebung zu verfolgen und zu vergleichen. Darüber hinaus ist MLflow einfach zu installieren und zu nutzen. Es ist jedoch kein Datenverfolgungs- oder Versionierungssystem und kann erhebliche Speicherkapazitäten erfordern. Außerdem muss MLflow von den Entwicklern mit eigenem Code versehen werden und die MLOps-Teams müssen Glue-Logik hinzufügen, damit es in die Produktionsbereitstellung und CI/CD-Workflows passt.

MLRun

MLRun ist ein quelloffenes MLOps-Orchestrierungs-Framework mit mehreren Unterkomponenten, die den gesamten ML-Lebenszyklus abdecken. Datenobjekte sind in MLRun Bürger erster Klasse und sind gut mit den anderen Komponenten integriert, um eine nahtlose Erfahrung und Automatisierung zu ermöglichen.

Während die meisten Frameworks Dateidatenobjekte verwalten, unterstützt MLRun eine Vielzahl von Datenobjekten (Datenspeicher, Elemente/Dateien, Datensätze, Streams, Modelle, Feature-Sets, Feature-Vektoren, Diagramme und mehr), die jeweils über eigene Metadaten, Aktionen und Viewer verfügen.

Jedes Objekt in MLRun hat einen Typ, eine eindeutige Versions-ID, Tags (benannte Versionen wie Entwicklung, Produktion usw.), benutzerdefinierte Bezeichnungen (zum Gruppieren und Suchen von Objekten) und Beziehungen zu anderen Objekten und ist ein Projektmitglied. Ein Laufobjekt ist beispielsweise mit den Quell- und Ausgabedatenobjekten und den Funktionsobjekten (Code) verknüpft und bildet so einen Graphen von Beziehungen.

Abbildung 4-7 zeigt den Ausführungsbildschirm mit Informationsreitern für allgemeine und Code-Attribute, Dateneingabeobjekte, Daten/Artefaktausgabeobjekte, Ergebnismetriken, automatisch gesammelte Protokolle usw. Die Nutzer können die Informationen aus verschiedenen Perspektiven betrachten. Du kannst dir zum Beispiel alle Datensätze im Projekt ansehen (unabhängig davon, welcher Lauf sie erzeugt hat).

imle 0407
Abbildung 4-7. Benutzeroberfläche des MLRun-Auftragslaufs

MLRun-Datenobjekte und -Artefakte sind mit detaillierten Metadaten versehen. Dazu gehören Informationen darüber, wie sie erstellt wurden (von wem, wann, mit welchem Code, Framework usw.), welche Datenquellen zu ihrer Erstellung verwendet wurden und typspezifische Attribute wie Schema, Statistik, Vorschau und mehr. Die Metadaten werden automatisch generiert, was eine bessere Beobachtbarkeit ermöglicht und die Notwendigkeit einer zusätzlichen Glue-Logik überflüssig macht.

Hinweis

MLFlow-Benutzer können MLFlow weiterhin zum Verfolgen von APIs verwenden. MLRun registriert die protokollierten Daten, Metadaten und Modelle automatisch als Produktionsartefakte zusammen mit zusätzlichen betrieblichen Metadaten und Kontext.

MLRun bietet ein umfangreiches API/SDK für die Verfolgung und Suche in Daten und Experimenten. Die eigentliche Stärke ist jedoch, dass es die meisten Funktionen und Automatisierungen ohne zusätzlichen Programmieraufwand bereitstellen kann.

Beispiel 4-4 nimmt Eingabedaten und Parameter entgegen und erzeugt Ausgabedaten und Ergebnisse. Beachte, dass der Code im Gegensatz zu den vorherigen Beispielen kein Argument-Parsing, kein Laden von Daten, keine Konvertierung, kein Logging usw. enthält.

Beispiel 4-4. MLRun Code Beispiel
def data_preparation(dataset: pd.DataFrame, test_size=0.2):
    # preform processing on the dataset
    dataset = clean_df(dataset).dropna(how="any", axis="rows")
    dataset = dataset.drop(columns=["key", "pickup_datetime"])
    train, test = train_test_split(dataset, test_size=test_size)
    return train, test, "fare_amount"

Wenn du die Funktion ausführst und die URL oder den Pfad des Eingabedatenobjekts angibst (eine Datei, ein entferntes Objekt oder einen komplexen Typ), wird es automatisch in die Funktion geladen. Zum Beispiel mit AWS Boto-Treibern für den Zugriff auf S3-Objekte oder BigQuery-Treibern für den Zugriff auf eine BigQuery-Tabelle. Dann werden die Daten in das akzeptierende Format (DataFrame) umgewandelt und in den Nutzercode injiziert.

MLRun kann den Typ des zurückgegebenen Wertes automatisch erkennen (zum Beispiel sind train und test vom Typ DataFrame) und ihn in der besten Form speichern, zusammen mit automatisch generierten Metadaten, Links zu den Auftragsdetails und Dateneingabeobjekten sowie Informationen zur Versionierung. Wenn sich die Daten wiederholen, werden sie dedupliziert und nur einmal gespeichert, um Speicherplatz zu sparen.

Datenobjekte werden in der Benutzeroberfläche und im Client-SDK typspezifisch visualisiert, unabhängig davon, wie und wo sie gespeichert sind, z. B. tabellarische Formate mit Tabellenmetadaten (Schema, Statistiken und mehr) für Datensätze oder interaktive Grafiken für Diagrammobjekte (siehe Abbildungen 4-8 und 4-9).

Zusammenfassend lässt sich sagen, dass MLRun ein komplettes MLOps-Orchestrierungs-Framework ist, dessen Schwerpunkt auf der Datenverwaltung, der Bewegung, der Versionierung und der Automatisierung liegt. Darüber hinaus verfügt MLRun über ein umfangreiches Objektmodell, das die verschiedenen Arten von Daten und Ausführungsobjekten (Funktionen, Läufe, Workflows und mehr), ihre Beziehung zueinander und ihre Verwendung abdeckt. MLRun setzt auf Abstraktion und Automatisierung, um den Entwicklungs- und Implementierungsaufwand zu verringern. MLRun ist jedoch keine allgemeine Datenverwaltungs- und Versionsverwaltungslösung, und sein Wert wird maximiert, wenn es im Kontext von MLOps eingesetzt wird.

imle 0408
Abbildung 4-8. Ansicht eines Datensatz-Artefakts in MLRun (mit automatisch generierter Vorschau, Schema und Statistik)
imle 0409
Abbildung 4-9. Visualisierung eines interaktiven Diagramm-Artefakts mit dem MLRun SDK (in Jupyter)

Andere Rahmenwerke

Einige Tools, wie Delta Lake und lakeFS, übernehmen die Versionierung von Data Lakes. Diese Tools sind jedoch nicht auf den ML-Lebenszyklus ausgerichtet und müssen möglicherweise integriert werden, damit sie für MLOps nützlich sind.

Cloud-Provider bieten Lösungen an, die in der Regel eng mit ihren internen Diensten verbunden sind. Siehe zum Beispiel Amazon SageMaker ML Lineage Tracking und Azure ML Datasets.

Datenaufbereitung und -analyse im großen Maßstab

Die Datenverarbeitung wird in den Daten-, ML- und Anwendungspipelines ausgiebig genutzt. Bei der Arbeit mit Produktionsdaten ist es notwendig, eine größere Skalierung und Leistung zu unterstützen und in einigen Fällen die Daten in Echtzeit zu verarbeiten, wenn sie ankommen.

Praktiken, die bei der interaktiven Entwicklung funktionieren, z. B. das Speichern der Daten in einer CSV-Datei und das Einlesen in das Notebook, funktionieren nicht mit Gigabytes oder Terabytes von Daten. Sie erfordern verteilte oder parallele Datenverarbeitungsansätze.

Die allgemeine Architektur für die verteilte Datenverarbeitung ist die gleiche, mit Unterschieden in der Art und Weise, wie die Daten verteilt und gesammelt werden und welche APIs sie verwenden. Zum Beispiel werden die Daten auf mehrere Computerknoten verteilt, die Verarbeitungsanfragen oder Abfragen kommen bei einem oder mehreren Knoten zur lokalen Verarbeitung an und die Ergebnisse werden gesammelt und zu einer einzigen Antwort zusammengeführt. Darüber hinaus können komplexe Abfragen Daten zwischen den Knotenpunkten verschieben oder mehrere Verarbeitungs- und Bewegungsschritte ausführen.

Abbildung 4-10 zeigt, wie die verteilte Datenverarbeitung mit dem Map-Reduce-Ansatz zum Zählen von Wörtern in einem Dokument funktioniert.

imle 0410
Abbildung 4-10. Verteilte Wortzählung mit Map-Reduce-Architektur (Quelle: O'Reilly)

Strukturierte und unstrukturierte Datenumwandlungen

Daten können strukturiert sein, das heißt, sie entsprechen einem bestimmten Format oder einer bestimmten Struktur und haben oft ein vordefiniertes Schema oder Datenmodell. Strukturierte Daten können eine Datenbanktabelle oder Dateien mit einem strukturierten Layout sein (z. B. CSV, Excel, JSON, ML, Parquet). Die meisten Daten auf der Welt sind jedoch unstrukturiert, meist komplexer und schwieriger zu verarbeiten als strukturierte Daten. Dazu gehören freie Texte, Protokolle, Webseiten, Bilder, Videos und Audiodaten.

Hier sind einige Beispiele für analytische Transformationen, die an strukturierten Daten vorgenommen werden können:

Filtern

Auswahl einer Teilmenge der Daten nach bestimmten Kriterien, z. B. nach einem bestimmten Datumsbereich oder bestimmten Werten in einer Spalte.

Sortieren

Ordnen der Daten anhand einer oder mehrerer Spalten, z. B. Sortieren nach Datum oder nach einem bestimmten Wert.

Gruppierung

Organisieren der Daten in Gruppen, die auf einer oder mehreren Spalten basieren, z. B. nach Produktkategorie oder Stadt gruppieren.

Aggregation

Berechnung von zusammenfassenden Statistiken, wie z. B. Anzahl, Summe, Durchschnitt, Maximum und Standardabweichung für eine oder mehrere Spalten.

Beitritt zu

Kombinieren von Daten aus mehreren Tabellen oder Datensätzen auf der Basis gemeinsamer Spalten, z. B. das Verbinden einer Tabelle mit Verkaufsdaten mit einer Tabelle mit Kundendaten.

Mapping

Zuordnung von Werten aus einer oder mehreren Spalten zu neuen Spaltenwerten mithilfe von benutzerdefinierten Operationen oder Code. Die zustandsabhängige Zuordnung kann neue Werte auf der Grundlage der ursprünglichen Werte und der akkumulierten Zustände aus älteren Einträgen (z. B. die seit der letzten Anmeldung vergangene Zeit) berechnen.

Zeitreihenanalyse

Analysieren oder Aggregieren von Daten im Laufe der Zeit, um z.B. Trends, Muster oder Anomalien zu erkennen.

Die folgenden Techniken können verwendet werden, um unstrukturierte Daten zu verarbeiten oder sie in strukturierte Daten umzuwandeln:

Text Mining

Der Einsatz von NLP-Techniken, um Bedeutung und Erkenntnisse aus Textdaten zu gewinnen. Text Mining kann Informationen wie Stimmungen, Entitäten und Themen aus Textdaten extrahieren.

Computer Vision

Die Verwendung von Bild- und Videoverarbeitungstechniken, um Informationen aus visuellen Daten zu gewinnen. Computer Vision kann Informationen wie Objekterkennung, Gesichtserkennung und Bildklassifizierung extrahieren.

Audio- und Spracherkennung

Nutzung von Sprache-zu-Text- und Audioverarbeitungstechniken, um Bedeutung und Erkenntnisse aus Audiodaten zu gewinnen. Audio- und Spracherkennung kann Informationen wie Sprache-zu-Text, Stimmung und Sprecheridentifikation extrahieren.

Datenextraktion

Techniken wie Web-Scraping und Datenextraktion nutzen, um strukturierte Daten aus unstrukturierten Datenquellen zu extrahieren.

Mit verschiedenen ML-Methoden können zum Beispiel Rohdaten in aussagekräftigere Daten umgewandelt werden:

Clustering

Gruppierung ähnlicher Datenpunkte auf der Grundlage bestimmter Merkmale, wie z. B. Kunden mit ähnlichen Kaufgewohnheiten

Dimensionalitätsreduktion

Reduzierung der Anzahl von Merkmalen in einem Datensatz, um ihn leichter analysieren oder visualisieren zu können

Regression und Klassifizierung

Vorhersage einer Klasse oder eines Wertes auf der Grundlage anderer Datenmerkmale

Anrechnung

Bestimmung des erwarteten Wertes auf der Grundlage anderer Datenpunkte bei fehlenden Daten

einbetten

Die Darstellung einer Text-, Audio- oder Bildsequenz als numerischer Vektor, der die semantischen Beziehungen oder kontextuellen Merkmale beibehält.

Verteilte Datenverarbeitungsarchitekturen

Datenverarbeitungsarchitekturen können in drei Hauptkategorien unterteilt werden:

Interaktiv

Eine Anfrage oder ein Update kommt an, wird verarbeitet und eine Antwort wird sofort zurückgegeben; zum Beispiel SQL- und NoSQL-Datenbanken, Data Warehouses, Key/Value-Stores, Graph-Datenbanken, Zeitreihen-Datenbanken und Cloud-Dienste.

Batch

Ein Auftrag wird auf eine Anfrage hin oder zu einem geplanten Zeitpunkt gestartet, die Daten werden abgeholt und verarbeitet und die Ergebnisse nach Abschluss in die Zielspeicherung geschrieben. Batch-Aufträge benötigen in der Regel eine längere Bearbeitungszeit. Beispiele für Frameworks zur Batch-Datenverarbeitung sind Hadoop, Spark und Dask.

Streaming

Kontinuierliche Verarbeitung von eingehenden Anfragen oder Datenpaketen und Schreiben der Ergebnisse in Echtzeit in eine Zielspeicherung oder eine Nachrichtenwarteschlange.

Die Batch-Verarbeitung ist in der Regel effizienter für die Verarbeitung großer Datenmengen. Die interaktive und die Stream-Datenverarbeitung liefern jedoch schnellere Antworten mit kürzeren Verzögerungen. Außerdem ist der Aufbau von Pipelines für die Datenstromverarbeitung in der Regel komplexer als Batch-Aufträge.

Einige Frameworks wie Spark unterstützen zwar verschiedene Verarbeitungsmethoden (interaktiv, Batch, Streaming), sind aber in der Regel nur für eine der Verarbeitungsmethoden optimal.

Interaktive Datenverarbeitung

Von interaktiven Systemen wird erwartet, dass sie sofort reagieren, damit der anfragende Client oder das interaktive Dashboard nicht warten muss. Außerdem können Produktionsdienste auf die Zuverlässigkeit und Robustheit der Ergebnisse angewiesen sein. Aus diesem Grund haben interaktive Systeme einfache APIs mit begrenzten Datenoperationen. In einigen Fällen bieten interaktive Systeme Mechanismen zur Definition eigener Logik durch gespeicherte Prozeduren und benutzerdefinierte Funktionen (UDFs).

Der Hauptunterschied zwischen den verschiedenen Arten von interaktiven Datensystemen ist die Art und Weise, wie sie Daten indizieren und speichern, um die Abfragezeit zu minimieren. NoSQL-, In-Memory- und Key/Value-Stores zum Beispiel sind für die Abfrage nach einem Indexschlüssel (z. B. Benutzer-ID, Produkt-ID usw.) optimiert. Die Daten werden durch den Schlüssel (oder einen Krypto-Hash oder den Schlüssel) geteilt und in verschiedenen Knotenpunkten gespeichert. Wenn eine Anfrage eintrifft, wird sie an den entsprechenden Knoten weitergeleitet, der die Daten für diesen Schlüssel (Benutzer, Produkt usw.) verwaltet und die Antwort schnell berechnen und abrufen kann. Komplexe oder schlüsselübergreifende Berechnungen hingegen erfordern die Koordination zwischen allen Knotenpunkten und dauern viel länger.

Analytische Datenbanken und Data Warehouses sind darauf ausgelegt, viele Datensätze mit unterschiedlichen Indexschlüsselwerten zu durchsuchen. Sie organisieren die Daten in Spalten (nach Feldern) und verwenden verschiedene spaltenbezogene Komprimierungstechnologien sowie Filter- und Hinting-Tricks (wie Bloom-Filter), um Datenblöcke zu überspringen.

Andere Systeme wie Zeitreihen- oder Graphdatenbanken verfügen über fortschrittlichere Datenlayouts und Suchstrategien, die multidimensionale Indizes und spaltenbezogene Komprimierung kombinieren. Der Zugriff auf das metrische Objekt der Zeitreihe erfolgt zum Beispiel über den metrischen Schlüssel (Name) und die Verwendung von Technologien zur spaltenweisen Komprimierung, um die einzelnen Werte (nach Zeit) zu durchsuchen oder zu aggregieren.

Viele interaktive Systeme verwenden die SQL-Sprache oder eine SQL-ähnliche Semantik, um Daten zu verarbeiten.

Einige Unterkategorien nennenswerter Datensysteme sind in Tabelle 4-1 aufgeführt.

Tabelle 4-1. Kategorien und Beschreibungen der Datensysteme
Kategorie Beschreibung

Relationale

Speichern strukturierter Daten, Zugriff über SQL-Befehl. Beispiele sind MySQL, PostgreSQL, Oracle und Microsoft SQL Server.

NoSQL

Beispiele dafür sind MongoDB, Cassandra, Redis, Elasticsearch, AWS DynamoDB, Google BigTable und nicht-tabellarische Datenbanken.

Zeitreihen

Speichern und Abfragen von Zeitreihendaten. Beispiele sind InfluxDB, Prometheus und TimescaleDB.

Grafik

Speichern und Abfragen von Daten in einem Graphenformat. Beispiele sind Neo4j und Titan.

Vektor

Eine Vektordatenbank indiziert und speichert hochdimensionale Vektoreinbettungen für ein schnelles Wiederfinden und eine Ähnlichkeitssuche. Beispiele sind Chroma, Pinecone, Milvus, Weaviate und Pgvector.

Analytische Systeme durchlaufen und verarbeiten in der Regel größere Datensätze. Daher unterstützen sie umfangreichere Transformationen (Filtern, Gruppieren, Verbinden, Aggregieren, Zuordnen usw.) und benutzerdefinierte Funktionen. Außerdem können einige Lösungen Daten aus anderen Datenbanken oder aus Dateien verarbeiten und aggregieren. Lösungen wie Spark SQL oder PrestoDB haben zum Beispiel Konnektoren zu vielen Datenquellen und können Abfragen verarbeiten, die viele Datensätze umfassen und in verschiedenen Systemen gespeichert sind.

Eine der beliebtesten verteilten SQL-basierten Analyse-Engines ist PrestoDB und ihr Nachfolgeprojekt Trino. Presto wurde ursprünglich von Facebook entwickelt und als Open Source zur Verfügung gestellt. Später wurde es in Projekte wie Trino und in kommerzielle Produkte wie den Amazon Athena Cloud Service abgezweigt. Trino hat eine lange Liste von Datenkonnektoren.

Abbildung 4-11 zeigt die Architekturen von Presto und Trino. Die Anfragen kommen über HTTP-Anfragen, werden geparst und vom Planer und dem Zeitplannungsprogramm in kleinere Aufgaben zerlegt, die von den einzelnen Workern bearbeitet und zusammengeführt werden.

imle 0411
Abbildung 4-11. PrestoDB und Trino Architektur (Quelle: Presto)

Batch-Datenverarbeitung

Die Stapelverarbeitung wird verwendet, wenn große Datenmengen verarbeitet und eine Reihe von Datenumwandlungen durchgeführt werden müssen und die Verarbeitungszeit weniger wichtig ist. Bei der Stapelverarbeitung werden die Daten gelesen und in Stücke zerlegt, die an mehrere Worker zur Verarbeitung weitergegeben werden. Sobald das Ergebnis fertig ist, wird es in das Zielsystem geschrieben. Die Stapelverarbeitung wird häufig eingesetzt, um große Mengen historischer Daten zu verarbeiten und den Datensatz für das Training von ML-Modellen zu erzeugen.

Eines der bekanntesten Batch-Datenverarbeitungs-Frameworks war Apache Hadoop, ein Open Source Software-Framework für verteilte Speicherung und groß angelegte Verarbeitung datenintensiver Aufgaben. Hadoop wurde ursprünglich von Yahoo! Ingenieuren entwickelt und basiert auf dem MapReduce-Programmiermodell, das aus zwei Hauptfunktionen besteht: Map und Reduce. Die Funktion Map verarbeitet einen Eingabedatensatz zu einer Reihe von Schlüssel-Wert-Paaren, die dann nach Schlüssel gruppiert und von der Funktion Reduce verarbeitet werden, um die endgültige Ausgabe zu erzeugen.

Inzwischen wurde Hadoop durch modernere und Cloud-native Architekturen ersetzt, die auf Cloud-Objektspeicherung, containerisierter Infrastruktur und Berechnungsframeworks wie Spark, Flink, Beam, Dask und anderen basieren.

Eine alltägliche Anwendung für die Stapelverarbeitung ist die ETL-Verarbeitung. ETL bedeutet, Daten aus verschiedenen Quellen zu extrahieren, umzuwandeln und in eine Zieldatenbank, ein Data Warehouse oder einen Data Lake zu laden. ETL ist ein wichtiger Schritt im Datenintegrationsprozess, da er es Unternehmen ermöglicht, Daten aus verschiedenen Quellen zu extrahieren, zu bereinigen und in ein einziges, zentrales Repository umzuwandeln.

Batch-Verarbeitungspipelines können komplex sein und mehrere Schritte und Abhängigkeiten aufweisen. Apache Airflow ist eines der beliebtesten Open-Source-Frameworks für die Erstellung, Planung und Überwachung von Batch-Datenpipelines.

Airflow wurde ursprünglich von Airbnb entwickelt und wird jetzt von der Apache Software Foundation gepflegt. Es bietet eine einfache und leicht zu bedienende Schnittstelle zur Definition von Workflows als DAGs von Aufgaben, wobei jede Aufgabe einen einzelnen Verarbeitungsschritt darstellt. Die Aufgaben können in Python geschrieben werden und in verschiedenen Umgebungen ausgeführt werden, z. B. lokal, über Kubernetes oder in der Cloud.

Airflow bietet außerdem eine webbasierte Benutzeroberfläche (siehe Abbildung 4-12) für die Verwaltung und Überwachung von Workflows, mit der du den Status der einzelnen Aufgaben einsehen, fehlgeschlagene Aufgaben wiederholen und Aufgaben manuell auslösen oder planen kannst. Sie enthält außerdem Funktionen zur Verwaltung und Organisation von Workflows, wie z. B. die Definition von Abhängigkeiten zwischen Aufgaben und die Einrichtung einer Logik zur Wiederholung von Aufgaben.

imle 0412
Abbildung 4-12. Airflow Benutzeroberfläche

Beispiel 4-5 ist ein Beispiel für Python-Code, der verwendet werden kann, um eine DAG in Apache Airflow zu erstellen, die Daten aus einer CSV-Datei liest, sie verarbeitet und an ein Ziel schreibt.

Beispiel 4-5. Code-Beispiel für die Luftstromdaten-Pipeline
import csv
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def process_data(**kwargs):
    ti = kwargs['ti']
    input_file = ti.xcom_pull(task_ids='read_file')
    processed_data = do_data_processing(input_file)
    return processed_data

def do_data_processing(input_file):
    # Placeholder function that performs data processing
    processed_data = input_file
    return processed_data

def read_csv_file(file_path):
    with open(file_path, 'r') as file:
        reader = csv.reader(file)
        return list(reader)

def write_csv_file(file_path, data):
    with open(file_path, 'w') as file:
        writer = csv.writer(file)
        writer.writerows(data)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_processing_dag',
    default_args=default_args,
    description='A DAG that reads data from a CSV file, processes it'
                ', and writes it to a destination',
    schedule_interval=timedelta(hours=1),
)

read_file = PythonOperator(
    task_id='read_file',
    python_callable=lambda: read_csv_file('/path/to/input_file.csv'),
    xcom_push=True,
    dag=dag,
)

process_data = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)

write_file = PythonOperator(
    task_id='write_file',
    python_callable=lambda: write_csv_file('/path/to/output_file.csv',
                                           ti.xcom_pull(task_ids='process_data')),
    provide_context=True,
    dag=dag,
)

read_file >> process_data >> write_file

Es gibt mehrere Cloud-basierte Batch-Daten-Pipeline-Dienste wie AWS Glue, Google Cloud Composer (basierend auf Airflow) und Azure Data Factory.

Einer der Nachteile von Hadoop oder anderen Batch-Pipelines ist, dass die Daten bei jedem Schritt von der Festplatte gelesen, verarbeitet und wieder auf die Festplatte geschrieben werden müssen. Frameworks wie Spark und Dask wissen jedoch, wie man die Verarbeitungspipeline zu einem optimalen Graphen kompiliert, bei dem die Aufgaben nach Möglichkeit im Speicher ausgeführt werden, was die IO auf der Festplatte minimiert und die Leistung maximiert.

Beispiel 4-6 demonstriert einen Spark-Code, der eine CSV-Datei liest, die Daten verarbeitet und das Ergebnis in eine Zieldatei schreibt.

Beispiel 4-6. PySpark Daten-Pipeline Codebeispiel
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("SimpleBatchProcessing").getOrCreate()

# Load a CSV file into a Spark DataFrame
df = spark.read.csv("/path/to/input_file.csv", header=True, inferSchema=True)

# Perform some data processing on the DataFrame
processed_df = df.groupBy("column_name").agg({"column_name": "mean"})

# Write the processed DataFrame to a new CSV file
processed_df.write.csv("/path/to/output_file.csv", header=True)

# Stop the Spark session
spark.stop()

Beispiel 4-7 zeigt die gleiche Aufgabe, implementiert mit Dask. Der Vorteil von Dask ist, dass die Operationen denen von Python pandas sehr ähnlich sind, was ein enormer Vorteil für Datenwissenschaftler ist. Allerdings ist Spark in der Regel skalierbarer und robuster.

Beispiel 4-7. Dask Daten-Pipeline Codebeispiel
import dask.dataframe as dd

# Load a CSV file into a Dask DataFrame
df = dd.read_csv('/path/to/input_file.csv')

# Perform some data processing on the DataFrame
processed_df = df.groupby('column_name').column_name.mean().compute()

# Write the processed DataFrame to a new CSV file
processed_df.to_csv('/path/to/output_file.csv', index=False)

Du kannst sehen, dass die Beispiele für Spark und Dask viel einfacher sind als die für Airflow. Für die Verwaltung und Verfolgung langer, komplexer Aufträge kann Airflow jedoch besser geeignet sein.

Stream-Verarbeitung

Stream Processing ermöglicht eine skalierbare, fehlertolerante und Echtzeit-Datenverarbeitung. Sie wird häufig in Anwendungen eingesetzt, die große Datenmengen in Echtzeit verarbeiten, wie z. B. Echtzeit-Analysen, Betrugserkennung oder Empfehlungen.

Bei der Stream-Verarbeitung werden die Daten und eingehenden Ereignisse in einen Stream (eine Warteschlange) geschoben und von einem oder mehreren Workern gelesen. Die Worker verarbeiten die Daten nacheinander, führen Transformationen durch, fassen die Ergebnisse zusammen und schreiben die Ergebnisse in eine Datenbank oder einen Ausgabestrom. Im Gegensatz zu traditionellen Warteschlangen erfolgt die Stream-Verarbeitung in der Reihenfolge. Nehmen wir zum Beispiel an, der Stream enthält zwei Ereignisse: eines für die Kundenanmeldung und eines für die Kundenabmeldung. Werden sie nicht in der richtigen Reihenfolge verarbeitet, kann dies zu einem fehlerhaften Zustand führen. Ein anderes Beispiel ist eine Geldeinzahlung, gefolgt von einer Abhebung. Die Abhebung kann abgelehnt werden, wenn die Vorgänge in der falschen Reihenfolge bearbeitet werden.

Streams sind so konzipiert, dass sie skalierbar sind. Sie sind in Partitionen unterteilt, und jede Partition verarbeitet einen bestimmten Satz von Datenobjekten, so dass sie die Reihenfolge nicht verletzen. Ein Stream mit Benutzeraktivitäten wird zum Beispiel nach der Benutzer-ID aufgeteilt, damit die Aktivitäten eines bestimmten Benutzers immer in derselben Partition gespeichert und von demselben Worker verarbeitet werden.

Streams wie Kafka, AWS Kinesis und andere unterscheiden sich von Nachrichtenwarteschlangen wie RabbitMQ, AMQP, Amazon SQS, Google Pub/Sub und so weiter. Nachrichtenwarteschlangen garantieren keine Reihenfolge der Nachrichten. Sie garantieren jedoch die zuverlässige Zustellung von Nachrichten, während bei Streams der Client für die Zuverlässigkeit sorgt. Außerdem sind sie aufgrund der einfacheren Logik und der Parallelität, die Streams bieten, viel schneller.

Abbildung 4-13 zeigt eine Streaming-Anwendung, in der Clients Daten veröffentlichen, die auf die einzelnen Partitionen verteilt werden (basierend auf einem Hash des Partitionsschlüssels). Ein Worker liest aus jeder Partition und verarbeitet die Daten. Der Worker kann eine Datenbank verwenden, um den Status in bekannten Intervallen zu speichern (Checkpoints), so dass der Status im Falle eines Ausfalls wiederhergestellt werden kann, oder der Worker kann ungenutzten Speicher freigeben. Schließlich können die Ergebnisse in eine Zieldatenbank oder einen Ausgabestrom geschrieben werden.

Streams bieten eine "At-least-once-Semantik". Daher kann dieselbe Nachricht mehrmals vorkommen. Eine Möglichkeit, die "Exact Once"-Semantik zu gewährleisten (dieselbe Nachricht wird nur einmal verarbeitet), ist die Verwendung von Checkpoints. Die Streams werden der Reihe nach verarbeitet und der Status kann nach jedem Micro-Batch persistiert werden. Im Falle eines Ausfalls kann der Worker die letzten Checkpoint-Daten (Status) wiederherstellen, die Ereignisse ab diesem Zeitpunkt verarbeiten und ältere Ereignisse ignorieren.

imle 0413
Abbildung 4-13. Architektur der Streaming-Anwendung

Stream Processing Frameworks

Echtzeit-Analysen mit Echtzeit-Streams unterscheiden sich von Batch- oder SQL-Analysen. Bei Streams können die Worker die Daten nur einmal in sequentieller Reihenfolge durchgehen und einen Teil der Daten (in derselben Partition) sehen. Aus diesem Grund konzentrieren sich Echtzeit-Analyse-Frameworks wie Spark Streaming, Apache Flink, Apache Beam, Apache NiFi und andere auf die Stream-Verarbeitung und implementieren die Standard-Analyse- und Statistikmethoden auf eine Stream-optimierte Weise.

Ein typisches Szenario bei der Stream-Verarbeitung ist die Aggregation von Werten über einen bestimmten Zeitraum, z. B. die Untersuchung des Gesamtwerts der Kundentransaktionen der letzten Stunde, um Betrug zu erkennen. Es ist nicht machbar, die Gesamtsumme für jedes neue Ereignis mit Stream Processing zu berechnen. Das würde eine beträchtliche Menge an Zeit und Speicherplatz benötigen. Stattdessen werden die Werte in gefensterte Buckets gruppiert, z. B. sechs Buckets oder mehr, die jeweils den Gesamtwert pro 10 Minuten enthalten. Der Prozess summiert nur die Werte der letzten sechs Buckets und lässt den ältesten Bucket alle 10 Minuten fallen. Abbildung 4-14 zeigt überlappende Schiebefenster mit einer einminütigen Fensterdauer und 30-sekündigen Fensterperioden.

imle 0414
Abbildung 4-14. Schiebefenster (Quelle: Apache Beam)

Beispiel 4-8 zeigt den Apache Beam-Code für die Definition eines solchen Fensters.

Beispiel 4-8. Definieren des Schiebefensters mit Apache Beam
from apache_beam import window
sliding_windowed_items = (
    items | 'window' >> beam.WindowInto(window.SlidingWindows(60, 30)))

Die Codierung mit Stream-Processing-Frameworks erfordert fortgeschrittene Kenntnisse in der Datentechnik. Aus diesem Grund meiden viele Nutzer Echtzeitdaten, obwohl sie einen viel besseren Geschäftswert und genauere Modellbewertungsergebnisse liefern können. Feature Stores können hier Abhilfe schaffen, denn sie können die Batch- und die Streaming-Pipeline automatisch aus derselben übergeordneten Datenverarbeitungslogik generieren.

Feature Stores

Feature Stores sind eine Fabrik und ein zentrales Repository für Machine Learning Features. Feature Stores übernehmen die Sammlung von Rohdaten aus verschiedenen Quellen, die Transformationspipeline, die Speicherung, Katalogisierung, Versionierung, Sicherheit, Bereitstellung und Überwachung. Sie automatisieren viele der in diesem Kapitel beschriebenen Prozesse, beschleunigen die Produktionszeit und reduzieren den Entwicklungsaufwand. Feature Stores bilden einen gemeinsamen Katalog produktionsreifer Funktionen, ermöglichen die Zusammenarbeit und den Austausch zwischen Teams und beschleunigen die Innovation und Bereitstellung neuer KI-Anwendungen.

Die ersten Feature Stores wurden von großen Dienstleistern wie Uber, Twitter und Spotify implementiert. Bei diesen Anbietern ist KI das Herzstück des Geschäfts, und Feature Stores halfen ihnen, die Entwicklung und den Einsatz neuer KI-Anwendungen zu beschleunigen und die Zusammenarbeit und Wiederverwendung zu verbessern. Heute gibt es zahlreiche kommerzielle und Open-Source-Implementierungen, aus denen du wählen kannst.

Erweiterte Feature Stores bieten die folgenden Möglichkeiten:

Datenkonnektivität

Problemlose Integration mit mehreren Offline- (Data Lakes, Data Warehouses, Datenbanken usw.) und Online-Quellen (Streams, Message Queues, APIs, Managed Services usw.).

Offline- und Online-Transformation

Einige Feature Stores bieten die Möglichkeit, die Batch- und Streaming-Pipelines automatisch aus der übergeordneten Logik zu erstellen und zu verwalten.

Speicherung

Speicherung der generierten Merkmale in einem Offline-Speicher (z. B. einem Objektspeicher) und einem Online-Speicher (normalerweise eine Schlüssel/Wert-Datenbank).

Metadaten-Management

Automatisches Erzeugen, Speichern und Verwalten aller Metadaten von Merkmalen, einschließlich Abstammung, Schemata, Statistiken, Beschriftungen und mehr.

Versionierung

Du verwaltest mehrere Versionen jedes Features und den Prozess der Überführung von Features aus der Entwicklung in die Produktion und integrierst sie in CI/CD.

Erzeugen und Verwalten von Feature-Vektoren

Korrektes Zusammenfügen mehrerer Merkmale zu einem einzigen Datensatz für die Verwendung in Trainings- oder Serviceanwendungen.

Zentrale Katalogisierung

Zentraler Zugriff auf Funktionen zum Erstellen, Beschriften oder Suchen.

Sicherheit und Governance

Kontrolle des Zugriffs auf Merkmale und Rohdaten und Protokollierung des Merkmalszugriffs.

Benutzerfreundliches UI und SDK

Einfacher Zugang über APIs und eine Benutzeroberfläche, um die unterschwellige Komplexität zu abstrahieren, Funktionen zu visualisieren und sie für Datenwissenschaftler nutzbar zu machen.

Überwachung und hohe Verfügbarkeit

Überwache die Anlagen und Datenverarbeitungsaufgaben automatisch und stelle sie bei Ausfällen zuverlässig wieder her.

Validierung und Analyse von Merkmalen

Ausführung verschiedener Datenverarbeitungsaufgaben automatisch oder auf Initiative des Nutzers, um die Korrektheit von Merkmalen zu überprüfen oder eine tiefgehende Analyse von Merkmalen, Korrelationen usw. zu erstellen.

Bevor du dich für einen Feature Store entscheidest, solltest du seine Funktionen gründlich vergleichen. Viele haben zum Beispiel nur eine sehr eingeschränkte Funktionalität, konzentrieren sich auf die Katalogisierung von Merkmalen oder verfügen nicht über automatisierte Transformationen, Datenmanagement im großen Maßstab und Echtzeitfunktionen. Diese Funktionen bieten den größten Nutzen, um die Produktionszeit zu verkürzen.

Architektur und Nutzung des Feature Stores

Abbildung 4-15 veranschaulicht die allgemeine Architektur und Nutzung eines Feature Stores. Rohdaten werden aufgenommen und in Features umgewandelt, und die Features werden katalogisiert und für verschiedene Anwendungen bereitgestellt (Training, Servicing, Monitoring). Über APIs und eine Benutzeroberfläche können Datenwissenschaftler, Dateningenieure und ML-Ingenieure Features aktualisieren, suchen, überwachen und nutzen.

Die wichtigsten Komponenten eines Feature Stores sind:

Transformationsschicht

Wandelt Offline- oder Online-Rohdaten in Merkmale um und speichert sie sowohl in einem Online- (Schlüssel/Wert) als auch in einem Offline-Speicher (Objekt).

Ebene der Speicherung

Speichert mehrere Versionen eines Features in Feature-Tabellen (Feature-Sets) und verwaltet den Lebenszyklus der Daten (Erstellen, Anhängen, Löschen, Überwachen und Sichern der Daten). Die Datenschicht speichert jedes Feature in zwei Formen: offline für Training und Analyse und online für die Bereitstellung und Überwachung.

Feature Retrieval

Nimmt Anfragen nach mehreren Merkmalen (Merkmalsvektoren) und anderen Eigenschaften (z. B. Zeitbereiche und Ereignisdaten) entgegen und erstellt einen Offline-Datenschnappschuss für das Training oder einen Echtzeitvektor für die Bereitstellung.

Metadatenmanagement und Katalogisierung

Speichert die Merkmalsdefinition, Metadaten, Beschriftungen und Beziehungen.

imle 0415
Abbildung 4-15. Verwendung und Architektur des Feature Stores

Ingestion und Transformation Service

In diesem Kapitel wurde die Komplexität der Implementierung einer umfangreichen Verarbeitung von Batch- und Echtzeitdaten, der Datenversionierung und des Metadatenmanagements erörtert. Feature Stores zielen darauf ab, diese Komplexität durch Abstraktion und Automatisierung zu reduzieren. In modernen Feature Stores werden Datenpipelines mit Hilfe von High-Level-Transformationslogik beschrieben. Diese Logik wird in die zugrundeliegende Semantik der Verarbeitungsmaschine umgewandelt und als kontinuierlicher und produktionsfähiger Dienst bereitgestellt, was einen erheblichen Entwicklungsaufwand erspart.

Die Pipeline-Implementierung ist unterschiedlich für die lokale Entwicklung (mit Paketen wie Pandas), für große Offline-Daten (mit Batch-Verarbeitung) und für Echtzeit-Daten (mit Stream-Verarbeitung). Der Vorteil eines Feature Stores, der automatisierte Transformationen unterstützt, besteht darin, dass er eine Definition für alle drei Bereitstellungsmodi verwendet und das Reengineering bei der Portierung von Datenpipelines von einer Methode zur anderen entfällt. In einigen Feature Stores wird die Datenpipeline-Technologie von den Datenquellen bestimmt, ob offline (Data Lakes, Data Warehouses, Datenbanken usw.) oder online (Streams, Message Queues, APIs, Managed Services usw.).

Merkmalspeicher implementieren die Datenaufnahme und -umwandlung für Gruppen von Merkmalen (genannt Merkmalsätze oder Merkmalsgruppen), die aus derselben Quelle stammen, z. B. alle Merkmale, die aus einem Kreditkarten-Transaktionsprotokoll extrahiert wurden. Feature-Sets nehmen Daten aus Offline- oder Online-Quellen auf, erstellen durch eine Reihe von Transformationen eine Liste von Merkmalen und speichern die daraus resultierenden Merkmale zusammen mit den zugehörigen Metadaten und Statistiken.

Abbildung 4-16 veranschaulicht den Transformationsdienst (Feature-Set). Sobald die Daten von der Quelle eingelesen wurden, durchlaufen sie einen Graphen (DAG) von Transformationen, und die daraus resultierenden Merkmale werden in die Offline- und Online-Stores geschrieben.

imle 0416
Abbildung 4-16. Beispiel für die Pipeline eines Feature-Transformationsdienstes (Feature-Set)

Beispiele für Transformationen (nach Datentyp):

Strukturiert

Filtern, gruppieren, zusammenführen, aggregieren, OneHot-Kodierung, zuordnen, extrahieren und klassifizieren

Text

Extrahieren, Parsen, Disassemblieren, Erkennen von Entitäten, Stimmungen und Einbettungen

Visuell (Bilder und Videos)

Rahmen, Größe ändern, Objekte erkennen, zuschneiden, neu einfärben, drehen, zuordnen und klassifizieren

Der generierte Transformationsdienst sollte produktionsfähig sein und automatische Skalierung, hohe Verfügbarkeit, Live-Upgrades und mehr unterstützen. Außerdem sollte er eine kontinuierliche Dateneingabe und -verarbeitung unterstützen. Neue Daten können zum Beispiel kontinuierlich (in Echtzeit) oder in geplanten Intervallen (offline) eintreffen. Daher sind serverlose Funktionstechnologien eine hervorragende Lösung.

Merkmal Speicherung

Die Merkmale werden in der Regel in zwei Formen gespeichert: Offline-Speicherung für Schulungs- und Analyseanwendungen und Online-Speicherung für Echtzeit-Serving- und Überwachungsanwendungen. Siehe Abbildung 4-17.

imle 0417
Abbildung 4-17. Speicherung von Merkmalen

Der Offline-Speicher enthält alle historischen Daten und nutzt oft Data Lakes, Objektspeicher oder Data Warehouse-Technologien. Eine gängige Wahl ist zum Beispiel die Verwendung von komprimierten Parquet-Dateien, die in einer Objektspeicherung wie AWS S3 gespeichert werden.

Der Online-Speicher enthält die aktuellsten Daten und verwendet oft NoSQL- oder Key/Value-Speicher wie Redis, AWS DynamoDB, Google BigTable und andere. Der Online-Store muss Lesefunktionen in Millisekunden unterstützen.

Feature Retrieval (für Training und Service)

Anwendungen zum Trainieren, Bedienen und Analysieren benötigen mehrere Merkmale aus verschiedenen Datensätzen und Quellen. Im Gegensatz dazu werden Merkmale in Gruppen (so genannte Merkmalsätze) organisiert, die auf ihrer Herkunft und ihrer Entität (Primärschlüssel wie Benutzer-ID, Produkt-ID usw.) basieren.

Das Abrufen mehrerer Merkmale aus verschiedenen Quellen, zu unterschiedlichen Zeiten und mit unterschiedlichen Indizes kann eine komplexe Analyseaufgabe sein. Feature-Stores bestimmen automatisch die für die JOIN Abfrage erforderlichen Parameter auf der Grundlage der Feature-Metadaten, der Entitätsnamen und der Benutzeranfragedaten. Wenn die Datensätze transaktional sind (die Datensätze sind mit einem Zeitstempel versehen), muss die Join-Operation außerdem die zeitliche Korrektheit und den Zeitverlauf berücksichtigen, damit nur die zum Zeitpunkt des Ereignisses bekannten Werte zurückgegeben werden (auch als Join-Analyseoperation bezeichnet).

Offline-Feature-Sets können durch SQL-Abfragen aus dem Feature Store generiert werden. Bei Echtzeitanwendungen, die innerhalb von Millisekunden reagieren müssen, führt dies jedoch zu einem erheblichen Overhead, sodass andere Echtzeitmethoden verwendet werden. Außerdem können zeitbasierte Merkmale (z. B. die Anzahl der Anfragen in der letzten Stunde) nicht vorberechnet werden und erfordern eine besondere Handhabung, um ein genaues Ergebnis zu erzielen (z. B. durch die Kombination von vorberechneten Zeitfensterdaten und Ad-hoc-Berechnungen der letzten Meile).

Abbildung 4-18 zeigt den Ablauf der Merkmalsabfrage mit zwei separaten Engines, eine für die Offline-Abfrage und die andere für die Echtzeit-Abfrage. Beachte, dass im Fall von Offline der Datensatz als Snapshot oder in einem neuen Datensatz aufbewahrt wird, um die Nachverfolgung der Datenreihenfolge und die Erklärbarkeit zu ermöglichen.

imle 0418
Abbildung 4-18. Feature Retrieval

Die Anfrage get_offline_features kann Ereignisdaten als Grundlage für die Abfrage, einen gültigen Zeitbereich (z. B. wenn wir das Modell auf der Grundlage der Daten des letzten Monats trainieren wollen) und die zurückzugebenden Merkmale und Spalten (z. B. ob Index-, Zeit- oder Label-Spalten enthalten sein sollen) akzeptieren. Dann wird ein lokaler oder serverloser Analyseauftrag gestartet, der die Ergebnisse berechnet und den Merkmalsvektor-Datensatz zurückgibt.

Beim Echtzeitabruf initialisiert das System den Abrufdienst (indem es einmalig eine lokale oder entfernte Echtzeit-Analysefunktion konfiguriert, um Zeit bei den Anfragen zu sparen). Dann werden die Benutzeranfragen mit den Entitätsschlüsseln (aus den Ereignisdaten) gepusht und ein Ergebnisvektor akzeptiert. Darüber hinaus ermöglichen einige Feature-Stores die Echtzeit-Imputation (Ersetzen fehlender oder NaN-Daten durch statistische Feature-Werte aus den Feature-Metadaten).

Feature Stores Lösungen und Anwendungsbeispiele

Feature Stores begannen als interne Plattformen bei führenden Cloud-Providern (wie Uber, Spotify und Twitter). Inzwischen gibt es aber viele Open-Source- und kommerzielle Feature-Store-Lösungen auf dem Markt. Wie bei jeder wichtigen neuen Technologie gibt es jedoch auch bei diesen Lösungen viele Unterschiede in der Funktionalität, die du kennen solltest, um die richtige Lösung zu wählen.

Der wichtigste Unterschied ist, ob die Feature-Store-Plattform die Datenpipeline (Transformation) für dich verwaltet und ob sie sowohl Offline- als auch Echtzeit-Pipelines (Streaming) unterstützt. Wie du in diesem Kapitel gelesen hast, ist der Aufbau und die Verwaltung einer skalierbaren Datenpipeline die größte Herausforderung. Wenn du gezwungen bist, dies manuell zu tun, untergräbt dies den Wert eines Feature Stores erheblich.

Tabelle 4-2 vergleicht die führenden Feature Store-Lösungen:

Tabelle 4-2. Vergleich der Feature-Store-Lösungen
Kategorie Fest Tecton MLRun SageMaker Vertex AI Databricks HopsWorks

Offene Quelle

Ja

Nein

Ja

Nein

Nein

Nein

Ja

Verwaltete Option

Nein

große Wolken

Cloud + On-Premise

auf AWS

auf GCP

große Wolken

Cloud + On-Premise

Offline-Pipelines

Nein

Ja

Ja

Nein

Nein

Nein

Ja

Pipelines in Echtzeit

Nein

Ja

Ja

Nein

Nein

Nein

Nein

Feature Retrieval

Ja

Ja

Ja

Ja

Ja

Ja

Ja

Motoren

Funke

Funke

Python, Dask, Spark, Nuclio

Keine

Funke

Funke

Spark, Flink

Merkmal Analytik

Nein

Ja

Ja

Nein

Nein

Nein

Ja

Versionierung und Abstammung

Nein

Ja

Ja

Nein

Nein

Nein

Ja

Merkmale Sicherheit

Nein

Ja

Ja

Ja

Nein

Nein

Nein

Überwachung

Nein

Ja

Ja

Nein

Nein

Nein

Ja

Leimfreie Ausbildung und Bedienung

Nein

Nein

Ja

Nein

Nein

Nein

Ja

In den folgenden Abschnitten wird gezeigt, wie Feature Stores mit den beiden führenden Open-Source-Frameworks verwendet werden: Feast und MLRun. Beachte, dass MLRun einen größeren Funktionsumfang hat und neben vielen anderen einzigartigen Funktionen auch Offline- und Online-Transformationsdienste (basierend auf den serverlosen Engines von MLRun) bietet.

Feast Feature Store verwenden

Feast bietet keinen Transformationsdienst an. Die Daten sollten im Voraus vorbereitet und in einer unterstützten Quelle (wie S3, GCS, BigQuery) gespeichert werden. Feast registriert den Quelldatensatz und seine Metadaten (Schema, Entität usw.) in einem FeatureView-Objekt, wie in Beispiel 4-9 gezeigt.

Beispiel 4-9. Definieren von Feast FeatureView (Quelle: Feast)
# Read data from parquet files. Parquet is convenient for local development mode.
# For production, you can use your favorite DWH, such as BigQuery. See Feast
# documentation for more info.
driver_hourly_stats = FileSource(
    name="driver_hourly_stats_source",
    path="/content/feature_repo/data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Our parquet files contain sample data that includes a driver_id column, timestamps
# and three feature column. Here we define a Feature View that will allow us to serve
# this data to our model online.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_hourly_stats,
    tags={},
)

Feast bietet keinen Online-Transformations- oder Ingestion-Dienst. Stattdessen muss der Nutzer eine Materialisierungsaufgabe ausführen, um die Offline-Features in den Echtzeitspeicher (Datenbank) zu kopieren. Leider bedeutet dies auch, dass die im Online-Speicher gespeicherten Daten zwischen den Materialisierungen ungenau sind, und wenn die Materialisierung zu häufig ausgeführt wird, kann dies zu einem erheblichen Berechnungsaufwand führen.

Ausführen der Materialisierungsaufgabe über das SDK:

store = FeatureStore(repo_path=".")
store.materialize_incremental(datetime.now())

Das Projekt kann eine oder mehrere Merkmalsansichten enthalten, die jeweils unabhängig voneinander definiert und materialisiert werden. Merkmale können aus einer oder mehreren Feature-Ansichten abgerufen werden (dies löst eine JOIN-Operation aus).

Um Offline-Features (direkt von der Offline-Quelle) abzurufen, verwendest du den API-Aufruf get_historical_features() wie in Beispiel 4-10 gezeigt.

Beispiel 4-10. Abrufen von Offline-Features mit Feast (Quelle: Feast)
# The entity dataframe is the dataframe we want to enrich with feature values
# see https://docs.feast.dev/getting-started/concepts/feature-retrieval for details
# for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

store = FeatureStore(repo_path=".")

# retrieve offline features, feature names are specified with <view>:<feature-name>
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("----- Example features -----\n")
print(training_df.head())

Um Online-Funktionen aus dem Online-Store abzurufen, verwenden wir den API-Aufruf get_online_features(), wie in Beispiel 4-11 gezeigt.

Beispiel 4-11. Abrufen von Online-Features mit Feast (Quelle: Feast)
from pprint import pprint
from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ],
).to_dict()

pprint(feature_vector)

# results:
{'acc_rate': [0.86463862657547, 0.6959823369979858],
 'avg_daily_trips': [359, 311],
 'conv_rate_plus_val1': [1000.6638441681862, 1001.1511893719435],
 'conv_rate_plus_val2': [2000.6638441681862, 2002.1511893719435],
 'driver_id': [1001, 1002]}

MLRun Feature Store verwenden

MLRun unterstützt die Registrierung bestehender Quellen (wie Feast) oder die Definition einer Datenpipeline zur Umwandlung von Quelldaten in Features. Bei der Definition der Datenpipeline ( Graph genannt) stellt MLRun die ausgewählte Datenverarbeitungs-Engine auf der Grundlage der abstrakten Benutzerdefinitionen bereit. MLRun unterstützt einige Verarbeitungs-Engines, darunter lokales Python, Dask, Spark und Nuclio (eine serverlose Echtzeit-Engine).

In MLRun schreibt die Pipeline standardmäßig in Online- und Offline-Speicher, sodass keine separaten Materialisierungsaufträge erforderlich sind und die Online- und Offline-Funktionen immer synchronisiert sind. Außerdem kann MLRun das Datenschema automatisch erkennen, was es einfacher und robuster macht.

MLRun trennt die Definition des Feature-Sets (eine Sammlung von Features, die von der gleichen Pipeline erzeugt werden) von den Definitionen der Datenquellen. Auf diese Weise kannst du dasselbe Feature Set in der interaktiven Entwicklung und in der Produktion verwenden. Tausche einfach die Datenquelle von einer lokalen Datei in der Entwicklung gegen eine Datenbank oder einen Echtzeit-Kafka-Stream im Produktionseinsatz aus.

Beispiel 4-12 zeigt ein Beispiel für die Definition eines Feature Sets für die Verarbeitung von Kreditkartentransaktionen zur Aufdeckung von Kreditkartenbetrug. Die Definition umfasst die Entität, den Zeitstempel und den Transformationsgraphen mit eingebauten Operatoren und Aggregationen. Beachte, dass ein Nutzer auch eigene Python-Operatoren hinzufügen kann. Siehe das vollständige Beispiel.

Die Datenpipeline besteht aus den folgenden Elementen:

  • Extrahieren der Datenkomponenten (Stunde, Wochentag).

  • Zuordnung der Alterswerte

  • One-Hot-Codierung für die Transaktionskategorie und das Geschlecht

  • Aggregieren des Betrags (Mittelwert, Summe, Anzahl, Maximum über 2/12/24-Stunden-Zeitfenster)

  • Aggregieren der Transaktionen pro Kategorie (über 14-Tage-Zeitfenster)

  • Schreiben der Ergebnisse in Offline- (Parquet) und Online- (NoSQL) Ziele

Beispiel 4-12. MLRun FeatureSet definieren (Quelle: MLRun)
import mlrun.feature_store as fs

# Define the credit transactions FeatureSet
transaction_set = fs.FeatureSet("transactions",
                                entities=[fs.Entity("source")],
                                timestamp_key='timestamp',
                                description="transactions feature set")

# Define and add value mapping
main_categories = ["es_transportation", "es_health", "es_otherservices",
       "es_food", "es_hotelservices", "es_barsandrestaurants",
       "es_tech", "es_sportsandtoys", "es_wellnessandbeauty",
       "es_hyper", "es_fashion", "es_home", "es_contents",
       "es_travel", "es_leisure"]

# One Hot Encode the newly defined mappings
one_hot_encoder_mapping = {'category': main_categories,
                           'gender': list(transactions_data.gender.unique())}

# Define the data pipeline (graph) steps
transaction_set.graph\
    .to(DateExtractor(parts = ['hour', 'day_of_week'],
        timestamp_col = 'timestamp'))\
    .to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True))\
    .to(OneHotEncoder(mapping=one_hot_encoder_mapping))


# Add aggregations for 2, 12, and 24 hour time windows
transaction_set.add_aggregation(name='amount',
                                column='amount',
                                operations=['avg','sum', 'count','max'],
                                windows=['2h', '12h', '24h'],
                                period='1h')


# Add the category aggregations over a 14 day window
for category in main_categories:
    transaction_set.add_aggregation(name=category,column=f'category_{category}',
                                    operations=['count'], windows=['14d'],
                                        period='1d')

Die Datenpipeline kann mit transaction_set.plot(rankdir="LR", with_targets=True) visualisiert werden, wie in Abbildung 4-19 zu sehen ist.

imle 0419
Abbildung 4-19. Feature Set Plot

Sobald du die Feature-Set-Definition hast, kannst du sie mit derpreview() Methode testen und debuggen, mit der die Datenpipeline lokal ausgeführt wird und du die Ergebnisse sehen kannst:

df = fs.preview(transaction_set, transactions_data)
df.head()

Wenn die Definition des Featuresets abgeschlossen ist, kannst du es als Produktionsjob einsetzen, der bei Bedarf, nach einem bestimmten Zeitplan oder als Echtzeit-Pipeline läuft.

Für die Batch-Ingestion verwendest du die Methode ingest(). Für Echtzeit-Ingestion von HTTP oder Streams verwendest du deploy_ingestion_service_v2(), die eine serverlose Echtzeit-Pipeline von Nuclio startet. Siehe Beispiel 4-13.

Beispiel 4-13. Daten in das MLRun FeatureSet einlesen (Quelle: MLRun)
# Batch ingest the transactions dataset (from CSV file) through the defined pipeline
source = CSVSource("mycsv", path="measurements.csv")
fs.ingest(transaction_set, source=source)

# Deploy a real-time pipeline with HTTP API endpoint as the source
# MLRun support other real-time sources like Kafka, Kinesis, etc.
source = HTTPSource()
fs.deploy_ingestion_service_v2(transaction_set, source)

Du kannst die Feature-Sets, ihre Metadaten und Statistiken im MLRun Feature Store UI beobachten. Siehe Abbildung 4-20.

Das Abrufen von Merkmalen in MLRun erfolgt über das Feature-Vektor-Objekt. Feature-Vektoren enthalten die Definitionen der angeforderten Features und zusätzliche Parameter. Darüber hinaus speichern sie auch berechnete Werte wie die Metadaten der Features, Statistiken usw., die beim Training, bei der Bedienung oder bei Überwachungsaufgaben hilfreich sein können. Feature-Statistiken werden zum Beispiel für die automatische Wertberechnung bei fehlenden oder NaN-Feature-Werten und für die Überwachung der Modellabweichung in der Serving-Anwendung verwendet.

imle 0420
Abbildung 4-20. MLRun FeatureSet in der Benutzeroberfläche

Feature-Vektoren können in der Benutzeroberfläche von MLRun erstellt, aktualisiert und angezeigt werden.

Die Nutzer definieren zunächst den Feature-Vektor und können ihn dann verwenden, um Offline- oder Online-Features zu erhalten. In Beispiel 4-14 siehst du, wie du Offline-Features abrufst und die Methode get_offline_features() verwendest.

Beispiel 4-14. Offline-Features von MLRun abrufen (Quelle: MLRun)
# Define the list of features you will be using (<feature-set>.<feature>)
features = ['transactions.amount_max_2h',
            'transactions.amount_sum_2h',
            'transactions.amount_count_2h',
            'transactions.amount_avg_2h',
            'transactions.amount_max_12h']

# Import MLRun's Feature Store
import mlrun.feature_store as fstore

# Define the feature vector name for future reference
fv_name = 'transactions-fraud'

# Define the feature vector using our Feature Store
transactions_fv = fstore.FeatureVector(fv_name, features,
                                   label_feature="labels.label",
                                   description=
                                       'Predicting a fraudulent transaction')

# Save the feature vector definition in the Feature Store
transactions_fv.save()

# Get offline feature vector as dataframe and save the dataset to a parquet file
train_dataset = fstore.get_offline_features(transactions_fv, target=ParquetTarget())

# Preview the dataset
train_dataset.to_dataframe().tail(5)

Um Echtzeit-Features abzurufen, musst du zunächst einen Dienst definieren (der die Echtzeit-Abrufpipeline initialisiert), gefolgt von .get() Methoden, um Feature-Werte in Echtzeit abzufragen. Die Trennung zwischen der Erstellung des Dienstes (einmalige Initialisierung) und den einzelnen Abfragen sorgt für geringere Abfragelatenzen. Darüber hinaus unterstützt MLRun die automatische Berechnung von Werten auf der Grundlage der Metadaten und Statistiken des Merkmals. Dadurch kann ein erheblicher Entwicklungs- und Berechnungsaufwand eingespart werden. Siehe Beispiel 4-15.

Beispiel 4-15. Online-Funktionen von MLRun abrufen (Quelle: MLRun)
# Create the online feature service, substitute NaN values with
# the feature mean value
svc = fstore.get_online_feature_service('transactions-fraud:latest',
                                    impute_policy={"*": "$mean"})

# Get sample feature vector
sample_fv = svc.get([{'source': 'C76780537'}])

# sample_fv Result
[{'amount_max_2h': 14.68,
  'amount_max_12h': 70.81,
  'amount_sum_2h': 14.68,
  'amount_count_2h': 1.0,
  'amount_avg_2h': 14.68}]
Hinweis

Die Merkmalspeicher von MLRun bieten genaue Echtzeit-Aggregationen und niedrige Latenzzeiten, indem sie vorberechnete Werte während des Ingestionsprozesses mit Echtzeitberechnungen zum Zeitpunkt der Merkmalsanfrage kombinieren.

Das MLRun-Framework bietet eine Modellentwicklungs- und Trainingspipeline, Echtzeit-Serving-Pipelines und eine integrierte Modellüberwachung. Der Funktionsspeicher von MLRun ist nativ in die anderen Komponenten integriert, wodurch überflüssige Glue-Logik, Metadatenübersetzung usw. entfallen und die Zeit bis zur Produktion verkürzt wird.

Fazit

Da die Datenverwaltung und -verarbeitung die wichtigsten Komponenten von ML sind, ist es wichtig zu wissen, wie man datenbezogene Aufgaben optimal erledigt. Dieses Kapitel befasst sich mit den empfohlenen Tools und Praktiken für die verschiedenen Phasen der Arbeit mit deinen Daten. Zu Beginn des Kapitels haben wir uns mit der Versionierung und der Herkunft der Daten befasst, die für die Rückverfolgung des Ursprungs der Daten unerlässlich sind. Dann haben wir uns mit der Datenaufbereitung und -analyse im großen Maßstab befasst, also damit, wie die Daten behandelt werden, damit sie in der Produktion verwendet werden können. In diesem Abschnitt haben wir auch die Architektur interaktiver Datenverarbeitungslösungen und die Unterschiede zwischen Batch-Datenverarbeitung und Echtzeitverarbeitung besprochen.

Nachdem wir die Herausforderungen bei der Umsetzung dieser Praktiken im großen Maßstab besprochen hatten, stellten wir das Konzept der Feature Stores vor, die ein zentrales Repository für ML-Features sind. Wir haben uns mit den Funktionen eines Feature Stores befasst, z. B. mit der Datenkonnektivität und der Offline- und Online-Transformation. Wir haben auch gezeigt, wo der Feature Store in die MLOps-Pipeline passt, von der Aufnahme von Rohdaten bis hin zur Unterstützung der Nutzung dieser Daten beim Training, Servicing, Monitoring und mehr. Zum Schluss haben wir verschiedene Feature-Store-Lösungen und ihre Verwendung vorgestellt.

Kritisches Denken - Diskussionsfragen

  • Welche Details liefern Metadaten? Warum brauchen wir als Datenexperten diese Informationen?

  • Welche Open-Source-Tools zur Datenversionierung gibt es? Welches könnte für dein Unternehmen geeignet sein?

  • Was ist der Unterschied zwischen Stapelverarbeitung und Stream Processing? Wann wird beides verwendet?

  • Wie vereinfacht ein Feature Store die Datenverwaltung und -verarbeitung? Welche Funktionen ermöglichen dies?

  • Was sind die Unterschiede zwischen dem Feast und dem MLRun Feature Store? Welcher könnte für deine Organisation der richtige sein?

Übungen

  • Wähle eine Open-Source-Lösung (DVC, Pachyderm, MLflow oder MLRun) und erstelle ein Skript oder einen Workflow für die Datenversionierung, mit dem du Daten und Metadaten aufzeichnen und versionieren kannst.

  • Erstelle einen Prototyp einer Stapelverarbeitungspipeline mit einem Werkzeug deiner Wahl.

  • Verbinde einen Trino-Datenkonnektor mit einer Datenquelle.

  • Trainiere ein Demomodell (du kannst Hugging Face verwenden, wenn du ein Beispielmodell brauchst) mit einem Feature Store.

  • Erstelle ein Feature Set und eine Ingestion Pipeline in MLRun. Du kannst dieses Projekt als Referenz verwenden.

Get Implementierung von MLOps im Unternehmen now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.