Kapitel 4. Arbeiten mit Daten und Feature Stores
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Beim maschinellen Lernen werden Daten in eine Vorhersagelogik umgewandelt. Daten sind für den Prozess unerlässlich, können aus vielen Quellen stammen und müssen verarbeitet werden, um sie nutzbar zu machen. Daher sind Datenmanagement und -verarbeitung die wichtigsten Komponenten des maschinellen Lernens. Daten können aus verschiedenen Quellen stammen:
- Dateien
-
Daten, die in lokalen oder Cloud-Dateien gespeichert sind
- Data Warehouses
-
Datenbanken mit historischen Datentransaktionen
- Online-Datenbanken
-
SQL-, NoSQL-, Graph- oder Zeitreihendatenbanken mit aktuellen Transaktions- oder Anwendungsdaten
- Datenströme
-
Zwischenspeicherung von Echtzeit-Ereignissen und Nachrichten (für die zuverlässige Weitergabe von Daten zwischen Diensten)
- Online-Dienste
-
Jeder Cloud-Dienst, der wertvolle Daten liefern kann (dazu können soziale, finanzielle, staatliche und Nachrichtendienste gehören)
- Eingehende Nachrichten
-
Asynchrone Nachrichten und Benachrichtigungen, die per E-Mail oder über einen anderen Nachrichtendienst (Slack, WhatsApp, Teams) eingehen können
Die Quelldaten werden verarbeitet und als Merkmale gespeichert, um sie für die Modellschulung und den Modellfluss zu verwenden. In vielen Fällen werden die Merkmale in zwei Speichersystemen gespeichert: eines für den Batch-Zugriff (Training, Batch-Vorhersage usw.) und eines für den Online-Abruf (für Echtzeit- oder Online-Serving). Infolgedessen kann es zwei getrennte Datenverarbeitungspipelines geben, eine mit Stapelverarbeitung und eine mit Echtzeitverarbeitung (Stream).
Die Datenquellen und die Verarbeitungslogik werden sich wahrscheinlich im Laufe der Zeit ändern, was zu Änderungen an den verarbeiteten Merkmalen und dem aus diesen Daten erstellten Modell führt. Daher sind die Versionierung der Daten, die Verarbeitungslogik und die Nachverfolgung der Datenreihenfolge entscheidende Elemente jeder MLOps-Lösung.
Die Bereitstellung genauer und hochwertiger Produktionsmodelle erfordert große Datenmengen und eine hohe Verarbeitungsleistung. Die Verarbeitung von Produktionsdaten kann mit verteilten Analyse-Engines(Apache Spark, Dask, Google BigQuery und anderen), Stream-Processing-Technologien (wie Apache Flink) oder mehrstufigen Datenpipelines skaliert werden.
Einer der Mechanismen zur Automatisierung der Integration mit Datenquellen, der skalierbaren Batch- und Echtzeit-Datenverarbeitung, der Datenversionierung und der Feature-Verwaltung ist die Verwendung eines Feature-Stores. Ein Feature Store ist ein zentraler Knotenpunkt für die Erstellung, den Austausch und die Überwachung von Features. Feature Stores sind in modernen MLOps-Implementierungen unverzichtbar und werden in diesem Kapitel näher beschrieben.
Datenversionierung und Abstammung
Modelle und Datenprodukte werden aus Daten abgeleitet. Das Sammeln von Metadaten und die Rückverfolgung der Herkunft der Daten ermöglichen daher eine bessere Kontrolle und Steuerung der Datenprodukte. Wenn du eine bestimmte Version eines Datenprodukts untersuchen willst, musst du außerdem die ursprünglichen Daten kennen, die zur Erstellung dieses Produkts oder Modells verwendet wurden.
Die Versionierung von Daten und die Verwaltung von Metadaten gehören zu den wichtigsten MLOps-Praktiken, die die folgenden Punkte abdecken:
- Datenqualität
-
Die Rückverfolgung von Daten durch die Systeme einer Organisation und das Sammeln von Metadaten und Informationen über die Herkunft können helfen, Fehler und Unstimmigkeiten zu erkennen. So können Korrekturmaßnahmen ergriffen und die Datenqualität verbessert werden.
- Reproduzierbarkeit und Rückverfolgbarkeit von Modellen
-
Der Zugang zu historischen Daten ermöglicht es uns, die Modellergebnisse zu reproduzieren und kann für die Fehlersuche und das Ausprobieren verschiedener Parametersätze genutzt werden.
- Datenmanagement und Prüfbarkeit
-
Indem sie den Ursprung und die Geschichte der Daten verstehen, können Organisationen sicherstellen, dass die Daten den erwarteten Richtlinien und Vorschriften entsprechen, Fehlerquellen aufspüren und Prüfungen oder Untersuchungen durchführen.
- Compliance
-
Die Datenabfolge kann Organisationen helfen, die Einhaltung von Vorschriften wie GDPR und HIPAA nachzuweisen.
- Einfachere Datenverwaltung
-
Metadaten und Abstammungsinformationen ermöglichen eine bessere Datenerkennung, Mappings, Profilerstellung, Integration und Migrationen.
- Bessere Zusammenarbeit
-
Datenversionierung und -abstammung können die Zusammenarbeit zwischen Datenwissenschaftlern und ML-Ingenieuren erleichtern, indem sie eine klare und konsistente Sicht auf die in ML-Modellen und bei der Handhabung von Upgrades verwendeten Daten ermöglichen.
- Verfolgung von Abhängigkeiten
-
Verstehe, wie jede Daten-, Parameter- oder Codeänderung zu den Ergebnissen beiträgt, und erhalte Einblicke, welche Daten oder Modellobjekte aufgrund von Änderungen an der Datenquelle geändert werden müssen.
Wie es funktioniert
Wie in Abbildung 4-1 dargestellt, kann der Datengenerierungsfluss als eine Reihe von Datenquellen und Parametern abstrahiert werden, die als Input für eine Datenverarbeitungsaufgabe(Berechnung) verwendet werden, die eine Sammlung von Datenprodukten oder Artefakten erzeugt. Die Output-Artefakte können unterschiedlicher Art sein: Dateien, Tabellen, Machine-Learning-Modelle, Diagramme und so weiter.
Das Datenverfolgungssystem erfasst die Informationen über die Eingaben (Datenquellen und -versionen, Parameter) und Berechnungsaufgaben (Code, Pakete, Ressourcen, ausführender Benutzer und mehr). Dann fügt es sie als Metadaten in die Output-Artefakte ein. Die Metadaten können zusätzliche Informationen enthalten, wie z. B. vom Nutzer bereitgestellte Labels oder Tags, Informationen über die Datenstruktur, das Schema, Statistiken usw. Die Metadaten werden in der Regel nicht in jedes Output-Artefakt kopiert, sondern stattdessen referenziert (durch einen Link), um Datendopplungen zu vermeiden.
Wie in Abbildung 4-2 dargestellt, können die Ergebnisse der ersten Aufgabe (z. B. Datenaufbereitung) als Dateninput für die folgenden Aufgaben (z. B. Modelltraining, Testen) verwendet werden.
Wenn wir über eine Benutzeroberfläche oder ein SDK auf ein Datenprodukt zugreifen, können wir anhand der Metadaten die genauen Datenquellen, Parameter und die vollständigen Details der Berechnungsaufgabe sehen. Wir können auch den Fortschritt der in einem mehrstufigen Fluss erzeugten Daten verfolgen und alle zusätzlichen Metadaten untersuchen.
Jedes Mal, wenn die Datenverarbeitungsaufgabe ausgeführt wird, wird eine neue Version der Output-Artefakte erstellt (siehe Abbildung 4-3). Jede Version wird mit einer eindeutigen Versionskennung (Commit-ID) versehen und kann außerdem mit einem aussagekräftigen Versionsnamen versehen werden, z. B. Master, Development, Staging, Production usw. Dies ähnelt dem Git-Ablauf bei der Versionierung von Quellcode.
Nehmen wir an, du führst wiederholt jede Stunde eine bestimmte Aufgabe aus. Sie hat die gleichen Eingaben und Parameter oder du nimmst kleine Änderungen vor, die die Ergebnisse der Ausgabedaten nicht verändern. Das kann zu riesigen Haufen redundanter Daten führen, und mehrere Versionen speichern denselben Inhalt. Viele Lösungen für die Datenversionierung implementieren die Deduplizierung von Inhalten, um dieses Problem zu lösen.
Wenn ein Artefakt erstellt wird, wird ein kryptografischer Hash-Wert des Inhalts berechnet (z. B. mit den Algorithmen MD5 oder SHA1), der die Einzigartigkeit des Inhalts darstellt. Schließlich wird der Hash-Wert mit älteren Versionen verglichen oder als Index in der Speicherung verwendet. Auf diese Weise wird der Inhalt nur einmal gespeichert.
Da Lösungen für die Datenversionierung neben den Quelldaten (Code, Parameter, Benutzer, Ressourcen usw.) auch verschiedene Attribute erfassen, müssen sie gut in das Versionskontrollsystem (Git) und das Job- oder Pipeline-Ausführungsframework integriert sein. Andernfalls muss der Nutzer die Frameworks manuell zusammenfügen und die Referenzmetadaten für die Aufzeichnung zusammen mit den Daten bereitstellen.
Viele Frameworks(MLflow, MLRun usw.) bieten eine Logging-API, bei der der Nutzer eine log_artifact()
Methode aufruft, die automatisch die neuen Daten zusammen mit dem Code und den Ausführungsmetadaten aufzeichnet und versioniert. Viele bieten eine auto logging
Lösung an, die keine Code-Instrumentierung erfordert. Stattdessen findet sie automatisch heraus, welche Daten und Metadaten gespeichert und versioniert werden müssen, indem sie den Code des Nutzers und die Fähigkeiten des ML-Frameworks versteht.
Gängige ML-Datenversionierungstools
Es gibt eine Reihe von Open-Source- und kommerziellen Frameworks für die Datenversionierung. Dieses Buch konzentriert sich darauf, die Open-Source-Optionen DVC, Pachyderm, MLflow und MLRun zu erläutern und zu vergleichen.
Versionskontrolle der Daten
Data Version Control (DVC) begann als Werkzeug zur Datenversionierung für ML und wurde erweitert, um grundlegende ML-Workflow-Automatisierung und Experimentmanagement zu unterstützen. Es nutzt die Vorteile der Softwareentwicklung, mit der du bereits vertraut bist (Git, deine IDE, CI/CD usw.).
DVC funktioniert genau wie Git (mit ähnlichen Befehlen), aber für große dateibasierte Datensätze und Modellartefakte. Das ist sein Hauptvorteil, aber auch seine Schwäche. DVC speichertdie Dateninhalte in Dateien oder einer Objektspeicherung (AWS S3, GCS, Azure Blob usw.) und hält eine Referenz auf diese Objekte in einer Datei (.dvc) fest, die im Git-Repository gespeichert wird.
Mit dem folgenden Befehl fügst du eine lokale Modelldatei(model.pkl) zum Datenversionierungssystem hinzu:
dvc add model.pkl
DVC kopiert den Inhalt der Datei model.pkl in eine neue Datei mit einem neuen Namen (abgeleitet vom md5-Hashwert des Inhalts) und legt sie im Verzeichnis .dvc/ ab. Außerdem wird eine Datei namens model.pkl.dvc
erstellt, die auf diese Inhaltsdatei verweist. Als Nächstes muss die neue Metadatendatei von Git nachverfolgt werden, der Inhalt sollte ignoriert und die Änderungen müssen übertragen werden. Dies geschieht durch die Eingabe der folgenden Befehle:
git add model.pkl.dvc .gitignore git commit -m "Add raw data"
Wenn du die Daten auf deine entfernte Speicherung hochladen willst, musst du ein entferntes Objekt-Repository einrichten (hier nicht gezeigt) und den DVC-Push-Befehl verwenden:
dvc push
Der Datenfluss ist in Abbildung 4-4 dargestellt.
Wie du aus dem Beispiel ersehen kannst, bietet DVC eine zuverlässige Synchronisierung zwischen Code- und Dateidatenobjekten, aber es erfordert eine manuelle Konfiguration und speichert keine erweiterten Metadaten über die Ausführung, den Arbeitsablauf, die Parameter und so weiter. Stattdessen verwaltet DVC Parameter und Ergebnismetriken mithilfe von JSON- oder YAML-Dateien, die zusammen mit dem Code gespeichert und versioniert werden.
Benutzer können Workflow-Phasen definieren, die eine ausführbare Datei (z.B. ein Python-Programm) umschließen und angeben, welche Parameter (-p
) die Dateieingaben oder Abhängigkeiten (-d
) und Ausgaben (-o
) für diese ausführbare Datei sind (siehe Beispiel 4-1).
Beispiel 4-1. Hinzufügen eines Workflow-Schrittes in DVC
dvc stage add -n featurize \ -p featurize.max_features,featurize.ngrams \ -d src/featurization.py -d data/prepared \ -o data/features \ python src/featurization.py data/prepared data/features
Wenn du den Befehl dvc repro
ausführst, prüft er, ob sich die Abhängigkeiten geändert haben, führt die erforderlichen Schritte aus und registriert die Ausgaben.
DVC verwendet keine Experimentierdatenbank. Es verwendet Git als Datenbank, und jede Ausführung oder Parameterkombination wird einem eindeutigen Git-Commit zugeordnet. Außerdem ist DVC auf die lokale Entwicklung ausgerichtet. Daher kann der Einsatz im großen Maßstab oder in einer containerisierten oder verteilten Workflow-Umgebung eine Herausforderung darstellen und Skripting und manuelle Integrationen erfordern.
Zusammenfassend lässt sich sagen, dass DVC ein hervorragendes Werkzeug ist, um große Datenartefakte zu versionieren und sie in einer lokalen Entwicklungsumgebung Git-Commits zuzuordnen. Darüber hinaus implementiert DVC eine Datendeduplizierung, um den tatsächlichen Speicherbedarf zu reduzieren. Andererseits ist DVC befehlszeilenorientiert (Git-Flow) und verfügt nur über begrenzte Möglichkeiten, um in der Produktion zu laufen, Pipelines auszuführen und erweiterte Attribute und strukturierte Daten zu verfolgen. Außerdem verfügt es über eine minimale Benutzeroberfläche (Studio).
Dickhäuter
Pachyderm ist ein Datenpipeline- und Versionierungstool, das auf einer containerisierten Infrastruktur aufbaut. Es bietet ein versioniertes Dateisystem und ermöglicht es den Nutzern, mehrstufige Pipelines zu erstellen, bei denen jede Stufe in einem Container läuft, Eingabedaten (als Dateien) akzeptiert und Ausgabedateien erzeugt.
Pachyderm bietet ein versioniertes Daten-Repository, das über eine objektbasierte Speicherung (z. B. AWS S3, Minio, GCS) implementiert und über eine Datei-API oder das SDK/CLI aufgerufen werden kann. Jede Datenübertragung oder Änderung wird ähnlich wie bei Git protokolliert. Die Daten werden dedupliziert, um Speicherplatz zu sparen.
Die Pachyderm-Datenpipeline führt Container aus und bindet einen Ausschnitt des Repositorys in den Container ein (unter dem Verzeichnis /pfs/ ). Der Container liest Dateien, verarbeitet sie und schreibt die Ergebnisse zurück in das Pachyderm-Repository.
Beispiel 4-2 zeigt eine einfache Pipeline-Definition, die alle Daten aus dem Repository data
im Zweig master
übernimmt, die Logik der Wortzählung (unter Verwendung des angegebenen Container-Images) ausführt und die Ausgabe in das Repository out
schreibt.
Beispiel 4-2. Beispiel einer Dickhäuter-Pipeline
pipeline
:
name
:
'count'
description
:
'Count
the
number
of
lines
in
a
csv
file'
input
:
pfs
:
repo
:
'data'
branch
:
'master'
glob
:
'/'
transform
:
image
:
alpine:3.14.0
cmd
:
[
'/bin/sh'
]
stdin
:
[
'wc
-l
/pfs/data/iris.csv
>
/pfs/out/line_count.txt'
]
Pipelines können jedes Mal ausgelöst werden, wenn sich die Eingabedaten ändern, und die Daten können inkrementell verarbeitet werden (nur neue Dateien werden an den Containerprozess übergeben). Das kann Zeit und Ressourcen sparen.
Pachyderm hat eine schöne Benutzeroberfläche für die Verwaltung von Pipelines und die Erkundung der Daten. Siehe Abbildung 4-5.
Pachyderm kann mit großen oder kontinuierlich strukturierten Datenquellen arbeiten, indem er die Daten in kleinere CSV- oder JSON-Dateien aufteilt.
Zusammenfassend lässt sich sagen, dass Pachyderm ein hervorragendes Tool für den Aufbau versionierter Datenpipelines ist, bei denen der Code einfach genug ist, um Dateien zu lesen und zu schreiben. Es beherrscht Deduplizierung und inkrementelle Verarbeitung. Es erfordert jedoch eine separate Nachverfolgung des Quellcodes (führt vorgefertigte Images aus), der Ausführungs- oder Experimentparameter, der Metadaten, Metriken und mehr.
MLflow Tracking
MLflow ist eine Open-Source-Plattform für die Verwaltung des gesamten Lebenszyklus von Machine Learning. Eine der Kernkomponenten ist MLflow Tracking, das eine API und eine Benutzeroberfläche für die Protokollierung von Machine-Learning-Läufen, deren Eingaben und Ausgaben sowie die Visualisierung der Ergebnisse bietet. MLflow Tracking-Läufe sind Ausführungen von Data Science Code. Jeder Lauf zeichnet die folgenden Informationen auf:
- Code Version
-
Der für den Lauf verwendete Git-Commit-Hash.
- Start- und Endzeit
-
Die Start- und Endzeit des Laufs.
- Quelle
-
Der Name der Datei, mit der der Lauf gestartet wird, oder der Projektname und der Einstiegspunkt für den Lauf, wenn er von einem MLflow-Projekt aus ausgeführt wird.
- Parameter
-
Key-value
Eingabeparameter deiner Wahl. Sowohl die Schlüssel als auch die Werte sind Strings. - Metriken
-
Key-value
Metriken, wobei der Wert numerisch ist. MLflow zeichnet den gesamten Verlauf der Kennzahl auf und macht ihn sichtbar. - Artefakte
-
Gib Dateien in einem beliebigen Format aus. Du kannst zum Beispiel Bilder (z. B. PNGs), Modelle (z. B. ein gepickeltes
scikit-learn
Modell) und Datendateien (z. B. eine Parquet-Datei) als Artefakte aufzeichnen.
MLflow Tracking ist keine vollständige Lösung für die Datenversionierung, da es keine Funktionen wie die Datenabfolge (Aufzeichnung der Dateneingaben und der Daten, die zur Erstellung eines neuen Datenelements verwendet wurden) oder Deduplizierung unterstützt. Es ermöglicht jedoch die Protokollierung und Indizierung der Datenausgaben jedes Laufs zusammen mit dem Quellcode, den Parametern und einigen Ausführungsdetails. MLflow kann manuell mit anderen Tools wie DVC integriert werden, um Daten und Experimente zu verfolgen.
Der Vorteil von MLflow ist es, die Datenausgaben mit zusätzlichen Metadaten über den Code und die Parameter zu verfolgen und sie in einer grafischen Benutzeroberfläche zu visualisieren oder zu vergleichen. Das ist jedoch nicht kostenlos. Der Nutzercode muss mit dem MLflow-Tracking-Code instrumentiert werden.
Beispiel 4-3 zeigt einen Teil des Codes, der einen Lauf über die MLflow-API verfolgt. Zunächst werden die Kommandozeilenargumente manuell geparst und die Eingabedaten als String-URL übergeben, wie jeder andere Parameter auch. Das Laden und Umwandeln der Daten erfolgt dann manuell.
Nachdem die Logik (Datenaufbereitung, Training usw.) ausgeführt wurde, protokolliert der Nutzer die Tags, Eingabeparameter, Ausgabemetriken und Datenartefakte (Datensatz und Modell) mithilfe der MLflow-Protokollbefehle.
Beispiel 4-3. MLflow Tracking Codebeispiel
if
__name__
==
"__main__"
:
# parse the input parameters
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
"--data"
,
help
=
"input data path"
,
type
=
str
)
parser
.
add_argument
(
'--dropout'
,
type
=
float
,
default
=
0.0
,
help
=
'dropout ratio'
)
parser
.
add_argument
(
"--lr"
,
type
=
float
,
default
=
0.001
,
help
=
'learning rate'
)
args
=
parser
.
parse_args
()
# Read the csv file
try
:
data
=
pd
.
read_csv
(
args
.
data
)
except
Exception
as
e
:
raise
ValueError
(
f
"Unable to read the training CSV,
{
e
}
"
)
# additional initialization code ...
with
mlflow
.
start_run
():
# training code ...
# log experiment tags, parameters and result metrics
mlflow
.
set_tag
(
"framework"
,
"sklearn"
)
mlflow
.
log_param
(
"dropout"
,
args
.
dropout
)
mlflow
.
log_param
(
"lr"
,
args
.
lr
)
mlflow
.
log_metric
(
"rmse"
,
rmse
)
mlflow
.
log_metric
(
"r2"
,
r2
)
mlflow
.
log_metric
(
"mae"
,
mae
)
# log data and model artifacts
mlflow
.
log_artifacts
(
out_data_path
,
"output_data"
)
mlflow
.
sklearn
.
log_model
(
model
,
"model"
,
registered_model_name
=
"ElasticnetWineModel"
)
MLflow sendet die Laufinformationen an den Tracking-Server und speichert die Datenelemente in lokalen Dateien oder Remote-Objekten (z. B. in S3). Die Laufinformationen können in der MLflow-Benutzeroberfläche eingesehen oder verglichen werden (siehe Abbildung 4-6).
MLflow verwaltet oder versioniert keine Datenobjekte. Die Ausführung ist das wichtigste Element, und du kannst nicht direkt auf Datenobjekte und Artefakte zugreifen oder sie durchsuchen. Außerdem gibt es kein Lineage Tracking, d.h. es wird nicht verfolgt, welche Datenobjekte verwendet wurden, um ein neues Datenobjekt oder Artefakt zu erzeugen. Wenn du eine Pipeline ausführst, kannst du die Artefakte der verschiedenen Schritte nicht an einem Ort sehen oder die Ausgabe eines Schritts mit der Eingabe des nächsten Schritts verketten.
Mit MLflow kann die Speicherung sehr groß werden, da jeder Lauf die Ergebnisse in einem neuen Dateiverzeichnis speichert, auch wenn sich nichts geändert hat. Es gibt keine Datendeduplizierung wie bei den anderen Frameworks.
Zusammenfassend lässt sich sagen, dass MLflow ein hervorragendes Werkzeug ist, um ML-Experimentierergebnisse in einer Entwicklungsumgebung zu verfolgen und zu vergleichen. Darüber hinaus ist MLflow einfach zu installieren und zu nutzen. Es ist jedoch kein Datenverfolgungs- oder Versionierungssystem und kann erhebliche Speicherkapazitäten erfordern. Außerdem muss MLflow von den Entwicklern mit eigenem Code versehen werden und die MLOps-Teams müssen Glue-Logik hinzufügen, damit es in die Produktionsbereitstellung und CI/CD-Workflows passt.
MLRun
MLRun ist ein quelloffenes MLOps-Orchestrierungs-Framework mit mehreren Unterkomponenten, die den gesamten ML-Lebenszyklus abdecken. Datenobjekte sind in MLRun Bürger erster Klasse und sind gut mit den anderen Komponenten integriert, um eine nahtlose Erfahrung und Automatisierung zu ermöglichen.
Während die meisten Frameworks Dateidatenobjekte verwalten, unterstützt MLRun eine Vielzahl von Datenobjekten (Datenspeicher, Elemente/Dateien, Datensätze, Streams, Modelle, Feature-Sets, Feature-Vektoren, Diagramme und mehr), die jeweils über eigene Metadaten, Aktionen und Viewer verfügen.
Jedes Objekt in MLRun hat einen Typ, eine eindeutige Versions-ID, Tags (benannte Versionen wie Entwicklung, Produktion usw.), benutzerdefinierte Bezeichnungen (zum Gruppieren und Suchen von Objekten) und Beziehungen zu anderen Objekten und ist ein Projektmitglied. Ein Laufobjekt ist beispielsweise mit den Quell- und Ausgabedatenobjekten und den Funktionsobjekten (Code) verknüpft und bildet so einen Graphen von Beziehungen.
Abbildung 4-7 zeigt den Ausführungsbildschirm mit Informationsreitern für allgemeine und Code-Attribute, Dateneingabeobjekte, Daten/Artefaktausgabeobjekte, Ergebnismetriken, automatisch gesammelte Protokolle usw. Die Nutzer können die Informationen aus verschiedenen Perspektiven betrachten. Du kannst dir zum Beispiel alle Datensätze im Projekt ansehen (unabhängig davon, welcher Lauf sie erzeugt hat).
MLRun-Datenobjekte und -Artefakte sind mit detaillierten Metadaten versehen. Dazu gehören Informationen darüber, wie sie erstellt wurden (von wem, wann, mit welchem Code, Framework usw.), welche Datenquellen zu ihrer Erstellung verwendet wurden und typspezifische Attribute wie Schema, Statistik, Vorschau und mehr. Die Metadaten werden automatisch generiert, was eine bessere Beobachtbarkeit ermöglicht und die Notwendigkeit einer zusätzlichen Glue-Logik überflüssig macht.
Hinweis
MLFlow-Benutzer können MLFlow weiterhin zum Verfolgen von APIs verwenden. MLRun registriert die protokollierten Daten, Metadaten und Modelle automatisch als Produktionsartefakte zusammen mit zusätzlichen betrieblichen Metadaten und Kontext.
MLRun bietet ein umfangreiches API/SDK für die Verfolgung und Suche in Daten und Experimenten. Die eigentliche Stärke ist jedoch, dass es die meisten Funktionen und Automatisierungen ohne zusätzlichen Programmieraufwand bereitstellen kann.
Beispiel 4-4 nimmt Eingabedaten und Parameter entgegen und erzeugt Ausgabedaten und Ergebnisse. Beachte, dass der Code im Gegensatz zu den vorherigen Beispielen kein Argument-Parsing, kein Laden von Daten, keine Konvertierung, kein Logging usw. enthält.
Beispiel 4-4. MLRun Code Beispiel
def
data_preparation
(
dataset
:
pd
.
DataFrame
,
test_size
=
0.2
):
# preform processing on the dataset
dataset
=
clean_df
(
dataset
)
.
dropna
(
how
=
"any"
,
axis
=
"rows"
)
dataset
=
dataset
.
drop
(
columns
=
[
"key"
,
"pickup_datetime"
])
train
,
test
=
train_test_split
(
dataset
,
test_size
=
test_size
)
return
train
,
test
,
"fare_amount"
Wenn du die Funktion ausführst und die URL oder den Pfad des Eingabedatenobjekts angibst (eine Datei, ein entferntes Objekt oder einen komplexen Typ), wird es automatisch in die Funktion geladen. Zum Beispiel mit AWS Boto-Treibern für den Zugriff auf S3-Objekte oder BigQuery-Treibern für den Zugriff auf eine BigQuery-Tabelle. Dann werden die Daten in das akzeptierende Format (DataFrame) umgewandelt und in den Nutzercode injiziert.
MLRun kann den Typ des zurückgegebenen Wertes automatisch erkennen (zum Beispiel sind train
und test
vom Typ DataFrame) und ihn in der besten Form speichern, zusammen mit automatisch generierten Metadaten, Links zu den Auftragsdetails und Dateneingabeobjekten sowie Informationen zur Versionierung. Wenn sich die Daten wiederholen, werden sie dedupliziert und nur einmal gespeichert, um Speicherplatz zu sparen.
Datenobjekte werden in der Benutzeroberfläche und im Client-SDK typspezifisch visualisiert, unabhängig davon, wie und wo sie gespeichert sind, z. B. tabellarische Formate mit Tabellenmetadaten (Schema, Statistiken und mehr) für Datensätze oder interaktive Grafiken für Diagrammobjekte (siehe Abbildungen 4-8 und 4-9).
Zusammenfassend lässt sich sagen, dass MLRun ein komplettes MLOps-Orchestrierungs-Framework ist, dessen Schwerpunkt auf der Datenverwaltung, der Bewegung, der Versionierung und der Automatisierung liegt. Darüber hinaus verfügt MLRun über ein umfangreiches Objektmodell, das die verschiedenen Arten von Daten und Ausführungsobjekten (Funktionen, Läufe, Workflows und mehr), ihre Beziehung zueinander und ihre Verwendung abdeckt. MLRun setzt auf Abstraktion und Automatisierung, um den Entwicklungs- und Implementierungsaufwand zu verringern. MLRun ist jedoch keine allgemeine Datenverwaltungs- und Versionsverwaltungslösung, und sein Wert wird maximiert, wenn es im Kontext von MLOps eingesetzt wird.
Andere Rahmenwerke
Einige Tools, wie Delta Lake und lakeFS, übernehmen die Versionierung von Data Lakes. Diese Tools sind jedoch nicht auf den ML-Lebenszyklus ausgerichtet und müssen möglicherweise integriert werden, damit sie für MLOps nützlich sind.
Cloud-Provider bieten Lösungen an, die in der Regel eng mit ihren internen Diensten verbunden sind. Siehe zum Beispiel Amazon SageMaker ML Lineage Tracking und Azure ML Datasets.
Datenaufbereitung und -analyse im großen Maßstab
Die Datenverarbeitung wird in den Daten-, ML- und Anwendungspipelines ausgiebig genutzt. Bei der Arbeit mit Produktionsdaten ist es notwendig, eine größere Skalierung und Leistung zu unterstützen und in einigen Fällen die Daten in Echtzeit zu verarbeiten, wenn sie ankommen.
Praktiken, die bei der interaktiven Entwicklung funktionieren, z. B. das Speichern der Daten in einer CSV-Datei und das Einlesen in das Notebook, funktionieren nicht mit Gigabytes oder Terabytes von Daten. Sie erfordern verteilte oder parallele Datenverarbeitungsansätze.
Die allgemeine Architektur für die verteilte Datenverarbeitung ist die gleiche, mit Unterschieden in der Art und Weise, wie die Daten verteilt und gesammelt werden und welche APIs sie verwenden. Zum Beispiel werden die Daten auf mehrere Computerknoten verteilt, die Verarbeitungsanfragen oder Abfragen kommen bei einem oder mehreren Knoten zur lokalen Verarbeitung an und die Ergebnisse werden gesammelt und zu einer einzigen Antwort zusammengeführt. Darüber hinaus können komplexe Abfragen Daten zwischen den Knotenpunkten verschieben oder mehrere Verarbeitungs- und Bewegungsschritte ausführen.
Abbildung 4-10 zeigt, wie die verteilte Datenverarbeitung mit dem Map-Reduce-Ansatz zum Zählen von Wörtern in einem Dokument funktioniert.
Strukturierte und unstrukturierte Datenumwandlungen
Daten können strukturiert sein, das heißt, sie entsprechen einem bestimmten Format oder einer bestimmten Struktur und haben oft ein vordefiniertes Schema oder Datenmodell. Strukturierte Daten können eine Datenbanktabelle oder Dateien mit einem strukturierten Layout sein (z. B. CSV, Excel, JSON, ML, Parquet). Die meisten Daten auf der Welt sind jedoch unstrukturiert, meist komplexer und schwieriger zu verarbeiten als strukturierte Daten. Dazu gehören freie Texte, Protokolle, Webseiten, Bilder, Videos und Audiodaten.
Hier sind einige Beispiele für analytische Transformationen, die an strukturierten Daten vorgenommen werden können:
- Filtern
-
Auswahl einer Teilmenge der Daten nach bestimmten Kriterien, z. B. nach einem bestimmten Datumsbereich oder bestimmten Werten in einer Spalte.
- Sortieren
-
Ordnen der Daten anhand einer oder mehrerer Spalten, z. B. Sortieren nach Datum oder nach einem bestimmten Wert.
- Gruppierung
-
Organisieren der Daten in Gruppen, die auf einer oder mehreren Spalten basieren, z. B. nach Produktkategorie oder Stadt gruppieren.
- Aggregation
-
Berechnung von zusammenfassenden Statistiken, wie z. B. Anzahl, Summe, Durchschnitt, Maximum und Standardabweichung für eine oder mehrere Spalten.
- Beitritt zu
-
Kombinieren von Daten aus mehreren Tabellen oder Datensätzen auf der Basis gemeinsamer Spalten, z. B. das Verbinden einer Tabelle mit Verkaufsdaten mit einer Tabelle mit Kundendaten.
- Mapping
-
Zuordnung von Werten aus einer oder mehreren Spalten zu neuen Spaltenwerten mithilfe von benutzerdefinierten Operationen oder Code. Die zustandsabhängige Zuordnung kann neue Werte auf der Grundlage der ursprünglichen Werte und der akkumulierten Zustände aus älteren Einträgen (z. B. die seit der letzten Anmeldung vergangene Zeit) berechnen.
- Zeitreihenanalyse
-
Analysieren oder Aggregieren von Daten im Laufe der Zeit, um z.B. Trends, Muster oder Anomalien zu erkennen.
Die folgenden Techniken können verwendet werden, um unstrukturierte Daten zu verarbeiten oder sie in strukturierte Daten umzuwandeln:
- Text Mining
-
Der Einsatz von NLP-Techniken, um Bedeutung und Erkenntnisse aus Textdaten zu gewinnen. Text Mining kann Informationen wie Stimmungen, Entitäten und Themen aus Textdaten extrahieren.
- Computer Vision
-
Die Verwendung von Bild- und Videoverarbeitungstechniken, um Informationen aus visuellen Daten zu gewinnen. Computer Vision kann Informationen wie Objekterkennung, Gesichtserkennung und Bildklassifizierung extrahieren.
- Audio- und Spracherkennung
-
Nutzung von Sprache-zu-Text- und Audioverarbeitungstechniken, um Bedeutung und Erkenntnisse aus Audiodaten zu gewinnen. Audio- und Spracherkennung kann Informationen wie Sprache-zu-Text, Stimmung und Sprecheridentifikation extrahieren.
- Datenextraktion
-
Techniken wie Web-Scraping und Datenextraktion nutzen, um strukturierte Daten aus unstrukturierten Datenquellen zu extrahieren.
Mit verschiedenen ML-Methoden können zum Beispiel Rohdaten in aussagekräftigere Daten umgewandelt werden:
- Clustering
-
Gruppierung ähnlicher Datenpunkte auf der Grundlage bestimmter Merkmale, wie z. B. Kunden mit ähnlichen Kaufgewohnheiten
- Dimensionalitätsreduktion
-
Reduzierung der Anzahl von Merkmalen in einem Datensatz, um ihn leichter analysieren oder visualisieren zu können
- Regression und Klassifizierung
-
Vorhersage einer Klasse oder eines Wertes auf der Grundlage anderer Datenmerkmale
- Anrechnung
-
Bestimmung des erwarteten Wertes auf der Grundlage anderer Datenpunkte bei fehlenden Daten
- einbetten
-
Die Darstellung einer Text-, Audio- oder Bildsequenz als numerischer Vektor, der die semantischen Beziehungen oder kontextuellen Merkmale beibehält.
Verteilte Datenverarbeitungsarchitekturen
Datenverarbeitungsarchitekturen können in drei Hauptkategorien unterteilt werden:
- Interaktiv
-
Eine Anfrage oder ein Update kommt an, wird verarbeitet und eine Antwort wird sofort zurückgegeben; zum Beispiel SQL- und NoSQL-Datenbanken, Data Warehouses, Key/Value-Stores, Graph-Datenbanken, Zeitreihen-Datenbanken und Cloud-Dienste.
- Batch
-
Ein Auftrag wird auf eine Anfrage hin oder zu einem geplanten Zeitpunkt gestartet, die Daten werden abgeholt und verarbeitet und die Ergebnisse nach Abschluss in die Zielspeicherung geschrieben. Batch-Aufträge benötigen in der Regel eine längere Bearbeitungszeit. Beispiele für Frameworks zur Batch-Datenverarbeitung sind Hadoop, Spark und Dask.
- Streaming
-
Kontinuierliche Verarbeitung von eingehenden Anfragen oder Datenpaketen und Schreiben der Ergebnisse in Echtzeit in eine Zielspeicherung oder eine Nachrichtenwarteschlange.
Die Batch-Verarbeitung ist in der Regel effizienter für die Verarbeitung großer Datenmengen. Die interaktive und die Stream-Datenverarbeitung liefern jedoch schnellere Antworten mit kürzeren Verzögerungen. Außerdem ist der Aufbau von Pipelines für die Datenstromverarbeitung in der Regel komplexer als Batch-Aufträge.
Einige Frameworks wie Spark unterstützen zwar verschiedene Verarbeitungsmethoden (interaktiv, Batch, Streaming), sind aber in der Regel nur für eine der Verarbeitungsmethoden optimal.
Interaktive Datenverarbeitung
Von interaktiven Systemen wird erwartet, dass sie sofort reagieren, damit der anfragende Client oder das interaktive Dashboard nicht warten muss. Außerdem können Produktionsdienste auf die Zuverlässigkeit und Robustheit der Ergebnisse angewiesen sein. Aus diesem Grund haben interaktive Systeme einfache APIs mit begrenzten Datenoperationen. In einigen Fällen bieten interaktive Systeme Mechanismen zur Definition eigener Logik durch gespeicherte Prozeduren und benutzerdefinierte Funktionen (UDFs).
Der Hauptunterschied zwischen den verschiedenen Arten von interaktiven Datensystemen ist die Art und Weise, wie sie Daten indizieren und speichern, um die Abfragezeit zu minimieren. NoSQL-, In-Memory- und Key/Value-Stores zum Beispiel sind für die Abfrage nach einem Indexschlüssel (z. B. Benutzer-ID, Produkt-ID usw.) optimiert. Die Daten werden durch den Schlüssel (oder einen Krypto-Hash oder den Schlüssel) geteilt und in verschiedenen Knotenpunkten gespeichert. Wenn eine Anfrage eintrifft, wird sie an den entsprechenden Knoten weitergeleitet, der die Daten für diesen Schlüssel (Benutzer, Produkt usw.) verwaltet und die Antwort schnell berechnen und abrufen kann. Komplexe oder schlüsselübergreifende Berechnungen hingegen erfordern die Koordination zwischen allen Knotenpunkten und dauern viel länger.
Analytische Datenbanken und Data Warehouses sind darauf ausgelegt, viele Datensätze mit unterschiedlichen Indexschlüsselwerten zu durchsuchen. Sie organisieren die Daten in Spalten (nach Feldern) und verwenden verschiedene spaltenbezogene Komprimierungstechnologien sowie Filter- und Hinting-Tricks (wie Bloom-Filter), um Datenblöcke zu überspringen.
Andere Systeme wie Zeitreihen- oder Graphdatenbanken verfügen über fortschrittlichere Datenlayouts und Suchstrategien, die multidimensionale Indizes und spaltenbezogene Komprimierung kombinieren. Der Zugriff auf das metrische Objekt der Zeitreihe erfolgt zum Beispiel über den metrischen Schlüssel (Name) und die Verwendung von Technologien zur spaltenweisen Komprimierung, um die einzelnen Werte (nach Zeit) zu durchsuchen oder zu aggregieren.
Viele interaktive Systeme verwenden die SQL-Sprache oder eine SQL-ähnliche Semantik, um Daten zu verarbeiten.
Einige Unterkategorien nennenswerter Datensysteme sind in Tabelle 4-1 aufgeführt.
Kategorie | Beschreibung |
---|---|
Relationale |
Speichern strukturierter Daten, Zugriff über SQL-Befehl. Beispiele sind MySQL, PostgreSQL, Oracle und Microsoft SQL Server. |
NoSQL |
Beispiele dafür sind MongoDB, Cassandra, Redis, Elasticsearch, AWS DynamoDB, Google BigTable und nicht-tabellarische Datenbanken. |
Zeitreihen |
Speichern und Abfragen von Zeitreihendaten. Beispiele sind InfluxDB, Prometheus und TimescaleDB. |
Grafik |
Speichern und Abfragen von Daten in einem Graphenformat. Beispiele sind Neo4j und Titan. |
Vektor |
Eine Vektordatenbank indiziert und speichert hochdimensionale Vektoreinbettungen für ein schnelles Wiederfinden und eine Ähnlichkeitssuche. Beispiele sind Chroma, Pinecone, Milvus, Weaviate und Pgvector. |
Analytische Systeme durchlaufen und verarbeiten in der Regel größere Datensätze. Daher unterstützen sie umfangreichere Transformationen (Filtern, Gruppieren, Verbinden, Aggregieren, Zuordnen usw.) und benutzerdefinierte Funktionen. Außerdem können einige Lösungen Daten aus anderen Datenbanken oder aus Dateien verarbeiten und aggregieren. Lösungen wie Spark SQL oder PrestoDB haben zum Beispiel Konnektoren zu vielen Datenquellen und können Abfragen verarbeiten, die viele Datensätze umfassen und in verschiedenen Systemen gespeichert sind.
Eine der beliebtesten verteilten SQL-basierten Analyse-Engines ist PrestoDB und ihr Nachfolgeprojekt Trino. Presto wurde ursprünglich von Facebook entwickelt und als Open Source zur Verfügung gestellt. Später wurde es in Projekte wie Trino und in kommerzielle Produkte wie den Amazon Athena Cloud Service abgezweigt. Trino hat eine lange Liste von Datenkonnektoren.
Abbildung 4-11 zeigt die Architekturen von Presto und Trino. Die Anfragen kommen über HTTP-Anfragen, werden geparst und vom Planer und dem Zeitplannungsprogramm in kleinere Aufgaben zerlegt, die von den einzelnen Workern bearbeitet und zusammengeführt werden.
Batch-Datenverarbeitung
Die Stapelverarbeitung wird verwendet, wenn große Datenmengen verarbeitet und eine Reihe von Datenumwandlungen durchgeführt werden müssen und die Verarbeitungszeit weniger wichtig ist. Bei der Stapelverarbeitung werden die Daten gelesen und in Stücke zerlegt, die an mehrere Worker zur Verarbeitung weitergegeben werden. Sobald das Ergebnis fertig ist, wird es in das Zielsystem geschrieben. Die Stapelverarbeitung wird häufig eingesetzt, um große Mengen historischer Daten zu verarbeiten und den Datensatz für das Training von ML-Modellen zu erzeugen.
Eines der bekanntesten Batch-Datenverarbeitungs-Frameworks war Apache Hadoop, ein Open Source Software-Framework für verteilte Speicherung und groß angelegte Verarbeitung datenintensiver Aufgaben. Hadoop wurde ursprünglich von Yahoo! Ingenieuren entwickelt und basiert auf dem MapReduce-Programmiermodell, das aus zwei Hauptfunktionen besteht: Map
und Reduce
. Die Funktion Map
verarbeitet einen Eingabedatensatz zu einer Reihe von Schlüssel-Wert-Paaren, die dann nach Schlüssel gruppiert und von der Funktion Reduce
verarbeitet werden, um die endgültige Ausgabe zu erzeugen.
Inzwischen wurde Hadoop durch modernere und Cloud-native Architekturen ersetzt, die auf Cloud-Objektspeicherung, containerisierter Infrastruktur und Berechnungsframeworks wie Spark, Flink, Beam, Dask und anderen basieren.
Eine alltägliche Anwendung für die Stapelverarbeitung ist die ETL-Verarbeitung. ETL bedeutet, Daten aus verschiedenen Quellen zu extrahieren, umzuwandeln und in eine Zieldatenbank, ein Data Warehouse oder einen Data Lake zu laden. ETL ist ein wichtiger Schritt im Datenintegrationsprozess, da er es Unternehmen ermöglicht, Daten aus verschiedenen Quellen zu extrahieren, zu bereinigen und in ein einziges, zentrales Repository umzuwandeln.
Batch-Verarbeitungspipelines können komplex sein und mehrere Schritte und Abhängigkeiten aufweisen. Apache Airflow ist eines der beliebtesten Open-Source-Frameworks für die Erstellung, Planung und Überwachung von Batch-Datenpipelines.
Airflow wurde ursprünglich von Airbnb entwickelt und wird jetzt von der Apache Software Foundation gepflegt. Es bietet eine einfache und leicht zu bedienende Schnittstelle zur Definition von Workflows als DAGs von Aufgaben, wobei jede Aufgabe einen einzelnen Verarbeitungsschritt darstellt. Die Aufgaben können in Python geschrieben werden und in verschiedenen Umgebungen ausgeführt werden, z. B. lokal, über Kubernetes oder in der Cloud.
Airflow bietet außerdem eine webbasierte Benutzeroberfläche (siehe Abbildung 4-12) für die Verwaltung und Überwachung von Workflows, mit der du den Status der einzelnen Aufgaben einsehen, fehlgeschlagene Aufgaben wiederholen und Aufgaben manuell auslösen oder planen kannst. Sie enthält außerdem Funktionen zur Verwaltung und Organisation von Workflows, wie z. B. die Definition von Abhängigkeiten zwischen Aufgaben und die Einrichtung einer Logik zur Wiederholung von Aufgaben.
Beispiel 4-5 ist ein Beispiel für Python-Code, der verwendet werden kann, um eine DAG in Apache Airflow zu erstellen, die Daten aus einer CSV-Datei liest, sie verarbeitet und an ein Ziel schreibt.
Beispiel 4-5. Code-Beispiel für die Luftstromdaten-Pipeline
import
csv
from
airflow
import
DAG
from
airflow.operators.python_operator
import
PythonOperator
from
datetime
import
datetime
,
timedelta
def
process_data
(
**
kwargs
):
ti
=
kwargs
[
'ti'
]
input_file
=
ti
.
xcom_pull
(
task_ids
=
'read_file'
)
processed_data
=
do_data_processing
(
input_file
)
return
processed_data
def
do_data_processing
(
input_file
):
# Placeholder function that performs data processing
processed_data
=
input_file
return
processed_data
def
read_csv_file
(
file_path
):
with
open
(
file_path
,
'r'
)
as
file
:
reader
=
csv
.
reader
(
file
)
return
list
(
reader
)
def
write_csv_file
(
file_path
,
data
):
with
open
(
file_path
,
'w'
)
as
file
:
writer
=
csv
.
writer
(
file
)
writer
.
writerows
(
data
)
default_args
=
{
'owner'
:
'airflow'
,
'depends_on_past'
:
False
,
'start_date'
:
datetime
(
2021
,
1
,
1
),
'email_on_failure'
:
False
,
'email_on_retry'
:
False
,
'retries'
:
1
,
'retry_delay'
:
timedelta
(
minutes
=
5
),
}
dag
=
DAG
(
'data_processing_dag'
,
default_args
=
default_args
,
description
=
'A DAG that reads data from a CSV file, processes it'
', and writes it to a destination'
,
schedule_interval
=
timedelta
(
hours
=
1
),
)
read_file
=
PythonOperator
(
task_id
=
'read_file'
,
python_callable
=
lambda
:
read_csv_file
(
'/path/to/input_file.csv'
),
xcom_push
=
True
,
dag
=
dag
,
)
process_data
=
PythonOperator
(
task_id
=
'process_data'
,
python_callable
=
process_data
,
provide_context
=
True
,
dag
=
dag
,
)
write_file
=
PythonOperator
(
task_id
=
'write_file'
,
python_callable
=
lambda
:
write_csv_file
(
'/path/to/output_file.csv'
,
ti
.
xcom_pull
(
task_ids
=
'process_data'
)),
provide_context
=
True
,
dag
=
dag
,
)
read_file
>>
process_data
>>
write_file
Es gibt mehrere Cloud-basierte Batch-Daten-Pipeline-Dienste wie AWS Glue, Google Cloud Composer (basierend auf Airflow) und Azure Data Factory.
Einer der Nachteile von Hadoop oder anderen Batch-Pipelines ist, dass die Daten bei jedem Schritt von der Festplatte gelesen, verarbeitet und wieder auf die Festplatte geschrieben werden müssen. Frameworks wie Spark und Dask wissen jedoch, wie man die Verarbeitungspipeline zu einem optimalen Graphen kompiliert, bei dem die Aufgaben nach Möglichkeit im Speicher ausgeführt werden, was die IO auf der Festplatte minimiert und die Leistung maximiert.
Beispiel 4-6 demonstriert einen Spark-Code, der eine CSV-Datei liest, die Daten verarbeitet und das Ergebnis in eine Zieldatei schreibt.
Beispiel 4-6. PySpark Daten-Pipeline Codebeispiel
from
pyspark.sql
import
SparkSession
# Create a Spark session
spark
=
SparkSession
.
builder
.
appName
(
"SimpleBatchProcessing"
)
.
getOrCreate
()
# Load a CSV file into a Spark DataFrame
df
=
spark
.
read
.
csv
(
"/path/to/input_file.csv"
,
header
=
True
,
inferSchema
=
True
)
# Perform some data processing on the DataFrame
processed_df
=
df
.
groupBy
(
"column_name"
)
.
agg
({
"column_name"
:
"mean"
})
# Write the processed DataFrame to a new CSV file
processed_df
.
write
.
csv
(
"/path/to/output_file.csv"
,
header
=
True
)
# Stop the Spark session
spark
.
stop
()
Beispiel 4-7 zeigt die gleiche Aufgabe, implementiert mit Dask. Der Vorteil von Dask ist, dass die Operationen denen von Python pandas sehr ähnlich sind, was ein enormer Vorteil für Datenwissenschaftler ist. Allerdings ist Spark in der Regel skalierbarer und robuster.
Beispiel 4-7. Dask Daten-Pipeline Codebeispiel
import
dask.dataframe
as
dd
# Load a CSV file into a Dask DataFrame
df
=
dd
.
read_csv
(
'/path/to/input_file.csv'
)
# Perform some data processing on the DataFrame
processed_df
=
df
.
groupby
(
'column_name'
)
.
column_name
.
mean
()
.
compute
()
# Write the processed DataFrame to a new CSV file
processed_df
.
to_csv
(
'/path/to/output_file.csv'
,
index
=
False
)
Du kannst sehen, dass die Beispiele für Spark und Dask viel einfacher sind als die für Airflow. Für die Verwaltung und Verfolgung langer, komplexer Aufträge kann Airflow jedoch besser geeignet sein.
Stream-Verarbeitung
Stream Processing ermöglicht eine skalierbare, fehlertolerante und Echtzeit-Datenverarbeitung. Sie wird häufig in Anwendungen eingesetzt, die große Datenmengen in Echtzeit verarbeiten, wie z. B. Echtzeit-Analysen, Betrugserkennung oder Empfehlungen.
Bei der Stream-Verarbeitung werden die Daten und eingehenden Ereignisse in einen Stream (eine Warteschlange) geschoben und von einem oder mehreren Workern gelesen. Die Worker verarbeiten die Daten nacheinander, führen Transformationen durch, fassen die Ergebnisse zusammen und schreiben die Ergebnisse in eine Datenbank oder einen Ausgabestrom. Im Gegensatz zu traditionellen Warteschlangen erfolgt die Stream-Verarbeitung in der Reihenfolge. Nehmen wir zum Beispiel an, der Stream enthält zwei Ereignisse: eines für die Kundenanmeldung und eines für die Kundenabmeldung. Werden sie nicht in der richtigen Reihenfolge verarbeitet, kann dies zu einem fehlerhaften Zustand führen. Ein anderes Beispiel ist eine Geldeinzahlung, gefolgt von einer Abhebung. Die Abhebung kann abgelehnt werden, wenn die Vorgänge in der falschen Reihenfolge bearbeitet werden.
Streams sind so konzipiert, dass sie skalierbar sind. Sie sind in Partitionen unterteilt, und jede Partition verarbeitet einen bestimmten Satz von Datenobjekten, so dass sie die Reihenfolge nicht verletzen. Ein Stream mit Benutzeraktivitäten wird zum Beispiel nach der Benutzer-ID aufgeteilt, damit die Aktivitäten eines bestimmten Benutzers immer in derselben Partition gespeichert und von demselben Worker verarbeitet werden.
Streams wie Kafka, AWS Kinesis und andere unterscheiden sich von Nachrichtenwarteschlangen wie RabbitMQ, AMQP, Amazon SQS, Google Pub/Sub und so weiter. Nachrichtenwarteschlangen garantieren keine Reihenfolge der Nachrichten. Sie garantieren jedoch die zuverlässige Zustellung von Nachrichten, während bei Streams der Client für die Zuverlässigkeit sorgt. Außerdem sind sie aufgrund der einfacheren Logik und der Parallelität, die Streams bieten, viel schneller.
Abbildung 4-13 zeigt eine Streaming-Anwendung, in der Clients Daten veröffentlichen, die auf die einzelnen Partitionen verteilt werden (basierend auf einem Hash des Partitionsschlüssels). Ein Worker liest aus jeder Partition und verarbeitet die Daten. Der Worker kann eine Datenbank verwenden, um den Status in bekannten Intervallen zu speichern (Checkpoints), so dass der Status im Falle eines Ausfalls wiederhergestellt werden kann, oder der Worker kann ungenutzten Speicher freigeben. Schließlich können die Ergebnisse in eine Zieldatenbank oder einen Ausgabestrom geschrieben werden.
Streams bieten eine "At-least-once-Semantik". Daher kann dieselbe Nachricht mehrmals vorkommen. Eine Möglichkeit, die "Exact Once"-Semantik zu gewährleisten (dieselbe Nachricht wird nur einmal verarbeitet), ist die Verwendung von Checkpoints. Die Streams werden der Reihe nach verarbeitet und der Status kann nach jedem Micro-Batch persistiert werden. Im Falle eines Ausfalls kann der Worker die letzten Checkpoint-Daten (Status) wiederherstellen, die Ereignisse ab diesem Zeitpunkt verarbeiten und ältere Ereignisse ignorieren.
Stream Processing Frameworks
Echtzeit-Analysen mit Echtzeit-Streams unterscheiden sich von Batch- oder SQL-Analysen. Bei Streams können die Worker die Daten nur einmal in sequentieller Reihenfolge durchgehen und einen Teil der Daten (in derselben Partition) sehen. Aus diesem Grund konzentrieren sich Echtzeit-Analyse-Frameworks wie Spark Streaming, Apache Flink, Apache Beam, Apache NiFi und andere auf die Stream-Verarbeitung und implementieren die Standard-Analyse- und Statistikmethoden auf eine Stream-optimierte Weise.
Ein typisches Szenario bei der Stream-Verarbeitung ist die Aggregation von Werten über einen bestimmten Zeitraum, z. B. die Untersuchung des Gesamtwerts der Kundentransaktionen der letzten Stunde, um Betrug zu erkennen. Es ist nicht machbar, die Gesamtsumme für jedes neue Ereignis mit Stream Processing zu berechnen. Das würde eine beträchtliche Menge an Zeit und Speicherplatz benötigen. Stattdessen werden die Werte in gefensterte Buckets gruppiert, z. B. sechs Buckets oder mehr, die jeweils den Gesamtwert pro 10 Minuten enthalten. Der Prozess summiert nur die Werte der letzten sechs Buckets und lässt den ältesten Bucket alle 10 Minuten fallen. Abbildung 4-14 zeigt überlappende Schiebefenster mit einer einminütigen Fensterdauer und 30-sekündigen Fensterperioden.
Beispiel 4-8 zeigt den Apache Beam-Code für die Definition eines solchen Fensters.
Beispiel 4-8. Definieren des Schiebefensters mit Apache Beam
from
apache_beam
import
window
sliding_windowed_items
=
(
items
|
'window'
>>
beam
.
WindowInto
(
window
.
SlidingWindows
(
60
,
30
)))
Die Codierung mit Stream-Processing-Frameworks erfordert fortgeschrittene Kenntnisse in der Datentechnik. Aus diesem Grund meiden viele Nutzer Echtzeitdaten, obwohl sie einen viel besseren Geschäftswert und genauere Modellbewertungsergebnisse liefern können. Feature Stores können hier Abhilfe schaffen, denn sie können die Batch- und die Streaming-Pipeline automatisch aus derselben übergeordneten Datenverarbeitungslogik generieren.
Feature Stores
Feature Stores sind eine Fabrik und ein zentrales Repository für Machine Learning Features. Feature Stores übernehmen die Sammlung von Rohdaten aus verschiedenen Quellen, die Transformationspipeline, die Speicherung, Katalogisierung, Versionierung, Sicherheit, Bereitstellung und Überwachung. Sie automatisieren viele der in diesem Kapitel beschriebenen Prozesse, beschleunigen die Produktionszeit und reduzieren den Entwicklungsaufwand. Feature Stores bilden einen gemeinsamen Katalog produktionsreifer Funktionen, ermöglichen die Zusammenarbeit und den Austausch zwischen Teams und beschleunigen die Innovation und Bereitstellung neuer KI-Anwendungen.
Die ersten Feature Stores wurden von großen Dienstleistern wie Uber, Twitter und Spotify implementiert. Bei diesen Anbietern ist KI das Herzstück des Geschäfts, und Feature Stores halfen ihnen, die Entwicklung und den Einsatz neuer KI-Anwendungen zu beschleunigen und die Zusammenarbeit und Wiederverwendung zu verbessern. Heute gibt es zahlreiche kommerzielle und Open-Source-Implementierungen, aus denen du wählen kannst.
Erweiterte Feature Stores bieten die folgenden Möglichkeiten:
- Datenkonnektivität
-
Problemlose Integration mit mehreren Offline- (Data Lakes, Data Warehouses, Datenbanken usw.) und Online-Quellen (Streams, Message Queues, APIs, Managed Services usw.).
- Offline- und Online-Transformation
-
Einige Feature Stores bieten die Möglichkeit, die Batch- und Streaming-Pipelines automatisch aus der übergeordneten Logik zu erstellen und zu verwalten.
- Speicherung
-
Speicherung der generierten Merkmale in einem Offline-Speicher (z. B. einem Objektspeicher) und einem Online-Speicher (normalerweise eine Schlüssel/Wert-Datenbank).
- Metadaten-Management
-
Automatisches Erzeugen, Speichern und Verwalten aller Metadaten von Merkmalen, einschließlich Abstammung, Schemata, Statistiken, Beschriftungen und mehr.
- Versionierung
-
Du verwaltest mehrere Versionen jedes Features und den Prozess der Überführung von Features aus der Entwicklung in die Produktion und integrierst sie in CI/CD.
- Erzeugen und Verwalten von Feature-Vektoren
-
Korrektes Zusammenfügen mehrerer Merkmale zu einem einzigen Datensatz für die Verwendung in Trainings- oder Serviceanwendungen.
- Zentrale Katalogisierung
-
Zentraler Zugriff auf Funktionen zum Erstellen, Beschriften oder Suchen.
- Sicherheit und Governance
-
Kontrolle des Zugriffs auf Merkmale und Rohdaten und Protokollierung des Merkmalszugriffs.
- Benutzerfreundliches UI und SDK
-
Einfacher Zugang über APIs und eine Benutzeroberfläche, um die unterschwellige Komplexität zu abstrahieren, Funktionen zu visualisieren und sie für Datenwissenschaftler nutzbar zu machen.
- Überwachung und hohe Verfügbarkeit
-
Überwache die Anlagen und Datenverarbeitungsaufgaben automatisch und stelle sie bei Ausfällen zuverlässig wieder her.
- Validierung und Analyse von Merkmalen
-
Ausführung verschiedener Datenverarbeitungsaufgaben automatisch oder auf Initiative des Nutzers, um die Korrektheit von Merkmalen zu überprüfen oder eine tiefgehende Analyse von Merkmalen, Korrelationen usw. zu erstellen.
Bevor du dich für einen Feature Store entscheidest, solltest du seine Funktionen gründlich vergleichen. Viele haben zum Beispiel nur eine sehr eingeschränkte Funktionalität, konzentrieren sich auf die Katalogisierung von Merkmalen oder verfügen nicht über automatisierte Transformationen, Datenmanagement im großen Maßstab und Echtzeitfunktionen. Diese Funktionen bieten den größten Nutzen, um die Produktionszeit zu verkürzen.
Architektur und Nutzung des Feature Stores
Abbildung 4-15 veranschaulicht die allgemeine Architektur und Nutzung eines Feature Stores. Rohdaten werden aufgenommen und in Features umgewandelt, und die Features werden katalogisiert und für verschiedene Anwendungen bereitgestellt (Training, Servicing, Monitoring). Über APIs und eine Benutzeroberfläche können Datenwissenschaftler, Dateningenieure und ML-Ingenieure Features aktualisieren, suchen, überwachen und nutzen.
Die wichtigsten Komponenten eines Feature Stores sind:
- Transformationsschicht
-
Wandelt Offline- oder Online-Rohdaten in Merkmale um und speichert sie sowohl in einem Online- (Schlüssel/Wert) als auch in einem Offline-Speicher (Objekt).
- Ebene der Speicherung
-
Speichert mehrere Versionen eines Features in Feature-Tabellen (Feature-Sets) und verwaltet den Lebenszyklus der Daten (Erstellen, Anhängen, Löschen, Überwachen und Sichern der Daten). Die Datenschicht speichert jedes Feature in zwei Formen: offline für Training und Analyse und online für die Bereitstellung und Überwachung.
- Feature Retrieval
-
Nimmt Anfragen nach mehreren Merkmalen (Merkmalsvektoren) und anderen Eigenschaften (z. B. Zeitbereiche und Ereignisdaten) entgegen und erstellt einen Offline-Datenschnappschuss für das Training oder einen Echtzeitvektor für die Bereitstellung.
- Metadatenmanagement und Katalogisierung
-
Speichert die Merkmalsdefinition, Metadaten, Beschriftungen und Beziehungen.
Ingestion und Transformation Service
In diesem Kapitel wurde die Komplexität der Implementierung einer umfangreichen Verarbeitung von Batch- und Echtzeitdaten, der Datenversionierung und des Metadatenmanagements erörtert. Feature Stores zielen darauf ab, diese Komplexität durch Abstraktion und Automatisierung zu reduzieren. In modernen Feature Stores werden Datenpipelines mit Hilfe von High-Level-Transformationslogik beschrieben. Diese Logik wird in die zugrundeliegende Semantik der Verarbeitungsmaschine umgewandelt und als kontinuierlicher und produktionsfähiger Dienst bereitgestellt, was einen erheblichen Entwicklungsaufwand erspart.
Die Pipeline-Implementierung ist unterschiedlich für die lokale Entwicklung (mit Paketen wie Pandas), für große Offline-Daten (mit Batch-Verarbeitung) und für Echtzeit-Daten (mit Stream-Verarbeitung). Der Vorteil eines Feature Stores, der automatisierte Transformationen unterstützt, besteht darin, dass er eine Definition für alle drei Bereitstellungsmodi verwendet und das Reengineering bei der Portierung von Datenpipelines von einer Methode zur anderen entfällt. In einigen Feature Stores wird die Datenpipeline-Technologie von den Datenquellen bestimmt, ob offline (Data Lakes, Data Warehouses, Datenbanken usw.) oder online (Streams, Message Queues, APIs, Managed Services usw.).
Merkmalspeicher implementieren die Datenaufnahme und -umwandlung für Gruppen von Merkmalen (genannt Merkmalsätze oder Merkmalsgruppen), die aus derselben Quelle stammen, z. B. alle Merkmale, die aus einem Kreditkarten-Transaktionsprotokoll extrahiert wurden. Feature-Sets nehmen Daten aus Offline- oder Online-Quellen auf, erstellen durch eine Reihe von Transformationen eine Liste von Merkmalen und speichern die daraus resultierenden Merkmale zusammen mit den zugehörigen Metadaten und Statistiken.
Abbildung 4-16 veranschaulicht den Transformationsdienst (Feature-Set). Sobald die Daten von der Quelle eingelesen wurden, durchlaufen sie einen Graphen (DAG) von Transformationen, und die daraus resultierenden Merkmale werden in die Offline- und Online-Stores geschrieben.
Beispiele für Transformationen (nach Datentyp):
- Strukturiert
-
Filtern, gruppieren, zusammenführen, aggregieren, OneHot-Kodierung, zuordnen, extrahieren und klassifizieren
- Text
-
Extrahieren, Parsen, Disassemblieren, Erkennen von Entitäten, Stimmungen und Einbettungen
- Visuell (Bilder und Videos)
-
Rahmen, Größe ändern, Objekte erkennen, zuschneiden, neu einfärben, drehen, zuordnen und klassifizieren
Der generierte Transformationsdienst sollte produktionsfähig sein und automatische Skalierung, hohe Verfügbarkeit, Live-Upgrades und mehr unterstützen. Außerdem sollte er eine kontinuierliche Dateneingabe und -verarbeitung unterstützen. Neue Daten können zum Beispiel kontinuierlich (in Echtzeit) oder in geplanten Intervallen (offline) eintreffen. Daher sind serverlose Funktionstechnologien eine hervorragende Lösung.
Merkmal Speicherung
Die Merkmale werden in der Regel in zwei Formen gespeichert: Offline-Speicherung für Schulungs- und Analyseanwendungen und Online-Speicherung für Echtzeit-Serving- und Überwachungsanwendungen. Siehe Abbildung 4-17.
Der Offline-Speicher enthält alle historischen Daten und nutzt oft Data Lakes, Objektspeicher oder Data Warehouse-Technologien. Eine gängige Wahl ist zum Beispiel die Verwendung von komprimierten Parquet-Dateien, die in einer Objektspeicherung wie AWS S3 gespeichert werden.
Der Online-Speicher enthält die aktuellsten Daten und verwendet oft NoSQL- oder Key/Value-Speicher wie Redis, AWS DynamoDB, Google BigTable und andere. Der Online-Store muss Lesefunktionen in Millisekunden unterstützen.
Feature Retrieval (für Training und Service)
Anwendungen zum Trainieren, Bedienen und Analysieren benötigen mehrere Merkmale aus verschiedenen Datensätzen und Quellen. Im Gegensatz dazu werden Merkmale in Gruppen (so genannte Merkmalsätze) organisiert, die auf ihrer Herkunft und ihrer Entität (Primärschlüssel wie Benutzer-ID, Produkt-ID usw.) basieren.
Das Abrufen mehrerer Merkmale aus verschiedenen Quellen, zu unterschiedlichen Zeiten und mit unterschiedlichen Indizes kann eine komplexe Analyseaufgabe sein. Feature-Stores bestimmen automatisch die für die JOIN
Abfrage erforderlichen Parameter auf der Grundlage der Feature-Metadaten, der Entitätsnamen und der Benutzeranfragedaten. Wenn die Datensätze transaktional sind (die Datensätze sind mit einem Zeitstempel versehen), muss die Join-Operation außerdem die zeitliche Korrektheit und den Zeitverlauf berücksichtigen, damit nur die zum Zeitpunkt des Ereignisses bekannten Werte zurückgegeben werden (auch als Join-Analyseoperation bezeichnet).
Offline-Feature-Sets können durch SQL-Abfragen aus dem Feature Store generiert werden. Bei Echtzeitanwendungen, die innerhalb von Millisekunden reagieren müssen, führt dies jedoch zu einem erheblichen Overhead, sodass andere Echtzeitmethoden verwendet werden. Außerdem können zeitbasierte Merkmale (z. B. die Anzahl der Anfragen in der letzten Stunde) nicht vorberechnet werden und erfordern eine besondere Handhabung, um ein genaues Ergebnis zu erzielen (z. B. durch die Kombination von vorberechneten Zeitfensterdaten und Ad-hoc-Berechnungen der letzten Meile).
Abbildung 4-18 zeigt den Ablauf der Merkmalsabfrage mit zwei separaten Engines, eine für die Offline-Abfrage und die andere für die Echtzeit-Abfrage. Beachte, dass im Fall von Offline der Datensatz als Snapshot oder in einem neuen Datensatz aufbewahrt wird, um die Nachverfolgung der Datenreihenfolge und die Erklärbarkeit zu ermöglichen.
Die Anfrage get_offline_features
kann Ereignisdaten als Grundlage für die Abfrage, einen gültigen Zeitbereich (z. B. wenn wir das Modell auf der Grundlage der Daten des letzten Monats trainieren wollen) und die zurückzugebenden Merkmale und Spalten (z. B. ob Index-, Zeit- oder Label-Spalten enthalten sein sollen) akzeptieren. Dann wird ein lokaler oder serverloser Analyseauftrag gestartet, der die Ergebnisse berechnet und den Merkmalsvektor-Datensatz zurückgibt.
Beim Echtzeitabruf initialisiert das System den Abrufdienst (indem es einmalig eine lokale oder entfernte Echtzeit-Analysefunktion konfiguriert, um Zeit bei den Anfragen zu sparen). Dann werden die Benutzeranfragen mit den Entitätsschlüsseln (aus den Ereignisdaten) gepusht und ein Ergebnisvektor akzeptiert. Darüber hinaus ermöglichen einige Feature-Stores die Echtzeit-Imputation (Ersetzen fehlender oder NaN-Daten durch statistische Feature-Werte aus den Feature-Metadaten).
Feature Stores Lösungen und Anwendungsbeispiele
Feature Stores begannen als interne Plattformen bei führenden Cloud-Providern (wie Uber, Spotify und Twitter). Inzwischen gibt es aber viele Open-Source- und kommerzielle Feature-Store-Lösungen auf dem Markt. Wie bei jeder wichtigen neuen Technologie gibt es jedoch auch bei diesen Lösungen viele Unterschiede in der Funktionalität, die du kennen solltest, um die richtige Lösung zu wählen.
Der wichtigste Unterschied ist, ob die Feature-Store-Plattform die Datenpipeline (Transformation) für dich verwaltet und ob sie sowohl Offline- als auch Echtzeit-Pipelines (Streaming) unterstützt. Wie du in diesem Kapitel gelesen hast, ist der Aufbau und die Verwaltung einer skalierbaren Datenpipeline die größte Herausforderung. Wenn du gezwungen bist, dies manuell zu tun, untergräbt dies den Wert eines Feature Stores erheblich.
Tabelle 4-2 vergleicht die führenden Feature Store-Lösungen:
Kategorie | Fest | Tecton | MLRun | SageMaker | Vertex AI | Databricks | HopsWorks |
---|---|---|---|---|---|---|---|
Offene Quelle |
Ja |
Nein |
Ja |
Nein |
Nein |
Nein |
Ja |
Verwaltete Option |
Nein |
große Wolken |
Cloud + On-Premise |
auf AWS |
auf GCP |
große Wolken |
Cloud + On-Premise |
Offline-Pipelines |
Nein |
Ja |
Ja |
Nein |
Nein |
Nein |
Ja |
Pipelines in Echtzeit |
Nein |
Ja |
Ja |
Nein |
Nein |
Nein |
Nein |
Feature Retrieval |
Ja |
Ja |
Ja |
Ja |
Ja |
Ja |
Ja |
Motoren |
Funke |
Funke |
Python, Dask, Spark, Nuclio |
Keine |
Funke |
Funke |
Spark, Flink |
Merkmal Analytik |
Nein |
Ja |
Ja |
Nein |
Nein |
Nein |
Ja |
Versionierung und Abstammung |
Nein |
Ja |
Ja |
Nein |
Nein |
Nein |
Ja |
Merkmale Sicherheit |
Nein |
Ja |
Ja |
Ja |
Nein |
Nein |
Nein |
Überwachung |
Nein |
Ja |
Ja |
Nein |
Nein |
Nein |
Ja |
Leimfreie Ausbildung und Bedienung |
Nein |
Nein |
Ja |
Nein |
Nein |
Nein |
Ja |
In den folgenden Abschnitten wird gezeigt, wie Feature Stores mit den beiden führenden Open-Source-Frameworks verwendet werden: Feast und MLRun. Beachte, dass MLRun einen größeren Funktionsumfang hat und neben vielen anderen einzigartigen Funktionen auch Offline- und Online-Transformationsdienste (basierend auf den serverlosen Engines von MLRun) bietet.
Feast Feature Store verwenden
Feast bietet keinen Transformationsdienst an. Die Daten sollten im Voraus vorbereitet und in einer unterstützten Quelle (wie S3, GCS, BigQuery) gespeichert werden. Feast registriert den Quelldatensatz und seine Metadaten (Schema, Entität usw.) in einem FeatureView-Objekt, wie in Beispiel 4-9 gezeigt.
Beispiel 4-9. Definieren von Feast FeatureView (Quelle: Feast)
# Read data from parquet files. Parquet is convenient for local development mode.
# For production, you can use your favorite DWH, such as BigQuery. See Feast
# documentation for more info.
driver_hourly_stats
=
FileSource
(
name
=
"driver_hourly_stats_source"
,
path
=
"/content/feature_repo/data/driver_stats.parquet"
,
timestamp_field
=
"event_timestamp"
,
created_timestamp_column
=
"created"
,
)
# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver
=
Entity
(
name
=
"driver"
,
join_keys
=
[
"driver_id"
])
# Our parquet files contain sample data that includes a driver_id column, timestamps
# and three feature column. Here we define a Feature View that will allow us to serve
# this data to our model online.
driver_hourly_stats_view
=
FeatureView
(
name
=
"driver_hourly_stats"
,
entities
=
[
driver
],
ttl
=
timedelta
(
days
=
1
),
schema
=
[
Field
(
name
=
"conv_rate"
,
dtype
=
Float32
),
Field
(
name
=
"acc_rate"
,
dtype
=
Float32
),
Field
(
name
=
"avg_daily_trips"
,
dtype
=
Int64
),
],
online
=
True
,
source
=
driver_hourly_stats
,
tags
=
{},
)
Feast bietet keinen Online-Transformations- oder Ingestion-Dienst. Stattdessen muss der Nutzer eine Materialisierungsaufgabe ausführen, um die Offline-Features in den Echtzeitspeicher (Datenbank) zu kopieren. Leider bedeutet dies auch, dass die im Online-Speicher gespeicherten Daten zwischen den Materialisierungen ungenau sind, und wenn die Materialisierung zu häufig ausgeführt wird, kann dies zu einem erheblichen Berechnungsaufwand führen.
Ausführen der Materialisierungsaufgabe über das SDK:
store
=
FeatureStore
(
repo_path
=
"."
)
store
.
materialize_incremental
(
datetime
.
now
())
Das Projekt kann eine oder mehrere Merkmalsansichten enthalten, die jeweils unabhängig voneinander definiert und materialisiert werden. Merkmale können aus einer oder mehreren Feature-Ansichten abgerufen werden (dies löst eine JOIN-Operation aus).
Um Offline-Features (direkt von der Offline-Quelle) abzurufen, verwendest du den API-Aufruf get_historical_features()
wie in Beispiel 4-10 gezeigt.
Beispiel 4-10. Abrufen von Offline-Features mit Feast (Quelle: Feast)
# The entity dataframe is the dataframe we want to enrich with feature values
# see https://docs.feast.dev/getting-started/concepts/feature-retrieval for details
# for all entities in the offline store instead
entity_df
=
pd
.
DataFrame
.
from_dict
(
{
# entity's join key -> entity values
"driver_id"
:
[
1001
,
1002
,
1003
],
# "event_timestamp" (reserved key) -> timestamps
"event_timestamp"
:
[
datetime
(
2021
,
4
,
12
,
10
,
59
,
42
),
datetime
(
2021
,
4
,
12
,
8
,
12
,
10
),
datetime
(
2021
,
4
,
12
,
16
,
40
,
26
),
],
# (optional) label name -> label values. Feast does not process these
"label_driver_reported_satisfaction"
:
[
1
,
5
,
3
],
# values we're using for an on-demand transformation
"val_to_add"
:
[
1
,
2
,
3
],
"val_to_add_2"
:
[
10
,
20
,
30
],
}
)
store
=
FeatureStore
(
repo_path
=
"."
)
# retrieve offline features, feature names are specified with <view>:<feature-name>
training_df
=
store
.
get_historical_features
(
entity_df
=
entity_df
,
features
=
[
"driver_hourly_stats:conv_rate"
,
"driver_hourly_stats:acc_rate"
,
"driver_hourly_stats:avg_daily_trips"
,
"transformed_conv_rate:conv_rate_plus_val1"
,
"transformed_conv_rate:conv_rate_plus_val2"
,
],
)
.
to_df
()
(
"----- Example features -----
\n
"
)
(
training_df
.
head
())
Um Online-Funktionen aus dem Online-Store abzurufen, verwenden wir den API-Aufruf get_online_features()
, wie in Beispiel 4-11 gezeigt.
Beispiel 4-11. Abrufen von Online-Features mit Feast (Quelle: Feast)
from
pprint
import
pprint
from
feast
import
FeatureStore
store
=
FeatureStore
(
repo_path
=
"."
)
feature_vector
=
store
.
get_online_features
(
features
=
[
"driver_hourly_stats:acc_rate"
,
"driver_hourly_stats:avg_daily_trips"
,
"transformed_conv_rate:conv_rate_plus_val1"
,
"transformed_conv_rate:conv_rate_plus_val2"
,
],
entity_rows
=
[
# {join_key: entity_value}
{
"driver_id"
:
1001
,
"val_to_add"
:
1000
,
"val_to_add_2"
:
2000
,
},
{
"driver_id"
:
1002
,
"val_to_add"
:
1001
,
"val_to_add_2"
:
2002
,
},
],
)
.
to_dict
()
pprint
(
feature_vector
)
# results:
{
'acc_rate'
:
[
0.86463862657547
,
0.6959823369979858
],
'avg_daily_trips'
:
[
359
,
311
],
'conv_rate_plus_val1'
:
[
1000.6638441681862
,
1001.1511893719435
],
'conv_rate_plus_val2'
:
[
2000.6638441681862
,
2002.1511893719435
],
'driver_id'
:
[
1001
,
1002
]}
MLRun Feature Store verwenden
MLRun unterstützt die Registrierung bestehender Quellen (wie Feast) oder die Definition einer Datenpipeline zur Umwandlung von Quelldaten in Features. Bei der Definition der Datenpipeline ( Graph genannt) stellt MLRun die ausgewählte Datenverarbeitungs-Engine auf der Grundlage der abstrakten Benutzerdefinitionen bereit. MLRun unterstützt einige Verarbeitungs-Engines, darunter lokales Python, Dask, Spark und Nuclio (eine serverlose Echtzeit-Engine).
In MLRun schreibt die Pipeline standardmäßig in Online- und Offline-Speicher, sodass keine separaten Materialisierungsaufträge erforderlich sind und die Online- und Offline-Funktionen immer synchronisiert sind. Außerdem kann MLRun das Datenschema automatisch erkennen, was es einfacher und robuster macht.
MLRun trennt die Definition des Feature-Sets (eine Sammlung von Features, die von der gleichen Pipeline erzeugt werden) von den Definitionen der Datenquellen. Auf diese Weise kannst du dasselbe Feature Set in der interaktiven Entwicklung und in der Produktion verwenden. Tausche einfach die Datenquelle von einer lokalen Datei in der Entwicklung gegen eine Datenbank oder einen Echtzeit-Kafka-Stream im Produktionseinsatz aus.
Beispiel 4-12 zeigt ein Beispiel für die Definition eines Feature Sets für die Verarbeitung von Kreditkartentransaktionen zur Aufdeckung von Kreditkartenbetrug. Die Definition umfasst die Entität, den Zeitstempel und den Transformationsgraphen mit eingebauten Operatoren und Aggregationen. Beachte, dass ein Nutzer auch eigene Python-Operatoren hinzufügen kann. Siehe das vollständige Beispiel.
Die Datenpipeline besteht aus den folgenden Elementen:
-
Extrahieren der Datenkomponenten (Stunde, Wochentag).
-
Zuordnung der Alterswerte
-
One-Hot-Codierung für die Transaktionskategorie und das Geschlecht
-
Aggregieren des Betrags (Mittelwert, Summe, Anzahl, Maximum über 2/12/24-Stunden-Zeitfenster)
-
Aggregieren der Transaktionen pro Kategorie (über 14-Tage-Zeitfenster)
-
Schreiben der Ergebnisse in Offline- (Parquet) und Online- (NoSQL) Ziele
Beispiel 4-12. MLRun FeatureSet definieren (Quelle: MLRun)
import
mlrun.feature_store
as
fs
# Define the credit transactions FeatureSet
transaction_set
=
fs
.
FeatureSet
(
"transactions"
,
entities
=
[
fs
.
Entity
(
"source"
)],
timestamp_key
=
'timestamp'
,
description
=
"transactions feature set"
)
# Define and add value mapping
main_categories
=
[
"es_transportation"
,
"es_health"
,
"es_otherservices"
,
"es_food"
,
"es_hotelservices"
,
"es_barsandrestaurants"
,
"es_tech"
,
"es_sportsandtoys"
,
"es_wellnessandbeauty"
,
"es_hyper"
,
"es_fashion"
,
"es_home"
,
"es_contents"
,
"es_travel"
,
"es_leisure"
]
# One Hot Encode the newly defined mappings
one_hot_encoder_mapping
=
{
'category'
:
main_categories
,
'gender'
:
list
(
transactions_data
.
gender
.
unique
())}
# Define the data pipeline (graph) steps
transaction_set
.
graph
\.
to
(
DateExtractor
(
parts
=
[
'hour'
,
'day_of_week'
],
timestamp_col
=
'timestamp'
))
\.
to
(
MapValues
(
mapping
=
{
'age'
:
{
'U'
:
'0'
}},
with_original_features
=
True
))
\.
to
(
OneHotEncoder
(
mapping
=
one_hot_encoder_mapping
))
# Add aggregations for 2, 12, and 24 hour time windows
transaction_set
.
add_aggregation
(
name
=
'amount'
,
column
=
'amount'
,
operations
=
[
'avg'
,
'sum'
,
'count'
,
'max'
],
windows
=
[
'2h'
,
'12h'
,
'24h'
],
period
=
'1h'
)
# Add the category aggregations over a 14 day window
for
category
in
main_categories
:
transaction_set
.
add_aggregation
(
name
=
category
,
column
=
f
'category_
{
category
}
'
,
operations
=
[
'count'
],
windows
=
[
'14d'
],
period
=
'1d'
)
Die Datenpipeline kann mit transaction_set.plot(rankdir="LR", with_targets=True)
visualisiert werden, wie in Abbildung 4-19 zu sehen ist.
Sobald du die Feature-Set-Definition hast, kannst du sie mit derpreview()
Methode testen und debuggen, mit der die Datenpipeline lokal ausgeführt wird und du die Ergebnisse sehen kannst:
df
=
fs
.
preview
(
transaction_set
,
transactions_data
)
df
.
head
()
Wenn die Definition des Featuresets abgeschlossen ist, kannst du es als Produktionsjob einsetzen, der bei Bedarf, nach einem bestimmten Zeitplan oder als Echtzeit-Pipeline läuft.
Für die Batch-Ingestion verwendest du die Methode ingest()
. Für Echtzeit-Ingestion von HTTP oder Streams verwendest du deploy_ingestion_service_v2()
, die eine serverlose Echtzeit-Pipeline von Nuclio startet. Siehe Beispiel 4-13.
Beispiel 4-13. Daten in das MLRun FeatureSet einlesen (Quelle: MLRun)
# Batch ingest the transactions dataset (from CSV file) through the defined pipeline
source
=
CSVSource
(
"mycsv"
,
path
=
"measurements.csv"
)
fs
.
ingest
(
transaction_set
,
source
=
source
)
# Deploy a real-time pipeline with HTTP API endpoint as the source
# MLRun support other real-time sources like Kafka, Kinesis, etc.
source
=
HTTPSource
()
fs
.
deploy_ingestion_service_v2
(
transaction_set
,
source
)
Du kannst die Feature-Sets, ihre Metadaten und Statistiken im MLRun Feature Store UI beobachten. Siehe Abbildung 4-20.
Das Abrufen von Merkmalen in MLRun erfolgt über das Feature-Vektor-Objekt. Feature-Vektoren enthalten die Definitionen der angeforderten Features und zusätzliche Parameter. Darüber hinaus speichern sie auch berechnete Werte wie die Metadaten der Features, Statistiken usw., die beim Training, bei der Bedienung oder bei Überwachungsaufgaben hilfreich sein können. Feature-Statistiken werden zum Beispiel für die automatische Wertberechnung bei fehlenden oder NaN-Feature-Werten und für die Überwachung der Modellabweichung in der Serving-Anwendung verwendet.
Feature-Vektoren können in der Benutzeroberfläche von MLRun erstellt, aktualisiert und angezeigt werden.
Die Nutzer definieren zunächst den Feature-Vektor und können ihn dann verwenden, um Offline- oder Online-Features zu erhalten. In Beispiel 4-14 siehst du, wie du Offline-Features abrufst und die Methode get_offline_features()
verwendest.
Beispiel 4-14. Offline-Features von MLRun abrufen (Quelle: MLRun)
# Define the list of features you will be using (<feature-set>.<feature>)
features
=
[
'transactions.amount_max_2h'
,
'transactions.amount_sum_2h'
,
'transactions.amount_count_2h'
,
'transactions.amount_avg_2h'
,
'transactions.amount_max_12h'
]
# Import MLRun's Feature Store
import
mlrun.feature_store
as
fstore
# Define the feature vector name for future reference
fv_name
=
'transactions-fraud'
# Define the feature vector using our Feature Store
transactions_fv
=
fstore
.
FeatureVector
(
fv_name
,
features
,
label_feature
=
"labels.label"
,
description
=
'Predicting a fraudulent transaction'
)
# Save the feature vector definition in the Feature Store
transactions_fv
.
save
()
# Get offline feature vector as dataframe and save the dataset to a parquet file
train_dataset
=
fstore
.
get_offline_features
(
transactions_fv
,
target
=
ParquetTarget
())
# Preview the dataset
train_dataset
.
to_dataframe
()
.
tail
(
5
)
Um Echtzeit-Features abzurufen, musst du zunächst einen Dienst definieren (der die Echtzeit-Abrufpipeline initialisiert), gefolgt von .get()
Methoden, um Feature-Werte in Echtzeit abzufragen. Die Trennung zwischen der Erstellung des Dienstes (einmalige Initialisierung) und den einzelnen Abfragen sorgt für geringere Abfragelatenzen. Darüber hinaus unterstützt MLRun die automatische Berechnung von Werten auf der Grundlage der Metadaten und Statistiken des Merkmals. Dadurch kann ein erheblicher Entwicklungs- und Berechnungsaufwand eingespart werden. Siehe Beispiel 4-15.
Beispiel 4-15. Online-Funktionen von MLRun abrufen (Quelle: MLRun)
# Create the online feature service, substitute NaN values with
# the feature mean value
svc
=
fstore
.
get_online_feature_service
(
'transactions-fraud:latest'
,
impute_policy
=
{
"*"
:
"$mean"
})
# Get sample feature vector
sample_fv
=
svc
.
get
([{
'source'
:
'C76780537'
}])
# sample_fv Result
[{
'amount_max_2h'
:
14.68
,
'amount_max_12h'
:
70.81
,
'amount_sum_2h'
:
14.68
,
'amount_count_2h'
:
1.0
,
'amount_avg_2h'
:
14.68
}]
Hinweis
Die Merkmalspeicher von MLRun bieten genaue Echtzeit-Aggregationen und niedrige Latenzzeiten, indem sie vorberechnete Werte während des Ingestionsprozesses mit Echtzeitberechnungen zum Zeitpunkt der Merkmalsanfrage kombinieren.
Das MLRun-Framework bietet eine Modellentwicklungs- und Trainingspipeline, Echtzeit-Serving-Pipelines und eine integrierte Modellüberwachung. Der Funktionsspeicher von MLRun ist nativ in die anderen Komponenten integriert, wodurch überflüssige Glue-Logik, Metadatenübersetzung usw. entfallen und die Zeit bis zur Produktion verkürzt wird.
Fazit
Da die Datenverwaltung und -verarbeitung die wichtigsten Komponenten von ML sind, ist es wichtig zu wissen, wie man datenbezogene Aufgaben optimal erledigt. Dieses Kapitel befasst sich mit den empfohlenen Tools und Praktiken für die verschiedenen Phasen der Arbeit mit deinen Daten. Zu Beginn des Kapitels haben wir uns mit der Versionierung und der Herkunft der Daten befasst, die für die Rückverfolgung des Ursprungs der Daten unerlässlich sind. Dann haben wir uns mit der Datenaufbereitung und -analyse im großen Maßstab befasst, also damit, wie die Daten behandelt werden, damit sie in der Produktion verwendet werden können. In diesem Abschnitt haben wir auch die Architektur interaktiver Datenverarbeitungslösungen und die Unterschiede zwischen Batch-Datenverarbeitung und Echtzeitverarbeitung besprochen.
Nachdem wir die Herausforderungen bei der Umsetzung dieser Praktiken im großen Maßstab besprochen hatten, stellten wir das Konzept der Feature Stores vor, die ein zentrales Repository für ML-Features sind. Wir haben uns mit den Funktionen eines Feature Stores befasst, z. B. mit der Datenkonnektivität und der Offline- und Online-Transformation. Wir haben auch gezeigt, wo der Feature Store in die MLOps-Pipeline passt, von der Aufnahme von Rohdaten bis hin zur Unterstützung der Nutzung dieser Daten beim Training, Servicing, Monitoring und mehr. Zum Schluss haben wir verschiedene Feature-Store-Lösungen und ihre Verwendung vorgestellt.
Kritisches Denken - Diskussionsfragen
-
Welche Details liefern Metadaten? Warum brauchen wir als Datenexperten diese Informationen?
-
Welche Open-Source-Tools zur Datenversionierung gibt es? Welches könnte für dein Unternehmen geeignet sein?
-
Was ist der Unterschied zwischen Stapelverarbeitung und Stream Processing? Wann wird beides verwendet?
-
Wie vereinfacht ein Feature Store die Datenverwaltung und -verarbeitung? Welche Funktionen ermöglichen dies?
-
Was sind die Unterschiede zwischen dem Feast und dem MLRun Feature Store? Welcher könnte für deine Organisation der richtige sein?
Übungen
-
Wähle eine Open-Source-Lösung (DVC, Pachyderm, MLflow oder MLRun) und erstelle ein Skript oder einen Workflow für die Datenversionierung, mit dem du Daten und Metadaten aufzeichnen und versionieren kannst.
-
Erstelle einen Prototyp einer Stapelverarbeitungspipeline mit einem Werkzeug deiner Wahl.
-
Verbinde einen Trino-Datenkonnektor mit einer Datenquelle.
-
Trainiere ein Demomodell (du kannst Hugging Face verwenden, wenn du ein Beispielmodell brauchst) mit einem Feature Store.
-
Erstelle ein Feature Set und eine Ingestion Pipeline in MLRun. Du kannst dieses Projekt als Referenz verwenden.
Get Implementierung von MLOps im Unternehmen now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.