Kapitel 4. Kubeflow-Pipelines
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Im vorherigen Kapitel haben wir Kubeflow Pipelines beschrieben,die Komponente von Kubeflow, die Machine-Learning-Anwendungen orchestriert. Die Orchestrierung ist notwendig, weil eine typische Machine-Learning-Implementierung eine Kombination von Tools verwendet, um Daten vorzubereiten, das Modell zu trainieren, die Leistung zu bewerten und es einzusetzen. Durch die Formalisierung der Schritte und ihrer Abfolge im Code ermöglichen es Pipelines den Nutzern, alle Schritte der Datenverarbeitung formal zu erfassen und ihre Reproduzierbarkeit und Überprüfbarkeit sowie die Trainings- und Bereitstellungsschritte zu gewährleisten.
Zu Beginn dieses Kapitels werfen wir einen Blick auf die Pipelines-Benutzeroberfläche und zeigen dir, wie du einfache Pipelines in Python schreiben kannst. Wir werden uns ansehen, wie Daten zwischen den einzelnen Phasen übertragen werden können, und uns dann damit beschäftigen, wie man bestehende Anwendungen als Teil einer Pipeline nutzen kann. Außerdem schauen wir uns die zugrunde liegende Workflow-Engine Argo Workflows an, ein Standard-Kubernetes-Pipeline-Tool, das Kubeflow zur Ausführung von Pipelines verwendet. Wenn du die Grundlagen von Argo Workflows verstehst, kannst du ein tieferes Verständnis für Kubeflow-Pipelines entwickeln und die Fehlersuche erleichtern. Anschließend zeigen wir, was Kubeflow Pipelines zu Argo beiträgt.
Zum Abschluss von Kubeflow Pipelines zeigen wir dir, wie du eine bedingte Ausführung in Pipelines implementierst und wie du Pipelines zeitgesteuert ausführst. Aufgabenspezifische Komponenten von Pipelines werden in den jeweiligen Kapiteln behandelt.
Erste Schritte mit Pipelines
Die Kubeflow Pipelines Plattform besteht aus:
-
Eine Benutzeroberfläche für die Verwaltung und Verfolgung von Pipelines und deren Ausführung
-
Ein Motor für die Planung der Ausführung einer Pipeline
-
Ein SDK zum Definieren, Erstellen und Bereitstellen von Pipelines in Python
-
Notebook-Unterstützung für die Verwendung des SDK und die Pipeline-Ausführung
Der einfachste Weg, sich mit Pipelines vertraut zu machen, ist ein Blick auf vorgefertigte Beispiele.
Erkundung der vorgefertigten Probenpipelines
Um das Verständnis von Pipelines zu erleichtern, wird Kubeflow mit einigen Beispielpipelines installiert. Du findest diese vorgefertigten in der Pipeline Web UI, wie in Abbildung 4-1 zu sehen ist. Beachte, dass zum Zeitpunkt der Erstellung dieses Artikels nur die Pipelines Basic to Conditional execution generisch sind, während der Rest nur auf der Google Kubernetes Engine (GKE) läuft. Wenn du versuchst, sie in anderen Umgebungen als GKE auszuführen, werden sie fehlschlagen.
Wenn du auf eine bestimmte Pipeline klickst, wird ihr Ausführungsdiagramm oder ihre Quelle angezeigt, wie in Abbildung 4-2 dargestellt.
Wenn du auf die Registerkarte "Quelle" klickst, wird der kompilierte Code der Pipeline angezeigt,, der eine Argo YAML-Datei ist (mehr dazu in "Argo: die Grundlage der Pipelines").
In diesem Bereich kannst du gerne mit laufenden Pipelines experimentieren, um ein besseres Gefühl für ihre Ausführung und die Möglichkeiten der Pipelines-Benutzeroberfläche zu bekommen.
Um eine bestimmte Pipeline aufzurufen, klickst du sie einfach an. Daraufhin wird die Pipeline-Ansicht wie in Abbildung 4-3 dargestellt angezeigt.
Um die Pipeline zu starten, klicke auf die Schaltfläche "Lauf erstellen" und folge den Anweisungen auf dem Bildschirm.
Tipp
Wenn du eine Pipeline ausführst, musst du ein Experiment auswählen. Experiment ist hier nur eine praktische Gruppierung für Pipeline-Ausführungen (Läufe). Du kannst immer das "Standard"-Experiment verwenden, das bei der Installation von Kubeflow erstellt wird. Wähle außerdem "Einmalig" als Lauftyp, um die Pipeline einmalig auszuführen. Über die wiederkehrende Ausführung sprechen wir in "Pipelines nach Zeitplan ausführen".
Aufbau einer einfachen Pipeline in Python
Wir haben gesehen, wie vorkompilierte Kubeflow-Pipelines ausgeführt werden können. Jetzt wollen wir untersuchen, wie wir unsere eigenen neuen Pipelines erstellen können. Kubeflow-Pipelines werden als YAML-Dateien gespeichert, die von einem Programm namens Argo ausgeführt werden (siehe "Argo: die Grundlage der Pipelines"). Glücklicherweise stellt Kubeflow eine domänenspezifische Sprache (DSL) in Python zur Verfügung, um Pipelines zu erstellen. Die DSL ist eine Python-Darstellung der Operationen, die im ML-Workflow ausgeführt werden, und wurde speziell für ML-Workloads entwickelt. Die DSL ermöglicht auch die Verwendung einiger einfacher Python-Funktionen als Pipeline-Stufen, ohne dass du explizit einen Container erstellen musst.
Tipp
Die Beispiele aus Kapitel 4 findest du in den Notebooks im GitHub-Repository dieses Buches.
Eine Pipeline ist im Wesentlichen ein Graph der Containerausführung. Unter kannst du nicht nur angeben, welche Container in welcher Reihenfolge ausgeführt werden sollen, sondern auch Argumente an die gesamte Pipeline und zwischen den beteiligten Containern übergeben.
Für jeden Container (bei Verwendung des Python SDK) müssen wir:
-
Erstelle den Container - entweder als einfache Python-Funktion oder mit einem beliebigen Docker-Container (mehr dazu in Kapitel 9).
-
Erstelle eine Operation, die auf diesen Container verweist, sowie die Befehlszeilenargumente, Datenhalterungen und Variablen, um den Container zu übergeben.
-
Lege die Reihenfolge der Vorgänge fest, indem du definierst, welche Vorgänge parallel ablaufen können und welche abgeschlossen sein müssen, bevor du zu einem weiteren Schritt übergehst.1
-
Kompiliere diese in Python definierte Pipeline in eine YAML-Datei, die von Kubeflow Pipelines verarbeitet werden kann.
Pipelines sind ein zentrales Merkmal von Kubeflow und du wirst sie im Laufe des Buches immer wieder sehen. In diesem Kapitel werden wir möglichst einfache Beispiele zeigen, um die Grundprinzipien von Pipelines zu veranschaulichen. Es wird sich nicht wie "maschinelles Lernen" anfühlen und das ist auch so gewollt.
Für unsere erste Kubeflow-Operation werden wir eine Technik verwenden, die als leichtgewichtige Python-Funktionen bekannt ist. Wir sollten uns jedoch nicht von dem Wort " leichtgewichtig" täuschen lassen. Bei einer leichtgewichtigen Python-Funktion definieren wir eine Python-Funktion und überlassen es dann Kubeflow, diese Funktion in einen Container zu packen und eine Operation zu erstellen.
Der Einfachheit halber erklären wir die einfachste aller Funktionen als Echo. Das ist eine Funktion, die eine einzige Eingabe, eine ganze Zahl, benötigt und diese zurückgibt.
Beginnen wir damit, kfp
zu importieren und unsere Funktion zu definieren:
import
kfp
def
simple_echo
(
i
:
int
)
->
int
:
return
i
Warnung
Beachte, dass wir snake_case
, nicht camelCase
, für unsere Funktionsnamen verwenden. Zum Zeitpunkt der Erstellung dieses Artikels gibt es einen Fehler (Feature?), der dazu führt, dass Namen in Großbuchstaben (z. B. unsere Funktion simpleEcho
) zu Fehlern führen.
Als Nächstes wollen wir unsere Funktion simple_echo
in eine Kubeflow-Pipeline-Operation verpacken. Dafür gibt es eine nette kleine Methode: kfp.components.func_to_container_op
. Diese Methode gibt eine Fabrikfunktion mit einer stark typisierten Signatur zurück:
simpleStronglyTypedFunction
=
kfp
.
components
.
func_to_container_op
(
deadSimpleIntEchoFn
)
Wenn wir im nächsten Schritt eine Pipeline erstellen, wird die Fabrikfunktion eine ContainerOp konstruieren, die die ursprüngliche Funktion (echo_fn) in einem Container ausführt:
foo
=
simpleStronglyTypedFunction
(
1
)
type
(
foo
)
Out
[
5
]:
kfp
.
dsl
.
_container_op
.
ContainerOp
Tipp
Wenn dein Code von einer GPU beschleunigt werden kann, ist es einfach, eine Stufe als GPU-Ressourcen verwendend zu markieren ; füge einfach .set_gpu_limit(NUM_GPUS)
zu deinem ContainerOp
hinzu.
Nun ordnen wir die ContainerOp(s) (es gibt nur eine) in eine Pipeline ein. Diese Pipeline benötigt einen Parameter (die Nummer, die wir als Echo ausgeben). Die Pipeline hat auch ein paar Metadaten, die mit ihr verbunden sind. Während das Echo von Zahlen eine triviale Verwendung von Parametern sein mag, würdest du in realen Anwendungsfällen Variablen einbeziehen, die du später anpassen möchtest, wie z. B. Hyperparameter für Algorithmen des maschinellen Lernens.
Schließlich kompilieren wir unsere Pipeline in eine gezippte YAML-Datei, , die wir dann in die Pipelines UI hochladen können.
@kfp.dsl.pipeline
(
name
=
'Simple Echo'
,
description
=
'This is an echo pipeline. It echoes numbers.'
)
def
echo_pipeline
(
param_1
:
kfp
.
dsl
.
PipelineParam
):
my_step
=
simpleStronglyTypedFunction
(
i
=
param_1
)
kfp
.
compiler
.
Compiler
()
.
compile
(
echo_pipeline
,
'echo-pipeline.zip'
)
Tipp
Es ist auch möglich, die Pipeline direkt im Notebook auszuführen, was wir im nächsten Beispiel tun werden.
Eine Pipeline mit nur einer Komponente ist nicht sehr interessant. In unserem nächsten Beispiel werden wir die Container für unsere leichtgewichtigen Python-Funktionen anpassen. Wir erstellen eine neue Pipeline, die zusätzliche Python-Bibliotheken installiert und importiert, von einem bestimmten Basisbild baut und die Ausgabe zwischen denContainern weiterleitet.
Wir werden eine Pipeline erstellen, die eine Zahl durch eine andere Zahl dividiert und dann eine dritte Zahl addiert. Zuerst erstellen wir unsere einfache Funktion add
, wie in Beispiel 4-1 gezeigt.
Beispiel 4-1. Eine einfache Python-Funktion
def
add
(
a
:
float
,
b
:
float
)
->
float
:
'''Calculates sum of two arguments'''
return
a
+
b
add_op
=
comp
.
func_to_container_op
(
add
)
Als Nächstes wollen wir eine etwas komplexere Funktion erstellen. Außerdem soll diese Funktion eine nicht standardisierte Python-Bibliothek, numpy
, benötigen und importieren. Dies muss innerhalb der Funktion geschehen. Der Grund dafür ist, dass globale Importe aus dem Notebook nicht in die von uns erstellten Container verpackt werden. Natürlich ist es auch wichtig, dass unser Container die Bibliotheken, die wir importieren, installiert hat.
Dazu übergeben wir den Container, den wir als Basisbild verwenden wollen, an .func_to_container(
, wie in Beispiel 4-2.
Beispiel 4-2. Eine weniger einfache Python-Funktion
from
typing
import
NamedTuple
def
my_divmod
(
dividend
:
float
,
divisor
:
float
)
-
>
\
NamedTuple
(
'
MyDivmodOutput
'
,
[
(
'
quotient
'
,
float
)
,
(
'
remainder
'
,
float
)
]
)
:
'''Divides two numbers and calculate the quotient and remainder'''
#Imports inside a component function:
import
numpy
as
np
#This function demonstrates how to use nested functions inside a
# component function:
def
divmod_helper
(
dividend
,
divisor
)
:
return
np
.
divmod
(
dividend
,
divisor
)
(
quotient
,
remainder
)
=
divmod_helper
(
dividend
,
divisor
)
from
collections
import
namedtuple
divmod_output
=
namedtuple
(
'
MyDivmodOutput
'
,
[
'
quotient
'
,
'
remainder
'
]
)
return
divmod_output
(
quotient
,
remainder
)
divmod_op
=
comp
.
func_to_container_op
(
my_divmod
,
base_image
=
'
tensorflow/tensorflow:1.14.0-py3
'
)
Jetzt werden wir eine Pipeline erstellen. Die Pipeline in Beispiel 4-3 verwendet die zuvor definierten Funktionen my_divmod
und add
als Stufen.
Beispiel 4-3. Eine einfache Pipeline
@dsl.pipeline( name='Calculation pipeline', description='A toy pipeline that performs arithmetic calculations.' ) def calc_pipeline( a='a', b='7', c='17', ): #Passing pipeline parameter and a constant value as operation arguments add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. #Passing a task output reference as operation arguments #For an operation with a single return value, the output # reference can be accessed using `task.output` # or `task.outputs['output_name']` syntax divmod_task = divmod_op(add_task.output, b) #For an operation with multiple return values, the output references # can be accessed using `task.outputs['output_name']` syntax result_task = add_op(divmod_task.outputs['quotient'], c)
Schließlich verwenden wir den Client, um die Pipeline zur Ausführung zu übermitteln, die die Links zu Ausführung und Experiment zurückgibt. Experimente fassen die Ausführungen zusammen. Du kannst auch kfp.compiler.Compiler().compile
verwenden und die Zip-Datei wie im ersten Beispiel hochladen, wenn du möchtest:
client
=
kfp
.
Client
()
#Specify pipeline argument values
# arguments = {'a': '7', 'b': '8'} #whatever makes sense for new version
#Submit a pipeline run
client
.
create_run_from_pipeline_func
(
calc_pipeline
,
arguments
=
arguments
)
Über den von create_run_from_pipeline_func
zurückgegebenen Link gelangen wir zur Web-UI der Ausführung , die die Pipeline selbst und die Zwischenergebnisse anzeigt (siehe Abbildung 4-4).
Wie wir gesehen haben, bezieht sich das Leichtgewicht in den leichtgewichtigen Python-Funktionen auf die Leichtigkeit, mit der wir diese Schritte in unserem Prozess durchführen, und nicht auf die Leistungsfähigkeit der Funktionen selbst. Wir können benutzerdefinierte Importe, Basisbilder und die Weitergabe von kleinen Ergebnissen zwischen Containern verwenden.
Im nächsten Abschnitt zeigen wir dir, wie du größere Datendateien zwischen Containern weitergeben kannst, indem du Volumes in die Container einhängst.
Speichern von Daten zwischen den Schritten
Im vorherigen Beispiel waren die zwischen Containern übertragenen Daten klein und von primitiven Typen (wie Zahlen, Strings, Listen und Arrays). In der Praxis werden wir jedoch wahrscheinlich viel größere Daten (z. B. ganze Datensätze) weitergeben. In Kubeflow gibt es dafür zwei primäre Methoden: persistente Volumes innerhalb des Kubernetes-Clusters und Cloud-Speicherung (z. B. S3), wobei jede Methode mit Problemen verbunden ist.
Persistente Volumes abstrahieren die Ebene der Speicherung. Je nach Anbieter können persistente Volumes bei der Bereitstellung langsam sein und IO-Limits haben. Überprüfe, ob dein Anbieter read-write-many Speicherklassen unterstützt, die den Speicherzugriff durch mehrere Pods ermöglichen, was für einige Arten der Parallelität erforderlich ist. Speicherklassen können eine der folgenden sein.2
- ReadWriteOnce
-
Das Volume kann von einem einzelnen Knoten als Lese- und Schreibzugriff gemountet werden.
- ReadOnlyMany
-
Das Volume kann von vielen Nodes schreibgeschützt gemountet werden.
- ReadWriteMany
-
Das Volume kann von vielen Knoten als Lese- und Schreibzugriff gemountet werden.
Dein System-/Clusteradministrator kann vielleicht die Unterstützung für Read-Write-Many hinzufügen.3 Darüber hinaus bieten viele Cloud-Provider eigene Read-Write-Many-Implementierungen an, z. B. Dynamic Provisioning auf GKE. Frag aber unbedingt nach, ob es einen Engpass bei einem einzelnen Knoten gibt.
Mit VolumeOp
von Kubeflow Pipelines kannst du ein automatisch verwaltetes persistentes Volume erstellen, wie in Beispiel 4-4 gezeigt. Um das Volume zu deiner Operation hinzuzufügen, rufst du einfach add_pvolumes
mit einem Wörterbuch von Einhängepunkten zu Volumes auf, z.B. download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})
.
Beispiel 4-4. Mailinglisten-Daten vorbereiten
dvop
=
dsl
.
VolumeOp
(
name
=
"create_pvc"
,
resource_name
=
"my-pvc-2"
,
size
=
"5Gi"
,
modes
=
dsl
.
VOLUME_MODE_RWO
)
In den Kubeflow-Beispielen ist die Verwendung einer Objektspeicherlösung ( ) zwar weniger üblich, aber in manchen Fällen besser geeignet. MinIO bietet eine Cloud-native Speicherung von Objekten, indem es entweder als Gateway zu einer bestehenden Objektspeicher-Engine oder als eigenständige Lösung arbeitet.4 Wie du MinIO konfigurierst, haben wir bereits in Kapitel 3 beschrieben.
Der in Kubeflow integrierte file_output
Mechanismus überträgt die angegebene lokale Datei zwischen den Pipelineschritten automatisch in MinIO. Um file_output
zu verwenden, schreibst du deine Dateien lokal in deinen Container und gibst den Parameter in deinem ContainerOp
an, wie in Beispiel 4-5 gezeigt.
Beispiel 4-5. Beispiel für eine Dateiausgabe
fetch
=
kfp
.
dsl
.
ContainerOp
(
name
=
'download'
,
image
=
'busybox'
,
command
=
[
'sh'
,
'-c'
],
arguments
=
[
'sleep 1;'
'mkdir -p /tmp/data;'
'wget '
+
data_url
+
' -O /tmp/data/results.csv'
],
file_outputs
=
{
'downloaded'
:
'/tmp/data'
})
# This expects a directory of inputs not just a single file
Wenn du MinIO nicht verwenden möchtest, kannst du auch direkt die Objektspeicherung deines Anbieters nutzen, aber das kann die Portabilität beeinträchtigen.
Die Möglichkeit, Daten lokal zu montieren, ist eine wichtige Aufgabe in jeder Pipeline für maschinelles Lernen. Hier haben wir mehrere Methoden kurz beschrieben und Beispiele für jede Methode gegeben.
Einführung in die Kubeflow Pipelines Komponenten
Kubeflow Pipelines baut auf Argo Workflows auf, eine quelloffene, container-native Workflow-Engine für Kubernetes. In diesem Abschnitt beschreiben wir, wie Argo funktioniert, was es leistet und wie Kubeflow Pipelines Argo ergänzt, um es für Datenwissenschaftler/innen leichter nutzbar zu machen.
Argo: das Fundament der Pipelines
Kubeflow installiert alle Argo-Komponenten. Es ist zwar nicht notwendig, Argo auf deinem Computer zu installieren, um Kubeflow Pipelines zu nutzen, aber das Argo-Kommandozeilen-Tool macht es einfacher, deine Pipelines zu verstehen und zu debuggen.
Tipp
Kubeflow konfiguriert Argo standardmäßig so, dass es den Docker-Executor verwendet. Wenn deine Plattform die Docker-APIs nicht unterstützt, musst du deinen Executor auf einen kompatiblen umstellen. Dazu änderst du den Wert containerRuntimeExecutor
in der Argo-Parameterdatei. In Anhang A findest du weitere Informationen zu den Kompromissen. Die meisten Beispiele in diesem Buch verwenden den Docker-Executor, können aber auch an andere Executors angepasst werden.
Unter macOS kannst du Argo mit Homebrew installieren, wie in Beispiel 4-6 gezeigt.5
Beispiel 4-6. Argo-Installation
#!/bin/bash
# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64# Make binary executable
chmod +x argo-linux-amd64# Move binary to path
mv ./argo-linux-amd64 ~/bin/argo
Du kannst deine Argo-Installation überprüfen, indem du die Argo-Beispiele mit dem Kommandozeilen-Tool im Kubeflow-Namensraum ausführst: Befolge diese Argo-Anleitung. Wenn du die Argo-Beispiele ausführst, sind die Pipelines mit dem Befehl argo
sichtbar, wie in Beispiel 4-7.
Beispiel 4-7. Argo-Ausführungen auflisten
$
argo list -n kubeflow
NAME STATUS AGE DURATION
loops-maps-4mxp5 Succeeded 30m 12s
hello-world-wsxbr Succeeded 39m 15s
Da Pipelines mit Argo implementiert sind, kannst du die gleiche Technik auch für die Überprüfung von Pipelines verwenden. Du kannstauch Informationen über die Ausführung eines bestimmten Workflows erhalten, wie in Beispiel 4-8 gezeigt.
Beispiel 4-8. Argo-Ausführungsdetails erhalten
$
argo
get
hello-world-wsxbr
-n
kubeflow
Name:
hello-world-wsxbr
Namespace:
kubeflow
ServiceAccount:
default
Status:
Succeeded
Created:
Tue
Feb
12
10:05:04
-0600
(
2
minutes
ago
)
Started:
Tue
Feb
12
10:05:04
-0600
(
2
minutes
ago
)
Finished:
Tue
Feb
12
10:05:23
-0600
(
1
minute
ago
)
Duration:
19
seconds
STEP
PODNAME
DURATION
MESSAGE
✔
hello-world-wsxbr
hello-world-wsxbr
18s
Wir können die Ausführungsprotokolle auch mit dem Befehl in Beispiel 4-9 anzeigen.
Beispiel 4-9. Abrufen des Protokolls der Argo-Ausführung
$
argo logs hello-world-wsxbr -n kubeflow
Dies ergibt das in Beispiel 4-10 gezeigte Ergebnis.
Beispiel 4-10. Argo-Ausführungsprotokoll
< hello world > -------------\
\
\
## .
## ## ## ==
## ## ## ## ===
/""""""""""""""""
___/===
~~~{
~~ ~~~~ ~~~ ~~~~ ~~ ~ /===
- ~~~\_
_____ o __/\
\
__/\_
___\_
_____/
Du kannst auch einen bestimmten Workflow löschen; siehe Beispiel 4-11.
Beispiel 4-11. Löschen der Argo-Ausführung
$
argo delete hello-world-wsxbr -n kubeflow
Alternativ kannst du die Informationen zur Pipeline-Ausführung auch über die Argo UI abrufen, wie in Abbildung 4-5 zu sehen ist.
Du kannst dir auch die Details des Ablaufdiagramms ansehen, indem du auf einen bestimmten Workflow klickst, wie in Abbildung 4-6 zu sehen ist.
Für jede Kubeflow-Pipeline, die du ausführst, kannst du diese Pipeline auch in der Argo CLI/UI anzeigen. Da ML-Pipelines das Argo CRD verwenden, kannst du das Ergebnis der Pipeline-Ausführung auch in der Argo-Benutzeroberfläche sehen (wie in Abbildung 4-7).
Tipp
Derzeit sucht die Kubeflow-Gemeinschaft aktiv nach alternativen Basistechnologien für die Ausführung von Kubeflow-Pipelines, eine davon ist Tekton. Das Papier " Kubeflow Pipelines with Tekton" von A. Singh et al. enthält "erste Entwürfe, Spezifikationen und Code für die Ausführung von Kubeflow-Pipelines auf Tekton". Die Grundidee dabei ist, ein Zwischenformat zu erstellen, das von Pipelines erzeugt und dann mit Argo, Tekton oder anderen Laufzeiten ausgeführt werden kann. Den ersten Code für diese Implementierung findest du in diesem Kubeflow GitHub Repo.
Was Kubeflow Pipelines dem Argo Workflow hinzufügt
Argo ist die Grundlage für die Ausführung von Workflows, aber wenn du es direkt verwendest, musst du einige umständliche Dinge tun. Erstens musst du deinen Arbeitsablauf in YAML definieren, was schwierig sein kann. Zweitens musst du deinen Code containerisieren, was mühsam sein kann. Der Hauptvorteil von KF Pipelines besteht darin, dass du Python-APIs für die Definition/Erstellung von Pipelines verwenden kannst. Dadurch wird ein Großteil des YAML-Boilerplates für Workflow-Definitionen automatisiert, was für Datenwissenschaftler/innen und Python-Entwickler/innen äußerst hilfreich ist. Kubeflow Pipelines hat auch Hooks, die Bausteine für Machine Learning-spezifischeKomponenten hinzufügen. Diese APIs generieren nicht nur die YAML, sondern können auch die Erstellung von Containern und die Nutzung von Ressourcen vereinfachen. Zusätzlich zu den APIs bietet Kubeflow ein wiederkehrendes Zeitplannungsprogramm und eine Benutzeroberfläche für die Konfiguration und Ausführung.
Aufbau einer Pipeline unter Verwendung vorhandener Bilder
Die Erstellung von Pipeline-Stufen direkt aus Python bietet einen einfachen Einstieg. Allerdings ist unsere Implementierungdadurch auf Python beschränkt. Eine weitere Funktion von Kubeflow Pipelines ist die Möglichkeit, die Ausführung einer mehrsprachigen Implementierung mithilfe von vorgefertigten Docker-Images zu orchestrieren (siehe Kapitel 9).
Zusätzlich zu unseren vorherigen Importen wollen wir auch den Kubernetes-Client importieren, mit dem wir Kubernetes-Funktionen direkt aus dem Python-Code verwenden können (siehe Beispiel 4-12).
Beispiel 4-12. Kubernetes-Client exportieren
from
kubernetes
import
client
as
k8s_client
Auch hier erstellen wir einen Client und ein Experiment, um unsere Pipeline auszuführen. Wie bereits erwähnt, gruppieren Experimente die Läufe von Pipelines. Du kannst ein bestimmtes Experiment nur einmal erstellen. Beispiel 4-13 zeigt also, wie du entweder ein neues Experiment erstellst oder ein bestehendes verwendest.
Beispiel 4-13. Pipeline-Experiment abrufen
client
=
kfp
.
Client
()
exp
=
client
.
get_experiment
(
experiment_name
=
'mdupdate'
)
Jetzt erstellen wir unsere Pipeline(Beispiel 4-14). Die verwendeten Images müssen zugänglich sein, und wir geben die vollständigen Namen an, damit sie aufgelöst werden können. Da diese Container vorgefertigt sind, müssen wir sie für unsere Pipeline konfigurieren.
Die vorgefertigten Container, die wir verwenden, haben ihre Speicherung über die MINIO_*
Umgebungsvariablen konfiguriert. Wir konfigurieren sie also so, dass sie unsere lokale MinIO-Installation verwenden, indem wir add_env_variable
aufrufen.
Zusätzlich zu den automatischen Abhängigkeiten, die bei der Übergabe von Parametern zwischen Stufen entstehen, kannst du mit after
auch angeben, dass eine Stufe eine vorherige Stufe benötigt. Dies ist vor allem dann sinnvoll, wenn es einen externen Nebeneffekt gibt, z. B. die Aktualisierung einer Datenbank.
Beispiel 4-14. Beispiel einer Empfehlungs-Pipeline
@dsl.pipeline
(
name
=
'Recommender model update'
,
description
=
'Demonstrate usage of pipelines for multi-step model update'
)
def
recommender_pipeline
():
# Load new data
data
=
dsl
.
ContainerOp
(
name
=
'updatedata'
,
image
=
'lightbend/recommender-data-update-publisher:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'http://minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
# Train the model
train
=
dsl
.
ContainerOp
(
name
=
'trainmodel'
,
image
=
'lightbend/ml-tf-recommender:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
train
.
after
(
data
)
# Publish new model
publish
=
dsl
.
ContainerOp
(
name
=
'publishmodel'
,
image
=
'lightbend/recommender-model-publisher:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'http://minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'KAFKA_BROKERS'
,
value
=
'cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'DEFAULT_RECOMMENDER_URL'
,
value
=
'http://recommendermodelserver.kubeflow.svc.cluster.local:8501'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'ALTERNATIVE_RECOMMENDER_URL'
,
value
=
'http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'
))
publish
.
after
(
train
)
Da die Pipeline-Definition nur aus Code besteht, kannst du sie kompakter gestalten, indem du eine Schleife verwendest, um die MinIO-Parameter zu setzen, anstatt dies auf jeder Stufe zu tun.
Wie zuvor müssen wir die Pipeline kompilieren, entweder explizit mit compiler.Compiler().compile
oder implizit mitcreate_run_from_pipeline_func
. Nun kannst du die Pipeline ausführen (siehe Abbildung 4-8).
Kubeflow Pipeline Komponenten
Zusätzlich zu den Container-Operationen, die wir gerade besprochen haben, bietet Kubeflow Pipelines weitere Operationen mit Komponenten an. Komponenten stellen verschiedene Kubernetes-Ressourcen oder externe Operationen (wie dataproc
) zur Verfügung. Kubeflow-Komponenten ermöglichen es Entwicklern, Tools für maschinelles Lernen zu verpacken und dabei von den Besonderheiten der verwendeten Container oder CRDs zu abstrahieren.
Wir haben die Bausteine von Kubeflow ziemlich direkt verwendet, und wir haben die Komponente func_to_container
verwendet.6
Einige Komponenten, wie func_to_container
, sind als Python-Code verfügbar und können ganz normal importiert werden. Andere Komponenten werden über das component.yaml
System von Kubeflow spezifiziert und müssen geladen werden. Unserer Meinung nach ist der beste Weg, mit Kubeflow-Komponenten zu arbeiten, ein bestimmtes Tag des Repos herunterzuladen, das uns die Verwendung von load_component_from_file
ermöglicht, wie in Beispiel 4-15 gezeigt.
Beispiel 4-15. Pipeline-Freigabe
wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz tar -xvf 0.2.5.tar.gz
Warnung
Es gibt eine load_component
Funktion, die den Namen einer Komponente nimmt und versucht, ihn aufzulösen. Wir raten davon ab, diese Funktion zu verwenden, da sie standardmäßig einen Suchpfad verwendet, der den Master-Zweig der Pipelines-Bibliothek von Github abruft, der nicht stabil ist.
Im nächsten Kapitel werden wir uns eingehend mit Komponenten zur Datenaufbereitung befassen. Als Beispiel wollen wir uns jedoch kurz eine Komponente zum Abrufen von Dateien anschauen: . In unserem Empfehlungsbeispiel weiter oben im Kapitel haben wir einen speziellen vorgefertigten Container verwendet, um unsere Daten zu holen, da sie sich nicht in einem persistenten Volume befanden. Stattdessen können wir die Kubeflow GCS-Komponente google-cloud/storage/download/
verwenden, um unsere Daten herunterzuladen. Angenommen, du hast die Pipeline-Version wie in Beispiel 4-15 heruntergeladen, kannst du die Komponente mit load_component_from_file
wie in Beispiel 4-16 laden.
Beispiel 4-16. GCS Download Komponente laden
gcs_download_component
=
kfp
.
components
.
load_component_from_file
(
"pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)
Wenn eine Komponente geladen wird, gibt sie eine Funktion zurück, die bei ihrem Aufruf eine Stufe der Pipeline erzeugt. Die meisten Komponenten benötigen Parameter, um ihr Verhalten zu konfigurieren. Du kannst eine Liste der Komponentenoptionen erhalten, indem du help
für die geladene Komponente aufrufst oder dir die component.yaml ansiehst. Für die GCS-Download-Komponente müssen wir mit gcs_path
konfigurieren, was wir herunterladen, wie in Beispiel 4-17 gezeigt.
Beispiel 4-17. Laden einer Komponente der Pipeline Speicherung aus einem relativen Pfad und einem Weblink
dl_op
=
gcs_download_component
(
gcs_path
=
"gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)
# Your path goes here
In Kapitel 5 erkunden wir weitere gängige Kubeflow-Pipeline-Komponenten für die Daten- und Feature-Vorbereitung.
Fortgeschrittene Themen in Pipelines
Alle Beispiele, die wir bisher gezeigt haben, sind rein sequenziell. Es gibt auch Fälle, in denen wir die Möglichkeit brauchen, Bedingungen zu prüfen und das Verhalten der Pipeline entsprechend zu ändern.
Bedingte Ausführung von Pipeline-Stufen
Kubeflow Pipelines ermöglicht bedingte Ausführungen über dsl.Condition
. Schauen wir uns ein sehr einfaches Beispiel an, bei dem abhängig vom Wert einer Variablen verschiedene Berechnungen ausgeführt werden.
Es folgt ein einfaches Notizbuch, das dieses Beispiel umsetzt. Es beginnt mit den dafür notwendigen Importen in Beispiel 4-18.
Beispiel 4-18. Erforderliche Komponenten importieren
import
kfp
from
kfp
import
dsl
from
kfp.components
import
func_to_container_op
,
InputPath
,
OutputPath
Sobald die Importe vorhanden sind, können wir einige einfache Funktionen implementieren, wie in Beispiel 4-19 gezeigt.
Beispiel 4-19. Implementierung von Funktionen
@func_to_container_op
def
get_random_int_op
(
minimum
:
int
,
maximum
:
int
)
->
int
:
"""Generate a random number between minimum and maximum (inclusive)."""
import
random
result
=
random
.
randint
(
minimum
,
maximum
)
(
result
)
return
result
@func_to_container_op
def
process_small_op
(
data
:
int
):
"""Process small numbers."""
(
"Processing small result"
,
data
)
return
@func_to_container_op
def
process_medium_op
(
data
:
int
):
"""Process medium numbers."""
(
"Processing medium result"
,
data
)
return
@func_to_container_op
def
process_large_op
(
data
:
int
):
"""Process large numbers."""
(
"Processing large result"
,
data
)
return
Wir implementieren alle Funktionen direkt mit Python (wie im vorherigen Beispiel). Die erste Funktion erzeugt eine ganze Zahl zwischen 0 und 100, und die nächsten drei bilden ein einfaches Gerüst für die eigentliche Verarbeitung. Die Pipeline wird wie in Beispiel 4-20 implementiert.
Beispiel 4-20. Pipeline-Implementierung
@dsl.pipeline
(
name
=
'
Conditional execution pipeline
'
,
description
=
'
Shows how to use dsl.Condition().
'
)
def
conditional_pipeline
(
)
:
number
=
get_random_int_op
(
0
,
100
)
.
output
with
dsl
.
Condition
(
number
<
10
)
:
process_small_op
(
number
)
with
dsl
.
Condition
(
number
>
10
and
number
<
50
)
:
process_medium_op
(
number
)
with
dsl
.
Condition
(
number
>
50
)
:
process_large_op
(
number
)
kfp
.
Client
(
)
.
create_run_from_pipeline_func
(
conditional_pipeline
,
arguments
=
{
}
)
Schließlich der Ausführungsgraph, wie in Abbildung 4-9 dargestellt.
Anhand dieses Diagramms können wir sehen, dass sich die Pipeline tatsächlich in drei Zweige aufspaltet und in diesem Lauf die Prozess-Großauftragsausführung gewählt wird. Um zu überprüfen, ob dies korrekt ist, sehen wir uns das Ausführungsprotokoll an, das in Abbildung 4-10 dargestellt ist.
Hier können wir sehen, dass die erzeugte Zahl 67 ist. Diese Zahl ist größer als 50, was bedeutet, dass der process_large_op-Zweig auf ausgeführt werden sollte.7
Pipelines nach Zeitplan ausführen
Wir haben unsere Pipeline manuell ausgeführt. Das ist gut für Tests, aber für Produktionsumgebungen oft unzureichend. Zum Glück kannst du Pipelines nach einem Zeitplan ausführen, wie auf dieserKubeflow-Dokumentationsseite beschrieben. Zuerst musst du eine Pipeline-Definition hochladen und eine Beschreibung angeben. Danach kannst du einen periodischen Lauf erstellen, indem du einen Lauf erstellst, den Lauftyp "Wiederkehrend" auswählst und dann den Anweisungen auf dem Bildschirm folgst (siehe Abbildung 4-11).
In dieser Abbildung stellen wir eine Pipeline so ein, dass sie jeden Tag läuft.
Warnung
Wenn wir einen periodischen Lauf erstellen, legen wir fest, wie oft eine Pipeline ausgeführt werden soll, nicht wann sie ausgeführt werden soll. In der aktuellen Implementierung wird der Zeitpunkt der Ausführung dadurch bestimmt, wann der Lauf erstellt wird. Sobald er erstellt ist, wird er sofort und dann mit der festgelegten Häufigkeit ausgeführt. Wenn zum Beispiel ein täglicher Lauf um 10 Uhr erstellt wird, wird er täglich um 10 Uhr ausgeführt.
Die Einstellung der regelmäßigen Ausführung von Pipelines ist eine wichtige Funktion, mit der du die Ausführung von Pipelines vollständig automatisieren kannst.
Fazit
Du solltest jetzt die Grundlagen kennen, um einfache Pipelines zu erstellen, zu planen und auszuführen. Du hast auch gelernt, welche Werkzeuge Pipelines verwenden, wenn du Fehler beheben musst. Wir haben gezeigt, wie man bestehende Software in Pipelines integriert, wie man eine bedingte Ausführung innerhalb einer Pipeline implementiert und wie man Pipelines nach einem Zeitplan ausführt.
In unserem nächsten Kapitel schauen wir uns an, wie Pipelines für die Datenaufbereitung genutzt werden können, und zeigen einige Beispiele.
1 Dies kann oft automatisch abgeleitet werden, wenn das Ergebnis einer Pipelinestufe als Eingabe an andere übergeben wird. Du kannst zusätzliche Abhängigkeiten auch manuell angeben.
2 Kubernetes persistente Volumes können verschiedene Zugriffsmodi bieten.
3 Die generische Read-Write-Many-Implementierung ist ein NFS-Server.
4 Die Nutzung der Cloud Native Access Speicherung kann praktisch sein, wenn du die Portabilität deiner Lösung über mehrere Cloud-Provider hinweg sicherstellen musst.
5 Für die Installation von Argo Workflow auf einem anderen Betriebssystem, siehe diese Argo-Anleitung.
6 Viele der Standardkomponenten findest du in diesem Kubeflow GitHub Repo.
7 Ein etwas komplexeres Beispiel für bedingte Verarbeitung (mit verschachtelten Bedingungen) findest du auf dieser GitHub-Seite.
Get Kubeflow für maschinelles Lernen now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.