Kapitel 4. Kubeflow-Pipelines

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Im vorherigen Kapitel haben wir Kubeflow Pipelines beschrieben,die Komponente von Kubeflow, die Machine-Learning-Anwendungen orchestriert. Die Orchestrierung ist notwendig, weil eine typische Machine-Learning-Implementierung eine Kombination von Tools verwendet, um Daten vorzubereiten, das Modell zu trainieren, die Leistung zu bewerten und es einzusetzen. Durch die Formalisierung der Schritte und ihrer Abfolge im Code ermöglichen es Pipelines den Nutzern, alle Schritte der Datenverarbeitung formal zu erfassen und ihre Reproduzierbarkeit und Überprüfbarkeit sowie die Trainings- und Bereitstellungsschritte zu gewährleisten.

Zu Beginn dieses Kapitels werfen wir einen Blick auf die Pipelines-Benutzeroberfläche und zeigen dir, wie du einfache Pipelines in Python schreiben kannst. Wir werden uns ansehen, wie Daten zwischen den einzelnen Phasen übertragen werden können, und uns dann damit beschäftigen, wie man bestehende Anwendungen als Teil einer Pipeline nutzen kann. Außerdem schauen wir uns die zugrunde liegende Workflow-Engine Argo Workflows an, ein Standard-Kubernetes-Pipeline-Tool, das Kubeflow zur Ausführung von Pipelines verwendet. Wenn du die Grundlagen von Argo Workflows verstehst, kannst du ein tieferes Verständnis für Kubeflow-Pipelines entwickeln und die Fehlersuche erleichtern. Anschließend zeigen wir, was Kubeflow Pipelines zu Argo beiträgt.

Zum Abschluss von Kubeflow Pipelines zeigen wir dir, wie du eine bedingte Ausführung in Pipelines implementierst und wie du Pipelines zeitgesteuert ausführst. Aufgabenspezifische Komponenten von Pipelines werden in den jeweiligen Kapiteln behandelt.

Erste Schritte mit Pipelines

Die Kubeflow Pipelines Plattform besteht aus:

  • Eine Benutzeroberfläche für die Verwaltung und Verfolgung von Pipelines und deren Ausführung

  • Ein Motor für die Planung der Ausführung einer Pipeline

  • Ein SDK zum Definieren, Erstellen und Bereitstellen von Pipelines in Python

  • Notebook-Unterstützung für die Verwendung des SDK und die Pipeline-Ausführung

Der einfachste Weg, sich mit Pipelines vertraut zu machen, ist ein Blick auf vorgefertigte Beispiele.

Erkundung der vorgefertigten Probenpipelines

Um das Verständnis von Pipelines zu erleichtern, wird Kubeflow mit einigen Beispielpipelines installiert. Du findest diese vorgefertigten in der Pipeline Web UI, wie in Abbildung 4-1 zu sehen ist. Beachte, dass zum Zeitpunkt der Erstellung dieses Artikels nur die Pipelines Basic to Conditional execution generisch sind, während der Rest nur auf der Google Kubernetes Engine (GKE) läuft. Wenn du versuchst, sie in anderen Umgebungen als GKE auszuführen, werden sie fehlschlagen.

Kubeflow Pipelines UI - Prepackaged Pipelines
Abbildung 4-1. Kubeflow-Pipelines UI: vorgefertigte Pipelines

Wenn du auf eine bestimmte Pipeline klickst, wird ihr Ausführungsdiagramm oder ihre Quelle angezeigt, wie in Abbildung 4-2 dargestellt.

Kubeflow Pipelines UI - Pipeline Graph View
Abbildung 4-2. Kubeflow-Pipelines UI: Ansicht des Pipeline-Graphen

Wenn du auf die Registerkarte "Quelle" klickst, wird der kompilierte Code der Pipeline angezeigt,, der eine Argo YAML-Datei ist (mehr dazu in "Argo: die Grundlage der Pipelines").

In diesem Bereich kannst du gerne mit laufenden Pipelines experimentieren, um ein besseres Gefühl für ihre Ausführung und die Möglichkeiten der Pipelines-Benutzeroberfläche zu bekommen.

Um eine bestimmte Pipeline aufzurufen, klickst du sie einfach an. Daraufhin wird die Pipeline-Ansicht wie in Abbildung 4-3 dargestellt angezeigt.

Kubeflow Pipelines UI - Pipeline View
Abbildung 4-3. Kubeflow Pipelines UI: Pipeline-Ansicht

Um die Pipeline zu starten, klicke auf die Schaltfläche "Lauf erstellen" und folge den Anweisungen auf dem Bildschirm.

Tipp

Wenn du eine Pipeline ausführst, musst du ein Experiment auswählen. Experiment ist hier nur eine praktische Gruppierung für Pipeline-Ausführungen (Läufe). Du kannst immer das "Standard"-Experiment verwenden, das bei der Installation von Kubeflow erstellt wird. Wähle außerdem "Einmalig" als Lauftyp, um die Pipeline einmalig auszuführen. Über die wiederkehrende Ausführung sprechen wir in "Pipelines nach Zeitplan ausführen".

Aufbau einer einfachen Pipeline in Python

Wir haben gesehen, wie vorkompilierte Kubeflow-Pipelines ausgeführt werden können. Jetzt wollen wir untersuchen, wie wir unsere eigenen neuen Pipelines erstellen können. Kubeflow-Pipelines werden als YAML-Dateien gespeichert, die von einem Programm namens Argo ausgeführt werden (siehe "Argo: die Grundlage der Pipelines"). Glücklicherweise stellt Kubeflow eine domänenspezifische Sprache (DSL) in Python zur Verfügung, um Pipelines zu erstellen. Die DSL ist eine Python-Darstellung der Operationen, die im ML-Workflow ausgeführt werden, und wurde speziell für ML-Workloads entwickelt. Die DSL ermöglicht auch die Verwendung einiger einfacher Python-Funktionen als Pipeline-Stufen, ohne dass du explizit einen Container erstellen musst.

Tipp

Die Beispiele aus Kapitel 4 findest du in den Notebooks im GitHub-Repository dieses Buches.

Eine Pipeline ist im Wesentlichen ein Graph der Containerausführung. Unter kannst du nicht nur angeben, welche Container in welcher Reihenfolge ausgeführt werden sollen, sondern auch Argumente an die gesamte Pipeline und zwischen den beteiligten Containern übergeben.

Für jeden Container (bei Verwendung des Python SDK) müssen wir:

  • Erstelle den Container - entweder als einfache Python-Funktion oder mit einem beliebigen Docker-Container (mehr dazu in Kapitel 9).

  • Erstelle eine Operation, die auf diesen Container verweist, sowie die Befehlszeilenargumente, Datenhalterungen und Variablen, um den Container zu übergeben.

  • Lege die Reihenfolge der Vorgänge fest, indem du definierst, welche Vorgänge parallel ablaufen können und welche abgeschlossen sein müssen, bevor du zu einem weiteren Schritt übergehst.1

  • Kompiliere diese in Python definierte Pipeline in eine YAML-Datei, die von Kubeflow Pipelines verarbeitet werden kann.

Pipelines sind ein zentrales Merkmal von Kubeflow und du wirst sie im Laufe des Buches immer wieder sehen. In diesem Kapitel werden wir möglichst einfache Beispiele zeigen, um die Grundprinzipien von Pipelines zu veranschaulichen. Es wird sich nicht wie "maschinelles Lernen" anfühlen und das ist auch so gewollt.

Für unsere erste Kubeflow-Operation werden wir eine Technik verwenden, die als leichtgewichtige Python-Funktionen bekannt ist. Wir sollten uns jedoch nicht von dem Wort " leichtgewichtig" täuschen lassen. Bei einer leichtgewichtigen Python-Funktion definieren wir eine Python-Funktion und überlassen es dann Kubeflow, diese Funktion in einen Container zu packen und eine Operation zu erstellen.

Der Einfachheit halber erklären wir die einfachste aller Funktionen als Echo. Das ist eine Funktion, die eine einzige Eingabe, eine ganze Zahl, benötigt und diese zurückgibt.

Beginnen wir damit, kfp zu importieren und unsere Funktion zu definieren:

import kfp
def simple_echo(i: int) -> int:
    return i
Warnung

Beachte, dass wir snake_case, nicht camelCase, für unsere Funktionsnamen verwenden. Zum Zeitpunkt der Erstellung dieses Artikels gibt es einen Fehler (Feature?), der dazu führt, dass Namen in Großbuchstaben (z. B. unsere Funktion simpleEcho) zu Fehlern führen.

Als Nächstes wollen wir unsere Funktion simple_echo in eine Kubeflow-Pipeline-Operation verpacken. Dafür gibt es eine nette kleine Methode: kfp.components.func_to_container_op. Diese Methode gibt eine Fabrikfunktion mit einer stark typisierten Signatur zurück:

simpleStronglyTypedFunction =
  kfp.components.func_to_container_op(deadSimpleIntEchoFn)

Wenn wir im nächsten Schritt eine Pipeline erstellen, wird die Fabrikfunktion eine ContainerOp konstruieren, die die ursprüngliche Funktion (echo_fn) in einem Container ausführt:

foo = simpleStronglyTypedFunction(1)
type(foo)
Out[5]: kfp.dsl._container_op.ContainerOp
Tipp

Wenn dein Code von einer GPU beschleunigt werden kann, ist es einfach, eine Stufe als GPU-Ressourcen verwendend zu markieren ; füge einfach .set_gpu_limit(NUM_GPUS) zu deinem ContainerOp hinzu.

Nun ordnen wir die ContainerOp(s) (es gibt nur eine) in eine Pipeline ein. Diese Pipeline benötigt einen Parameter (die Nummer, die wir als Echo ausgeben). Die Pipeline hat auch ein paar Metadaten, die mit ihr verbunden sind. Während das Echo von Zahlen eine triviale Verwendung von Parametern sein mag, würdest du in realen Anwendungsfällen Variablen einbeziehen, die du später anpassen möchtest, wie z. B. Hyperparameter für Algorithmen des maschinellen Lernens.

Schließlich kompilieren wir unsere Pipeline in eine gezippte YAML-Datei, , die wir dann in die Pipelines UI hochladen können.

@kfp.dsl.pipeline(
  name='Simple Echo',
  description='This is an echo pipeline. It echoes numbers.'
)
def echo_pipeline(param_1: kfp.dsl.PipelineParam):
  my_step = simpleStronglyTypedFunction(i= param_1)

kfp.compiler.Compiler().compile(echo_pipeline,
  'echo-pipeline.zip')
Tipp

Es ist auch möglich, die Pipeline direkt im Notebook auszuführen, was wir im nächsten Beispiel tun werden.

Eine Pipeline mit nur einer Komponente ist nicht sehr interessant. In unserem nächsten Beispiel werden wir die Container für unsere leichtgewichtigen Python-Funktionen anpassen. Wir erstellen eine neue Pipeline, die zusätzliche Python-Bibliotheken installiert und importiert, von einem bestimmten Basisbild baut und die Ausgabe zwischen denContainern weiterleitet.

Wir werden eine Pipeline erstellen, die eine Zahl durch eine andere Zahl dividiert und dann eine dritte Zahl addiert. Zuerst erstellen wir unsere einfache Funktion add, wie in Beispiel 4-1 gezeigt.

Beispiel 4-1. Eine einfache Python-Funktion
def add(a: float, b: float) -> float:
   '''Calculates sum of two arguments'''
   return a + b

add_op = comp.func_to_container_op(add)

Als Nächstes wollen wir eine etwas komplexere Funktion erstellen. Außerdem soll diese Funktion eine nicht standardisierte Python-Bibliothek, numpy, benötigen und importieren. Dies muss innerhalb der Funktion geschehen. Der Grund dafür ist, dass globale Importe aus dem Notebook nicht in die von uns erstellten Container verpackt werden. Natürlich ist es auch wichtig, dass unser Container die Bibliotheken, die wir importieren, installiert hat.

Dazu übergeben wir den Container, den wir als Basisbild verwenden wollen, an .func_to_container(, wie in Beispiel 4-2.

Beispiel 4-2. Eine weniger einfache Python-Funktion
from typing import NamedTuple
def my_divmod(dividend: float, divisor:float) -> \
       NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float)]):
    '''Divides two numbers and calculate  the quotient and remainder'''
    #Imports inside a component function:
    import numpy as np 1

    #This function demonstrates how to use nested functions inside a
    # component function:
    def divmod_helper(dividend, divisor): 2
	return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return divmod_output(quotient, remainder)

divmod_op = comp.func_to_container_op(
                my_divmod, base_image='tensorflow/tensorflow:1.14.0-py3') 3
1

Importieren von Bibliotheken innerhalb der Funktion.

2

Verschachtelte Funktionen innerhalb von leichtgewichtigen Python-Funktionen sind ebenfalls OK.

3

Aufruf für einen bestimmten Basiscontainer.

Jetzt werden wir eine Pipeline erstellen. Die Pipeline in Beispiel 4-3 verwendet die zuvor definierten Funktionen my_divmod und add als Stufen.

Beispiel 4-3. Eine einfache Pipeline
@dsl.pipeline(
   name='Calculation pipeline',
   description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a='a',
   b='7',
   c='17',
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance.

    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output
    # reference can be accessed using `task.output`
    # or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b) 1

    #For an operation with multiple return values, the output references
    # can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c) 1
1

Werte, die zwischen Containern weitergegeben werden. Die Reihenfolge der Vorgänge wird hieraus abgeleitet.

Schließlich verwenden wir den Client, um die Pipeline zur Ausführung zu übermitteln, die die Links zu Ausführung und Experiment zurückgibt. Experimente fassen die Ausführungen zusammen. Du kannst auch kfp.compiler.Compiler().compile verwenden und die Zip-Datei wie im ersten Beispiel hochladen, wenn du möchtest:

client = kfp.Client()
#Specify pipeline argument values
# arguments = {'a': '7', 'b': '8'} #whatever makes sense for new version
#Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

Über den von create_run_from_pipeline_func zurückgegebenen Link gelangen wir zur Web-UI der Ausführung , die die Pipeline selbst und die Zwischenergebnisse anzeigt (siehe Abbildung 4-4).

Pipeline Execution
Abbildung 4-4. Pipeline-Ausführung

Wie wir gesehen haben, bezieht sich das Leichtgewicht in den leichtgewichtigen Python-Funktionen auf die Leichtigkeit, mit der wir diese Schritte in unserem Prozess durchführen, und nicht auf die Leistungsfähigkeit der Funktionen selbst. Wir können benutzerdefinierte Importe, Basisbilder und die Weitergabe von kleinen Ergebnissen zwischen Containern verwenden.

Im nächsten Abschnitt zeigen wir dir, wie du größere Datendateien zwischen Containern weitergeben kannst, indem du Volumes in die Container einhängst.

Speichern von Daten zwischen den Schritten

Im vorherigen Beispiel waren die zwischen Containern übertragenen Daten klein und von primitiven Typen (wie Zahlen, Strings, Listen und Arrays). In der Praxis werden wir jedoch wahrscheinlich viel größere Daten (z. B. ganze Datensätze) weitergeben. In Kubeflow gibt es dafür zwei primäre Methoden: persistente Volumes innerhalb des Kubernetes-Clusters und Cloud-Speicherung (z. B. S3), wobei jede Methode mit Problemen verbunden ist.

Persistente Volumes abstrahieren die Ebene der Speicherung. Je nach Anbieter können persistente Volumes bei der Bereitstellung langsam sein und IO-Limits haben. Überprüfe, ob dein Anbieter read-write-many Speicherklassen unterstützt, die den Speicherzugriff durch mehrere Pods ermöglichen, was für einige Arten der Parallelität erforderlich ist. Speicherklassen können eine der folgenden sein.2

ReadWriteOnce

Das Volume kann von einem einzelnen Knoten als Lese- und Schreibzugriff gemountet werden.

ReadOnlyMany

Das Volume kann von vielen Nodes schreibgeschützt gemountet werden.

ReadWriteMany

Das Volume kann von vielen Knoten als Lese- und Schreibzugriff gemountet werden.

Dein System-/Clusteradministrator kann vielleicht die Unterstützung für Read-Write-Many hinzufügen.3 Darüber hinaus bieten viele Cloud-Provider eigene Read-Write-Many-Implementierungen an, z. B. Dynamic Provisioning auf GKE. Frag aber unbedingt nach, ob es einen Engpass bei einem einzelnen Knoten gibt.

Mit VolumeOp von Kubeflow Pipelines kannst du ein automatisch verwaltetes persistentes Volume erstellen, wie in Beispiel 4-4 gezeigt. Um das Volume zu deiner Operation hinzuzufügen, rufst du einfach add_pvolumes mit einem Wörterbuch von Einhängepunkten zu Volumes auf, z.B. download_data_op(year).add_pvolumes({"/data_processing": dvop.volume}).

Beispiel 4-4. Mailinglisten-Daten vorbereiten
dvop = dsl.VolumeOp(name="create_pvc",
                    resource_name="my-pvc-2",
                    size="5Gi",
                    modes=dsl.VOLUME_MODE_RWO)

In den Kubeflow-Beispielen ist die Verwendung einer Objektspeicherlösung ( ) zwar weniger üblich, aber in manchen Fällen besser geeignet. MinIO bietet eine Cloud-native Speicherung von Objekten, indem es entweder als Gateway zu einer bestehenden Objektspeicher-Engine oder als eigenständige Lösung arbeitet.4 Wie du MinIO konfigurierst, haben wir bereits in Kapitel 3 beschrieben.

Der in Kubeflow integrierte file_output Mechanismus überträgt die angegebene lokale Datei zwischen den Pipelineschritten automatisch in MinIO. Um file_output zu verwenden, schreibst du deine Dateien lokal in deinen Container und gibst den Parameter in deinem ContainerOp an, wie in Beispiel 4-5 gezeigt.

Beispiel 4-5. Beispiel für eine Dateiausgabe
    fetch = kfp.dsl.ContainerOp(name='download',
                                image='busybox',
                                command=['sh', '-c'],
                                arguments=[
                                    'sleep 1;'
                                    'mkdir -p /tmp/data;'
                                    'wget ' + data_url +
                                    ' -O /tmp/data/results.csv'
                                ],
                                file_outputs={'downloaded': '/tmp/data'})
    # This expects a directory of inputs not just a single file

Wenn du MinIO nicht verwenden möchtest, kannst du auch direkt die Objektspeicherung deines Anbieters nutzen, aber das kann die Portabilität beeinträchtigen.

Die Möglichkeit, Daten lokal zu montieren, ist eine wichtige Aufgabe in jeder Pipeline für maschinelles Lernen. Hier haben wir mehrere Methoden kurz beschrieben und Beispiele für jede Methode gegeben.

Einführung in die Kubeflow Pipelines Komponenten

Kubeflow Pipelines baut auf Argo Workflows auf, eine quelloffene, container-native Workflow-Engine für Kubernetes. In diesem Abschnitt beschreiben wir, wie Argo funktioniert, was es leistet und wie Kubeflow Pipelines Argo ergänzt, um es für Datenwissenschaftler/innen leichter nutzbar zu machen.

Argo: das Fundament der Pipelines

Kubeflow installiert alle Argo-Komponenten. Es ist zwar nicht notwendig, Argo auf deinem Computer zu installieren, um Kubeflow Pipelines zu nutzen, aber das Argo-Kommandozeilen-Tool macht es einfacher, deine Pipelines zu verstehen und zu debuggen.

Tipp

Kubeflow konfiguriert Argo standardmäßig so, dass es den Docker-Executor verwendet. Wenn deine Plattform die Docker-APIs nicht unterstützt, musst du deinen Executor auf einen kompatiblen umstellen. Dazu änderst du den Wert containerRuntimeExecutor in der Argo-Parameterdatei. In Anhang A findest du weitere Informationen zu den Kompromissen. Die meisten Beispiele in diesem Buch verwenden den Docker-Executor, können aber auch an andere Executors angepasst werden.

Unter macOS kannst du Argo mit Homebrew installieren, wie in Beispiel 4-6 gezeigt.5

Beispiel 4-6. Argo-Installation
#!/bin/bash
# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64

# Make binary executable
chmod +x argo-linux-amd64

# Move binary to path
mv ./argo-linux-amd64 ~/bin/argo

Du kannst deine Argo-Installation überprüfen, indem du die Argo-Beispiele mit dem Kommandozeilen-Tool im Kubeflow-Namensraum ausführst: Befolge diese Argo-Anleitung. Wenn du die Argo-Beispiele ausführst, sind die Pipelines mit dem Befehl argo sichtbar, wie in Beispiel 4-7.

Beispiel 4-7. Argo-Ausführungen auflisten
$ argo list -n kubeflow
NAME                STATUS      AGE   DURATION
loops-maps-4mxp5    Succeeded   30m   12s
hello-world-wsxbr   Succeeded   39m   15s

Da Pipelines mit Argo implementiert sind, kannst du die gleiche Technik auch für die Überprüfung von Pipelines verwenden. Du kannstauch Informationen über die Ausführung eines bestimmten Workflows erhalten, wie in Beispiel 4-8 gezeigt.

Beispiel 4-8. Argo-Ausführungsdetails erhalten
$ argo get hello-world-wsxbr -n kubeflow  1
Name:                hello-world-wsxbr
Namespace:           kubeflow
ServiceAccount:      default
Status:              Succeeded
Created:             Tue Feb 12 10:05:04 -0600 (2 minutes ago)
Started:             Tue Feb 12 10:05:04 -0600 (2 minutes ago)
Finished:            Tue Feb 12 10:05:23 -0600 (1 minute ago)
Duration:            19 seconds

STEP                  PODNAME            DURATION  MESSAGE
  hello-world-wsxbr  hello-world-wsxbr  18s
1

hello-world-wsxbr ist der Name, den wir mit argo list -n kubeflow oben erhalten haben. In deinem Fall wird der Name anders lauten.

Wir können die Ausführungsprotokolle auch mit dem Befehl in Beispiel 4-9 anzeigen.

Beispiel 4-9. Abrufen des Protokolls der Argo-Ausführung
$ argo logs hello-world-wsxbr -n kubeflow

Dies ergibt das in Beispiel 4-10 gezeigte Ergebnis.

Beispiel 4-10. Argo-Ausführungsprotokoll
< hello world >
 -------------
    \
     \
      \
		    ##        .
	      ## ## ##       ==
	   ## ## ## ##      ===
       /""""""""""""""""___/ ===
  ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
       \______ o          __/
	\    \        __/
	  \____\______/

Du kannst auch einen bestimmten Workflow löschen; siehe Beispiel 4-11.

Beispiel 4-11. Löschen der Argo-Ausführung
$ argo delete hello-world-wsxbr -n kubeflow

Alternativ kannst du die Informationen zur Pipeline-Ausführung auch über die Argo UI abrufen, wie in Abbildung 4-5 zu sehen ist.

Argo UI for pipelines execution
Abbildung 4-5. Argo UI für die Pipeline-Ausführung

Du kannst dir auch die Details des Ablaufdiagramms ansehen, indem du auf einen bestimmten Workflow klickst, wie in Abbildung 4-6 zu sehen ist.

Argo UI - Execution Graph
Abbildung 4-6. Argo UI Ausführungsdiagramm

Für jede Kubeflow-Pipeline, die du ausführst, kannst du diese Pipeline auch in der Argo CLI/UI anzeigen. Da ML-Pipelines das Argo CRD verwenden, kannst du das Ergebnis der Pipeline-Ausführung auch in der Argo-Benutzeroberfläche sehen (wie in Abbildung 4-7).

Viewing Kubeflow Pipelines in Argo UI
Abbildung 4-7. Anzeigen von Kubeflow-Pipelines in der Argo UI
Tipp

Derzeit sucht die Kubeflow-Gemeinschaft aktiv nach alternativen Basistechnologien für die Ausführung von Kubeflow-Pipelines, eine davon ist Tekton. Das Papier " Kubeflow Pipelines with Tekton" von A. Singh et al. enthält "erste Entwürfe, Spezifikationen und Code für die Ausführung von Kubeflow-Pipelines auf Tekton". Die Grundidee dabei ist, ein Zwischenformat zu erstellen, das von Pipelines erzeugt und dann mit Argo, Tekton oder anderen Laufzeiten ausgeführt werden kann. Den ersten Code für diese Implementierung findest du in diesem Kubeflow GitHub Repo.

Was Kubeflow Pipelines dem Argo Workflow hinzufügt

Argo ist die Grundlage für die Ausführung von Workflows, aber wenn du es direkt verwendest, musst du einige umständliche Dinge tun. Erstens musst du deinen Arbeitsablauf in YAML definieren, was schwierig sein kann. Zweitens musst du deinen Code containerisieren, was mühsam sein kann. Der Hauptvorteil von KF Pipelines besteht darin, dass du Python-APIs für die Definition/Erstellung von Pipelines verwenden kannst. Dadurch wird ein Großteil des YAML-Boilerplates für Workflow-Definitionen automatisiert, was für Datenwissenschaftler/innen und Python-Entwickler/innen äußerst hilfreich ist. Kubeflow Pipelines hat auch Hooks, die Bausteine für Machine Learning-spezifischeKomponenten hinzufügen. Diese APIs generieren nicht nur die YAML, sondern können auch die Erstellung von Containern und die Nutzung von Ressourcen vereinfachen. Zusätzlich zu den APIs bietet Kubeflow ein wiederkehrendes Zeitplannungsprogramm und eine Benutzeroberfläche für die Konfiguration und Ausführung.

Aufbau einer Pipeline unter Verwendung vorhandener Bilder

Die Erstellung von Pipeline-Stufen direkt aus Python bietet einen einfachen Einstieg. Allerdings ist unsere Implementierungdadurch auf Python beschränkt. Eine weitere Funktion von Kubeflow Pipelines ist die Möglichkeit, die Ausführung einer mehrsprachigen Implementierung mithilfe von vorgefertigten Docker-Images zu orchestrieren (siehe Kapitel 9).

Zusätzlich zu unseren vorherigen Importen wollen wir auch den Kubernetes-Client importieren, mit dem wir Kubernetes-Funktionen direkt aus dem Python-Code verwenden können (siehe Beispiel 4-12).

Beispiel 4-12. Kubernetes-Client exportieren
from kubernetes import client as k8s_client

Auch hier erstellen wir einen Client und ein Experiment, um unsere Pipeline auszuführen. Wie bereits erwähnt, gruppieren Experimente die Läufe von Pipelines. Du kannst ein bestimmtes Experiment nur einmal erstellen. Beispiel 4-13 zeigt also, wie du entweder ein neues Experiment erstellst oder ein bestehendes verwendest.

Beispiel 4-13. Pipeline-Experiment abrufen
client = kfp.Client()
exp = client.get_experiment(experiment_name ='mdupdate')

Jetzt erstellen wir unsere Pipeline(Beispiel 4-14). Die verwendeten Images müssen zugänglich sein, und wir geben die vollständigen Namen an, damit sie aufgelöst werden können. Da diese Container vorgefertigt sind, müssen wir sie für unsere Pipeline konfigurieren.

Die vorgefertigten Container, die wir verwenden, haben ihre Speicherung über die MINIO_* Umgebungsvariablen konfiguriert. Wir konfigurieren sie also so, dass sie unsere lokale MinIO-Installation verwenden, indem wir add_env_variable aufrufen.

Zusätzlich zu den automatischen Abhängigkeiten, die bei der Übergabe von Parametern zwischen Stufen entstehen, kannst du mit after auch angeben, dass eine Stufe eine vorherige Stufe benötigt. Dies ist vor allem dann sinnvoll, wenn es einen externen Nebeneffekt gibt, z. B. die Aktualisierung einer Datenbank.

Beispiel 4-14. Beispiel einer Empfehlungs-Pipeline
@dsl.pipeline(
  name='Recommender model update',
  description='Demonstrate usage of pipelines for multi-step model update'
)
def recommender_pipeline():
    # Load new data
  data = dsl.ContainerOp(
      name='updatedata',
      image='lightbend/recommender-data-update-publisher:0.2') \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',
        value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    # Train the model
  train = dsl.ContainerOp(
      name='trainmodel',
      image='lightbend/ml-tf-recommender:0.2') \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',
            value='minio-service.kubeflow.svc.cluster.local:9000')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
  train.after(data)
    # Publish new model
  publish = dsl.ContainerOp(
      name='publishmodel',
      image='lightbend/recommender-model-publisher:0.2') \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',
            value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \
    .add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS',
            value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \
    .add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL',
            value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \
    .add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL',
            value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))
  publish.after(train)

Da die Pipeline-Definition nur aus Code besteht, kannst du sie kompakter gestalten, indem du eine Schleife verwendest, um die MinIO-Parameter zu setzen, anstatt dies auf jeder Stufe zu tun.

Wie zuvor müssen wir die Pipeline kompilieren, entweder explizit mit compiler.Compiler().compile oder implizit mitcreate_run_from_pipeline_func. Nun kannst du die Pipeline ausführen (siehe Abbildung 4-8).

Execution of Recommender Pipelines Example
Abbildung 4-8. Beispiel für die Ausführung von Empfehlungspipelines

Kubeflow Pipeline Komponenten

Zusätzlich zu den Container-Operationen, die wir gerade besprochen haben, bietet Kubeflow Pipelines weitere Operationen mit Komponenten an. Komponenten stellen verschiedene Kubernetes-Ressourcen oder externe Operationen (wie dataproc) zur Verfügung. Kubeflow-Komponenten ermöglichen es Entwicklern, Tools für maschinelles Lernen zu verpacken und dabei von den Besonderheiten der verwendeten Container oder CRDs zu abstrahieren.

Wir haben die Bausteine von Kubeflow ziemlich direkt verwendet, und wir haben die Komponente func_to_container verwendet.6 Einige Komponenten, wie func_to_container, sind als Python-Code verfügbar und können ganz normal importiert werden. Andere Komponenten werden über das component.yaml System von Kubeflow spezifiziert und müssen geladen werden. Unserer Meinung nach ist der beste Weg, mit Kubeflow-Komponenten zu arbeiten, ein bestimmtes Tag des Repos herunterzuladen, das uns die Verwendung von load_component_from_file ermöglicht, wie in Beispiel 4-15 gezeigt.

Beispiel 4-15. Pipeline-Freigabe
wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
tar -xvf 0.2.5.tar.gz
Warnung

Es gibt eine load_component Funktion, die den Namen einer Komponente nimmt und versucht, ihn aufzulösen. Wir raten davon ab, diese Funktion zu verwenden, da sie standardmäßig einen Suchpfad verwendet, der den Master-Zweig der Pipelines-Bibliothek von Github abruft, der nicht stabil ist.

Im nächsten Kapitel werden wir uns eingehend mit Komponenten zur Datenaufbereitung befassen. Als Beispiel wollen wir uns jedoch kurz eine Komponente zum Abrufen von Dateien anschauen: . In unserem Empfehlungsbeispiel weiter oben im Kapitel haben wir einen speziellen vorgefertigten Container verwendet, um unsere Daten zu holen, da sie sich nicht in einem persistenten Volume befanden. Stattdessen können wir die Kubeflow GCS-Komponente google-cloud/storage/download/ verwenden, um unsere Daten herunterzuladen. Angenommen, du hast die Pipeline-Version wie in Beispiel 4-15 heruntergeladen, kannst du die Komponente mit load_component_from_file wie in Beispiel 4-16 laden.

Beispiel 4-16. GCS Download Komponente laden
gcs_download_component = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/google-cloud/storage/download/component.yaml")

Wenn eine Komponente geladen wird, gibt sie eine Funktion zurück, die bei ihrem Aufruf eine Stufe der Pipeline erzeugt. Die meisten Komponenten benötigen Parameter, um ihr Verhalten zu konfigurieren. Du kannst eine Liste der Komponentenoptionen erhalten, indem du help für die geladene Komponente aufrufst oder dir die component.yaml ansiehst. Für die GCS-Download-Komponente müssen wir mit gcs_path konfigurieren, was wir herunterladen, wie in Beispiel 4-17 gezeigt.

Beispiel 4-17. Laden einer Komponente der Pipeline Speicherung aus einem relativen Pfad und einem Weblink
    dl_op = gcs_download_component(
        gcs_path=
        "gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
    )  # Your path goes here

In Kapitel 5 erkunden wir weitere gängige Kubeflow-Pipeline-Komponenten für die Daten- und Feature-Vorbereitung.

Fortgeschrittene Themen in Pipelines

Alle Beispiele, die wir bisher gezeigt haben, sind rein sequenziell. Es gibt auch Fälle, in denen wir die Möglichkeit brauchen, Bedingungen zu prüfen und das Verhalten der Pipeline entsprechend zu ändern.

Bedingte Ausführung von Pipeline-Stufen

Kubeflow Pipelines ermöglicht bedingte Ausführungen über dsl.Condition. Schauen wir uns ein sehr einfaches Beispiel an, bei dem abhängig vom Wert einer Variablen verschiedene Berechnungen ausgeführt werden.

Es folgt ein einfaches Notizbuch, das dieses Beispiel umsetzt. Es beginnt mit den dafür notwendigen Importen in Beispiel 4-18.

Beispiel 4-18. Erforderliche Komponenten importieren
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

Sobald die Importe vorhanden sind, können wir einige einfache Funktionen implementieren, wie in Beispiel 4-19 gezeigt.

Beispiel 4-19. Implementierung von Funktionen
@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)."""
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result

@func_to_container_op
def process_small_op(data: int):
    """Process small numbers."""
    print("Processing small result", data)
    return

@func_to_container_op
def process_medium_op(data: int):
    """Process medium numbers."""
    print("Processing medium result", data)
    return

@func_to_container_op
def process_large_op(data: int):
    """Process large numbers."""
    print("Processing large result", data)
    return

Wir implementieren alle Funktionen direkt mit Python (wie im vorherigen Beispiel). Die erste Funktion erzeugt eine ganze Zahl zwischen 0 und 100, und die nächsten drei bilden ein einfaches Gerüst für die eigentliche Verarbeitung. Die Pipeline wird wie in Beispiel 4-20 implementiert.

Beispiel 4-20. Pipeline-Implementierung
@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def conditional_pipeline():
    number = get_random_int_op(0, 100).output 1
    with dsl.Condition(number < 10): 2
	process_small_op(number)
    with dsl.Condition(number > 10 and number < 50): 2
	process_medium_op(number)
    with dsl.Condition(number > 50): 2
	process_large_op(number)

kfp.Client().create_run_from_pipeline_func(conditional_pipeline, arguments={}) 3
1

Je nachdem, wie viele wir hier bekommen...

2

Wir werden zu einem dieser Betriebe weiterfahren.

3

Beachte, dass wir hier den leeren Parameter arguments-required angeben.

Schließlich der Ausführungsgraph, wie in Abbildung 4-9 dargestellt.

Execution of conditional Pipelines Example
Abbildung 4-9. Beispiel für die Ausführung von bedingten Pipelines

Anhand dieses Diagramms können wir sehen, dass sich die Pipeline tatsächlich in drei Zweige aufspaltet und in diesem Lauf die Prozess-Großauftragsausführung gewählt wird. Um zu überprüfen, ob dies korrekt ist, sehen wir uns das Ausführungsprotokoll an, das in Abbildung 4-10 dargestellt ist.

Viewing Conditional Pipeline Log
Abbildung 4-10. Anzeigen des bedingten Pipeline-Protokolls

Hier können wir sehen, dass die erzeugte Zahl 67 ist. Diese Zahl ist größer als 50, was bedeutet, dass der process_large_op-Zweig auf ausgeführt werden sollte.7

Pipelines nach Zeitplan ausführen

Wir haben unsere Pipeline manuell ausgeführt. Das ist gut für Tests, aber für Produktionsumgebungen oft unzureichend. Zum Glück kannst du Pipelines nach einem Zeitplan ausführen, wie auf dieserKubeflow-Dokumentationsseite beschrieben. Zuerst musst du eine Pipeline-Definition hochladen und eine Beschreibung angeben. Danach kannst du einen periodischen Lauf erstellen, indem du einen Lauf erstellst, den Lauftyp "Wiederkehrend" auswählst und dann den Anweisungen auf dem Bildschirm folgst (siehe Abbildung 4-11).

In dieser Abbildung stellen wir eine Pipeline so ein, dass sie jeden Tag läuft.

Warnung

Wenn wir einen periodischen Lauf erstellen, legen wir fest, wie oft eine Pipeline ausgeführt werden soll, nicht wann sie ausgeführt werden soll. In der aktuellen Implementierung wird der Zeitpunkt der Ausführung dadurch bestimmt, wann der Lauf erstellt wird. Sobald er erstellt ist, wird er sofort und dann mit der festgelegten Häufigkeit ausgeführt. Wenn zum Beispiel ein täglicher Lauf um 10 Uhr erstellt wird, wird er täglich um 10 Uhr ausgeführt.

Die Einstellung der regelmäßigen Ausführung von Pipelines ist eine wichtige Funktion, mit der du die Ausführung von Pipelines vollständig automatisieren kannst.

Setting Up Periodic Execution of a Pipeline
Abbildung 4-11. Einrichten der periodischen Ausführung einer Pipeline

Fazit

Du solltest jetzt die Grundlagen kennen, um einfache Pipelines zu erstellen, zu planen und auszuführen. Du hast auch gelernt, welche Werkzeuge Pipelines verwenden, wenn du Fehler beheben musst. Wir haben gezeigt, wie man bestehende Software in Pipelines integriert, wie man eine bedingte Ausführung innerhalb einer Pipeline implementiert und wie man Pipelines nach einem Zeitplan ausführt.

In unserem nächsten Kapitel schauen wir uns an, wie Pipelines für die Datenaufbereitung genutzt werden können, und zeigen einige Beispiele.

1 Dies kann oft automatisch abgeleitet werden, wenn das Ergebnis einer Pipelinestufe als Eingabe an andere übergeben wird. Du kannst zusätzliche Abhängigkeiten auch manuell angeben.

2 Kubernetes persistente Volumes können verschiedene Zugriffsmodi bieten.

3 Die generische Read-Write-Many-Implementierung ist ein NFS-Server.

4 Die Nutzung der Cloud Native Access Speicherung kann praktisch sein, wenn du die Portabilität deiner Lösung über mehrere Cloud-Provider hinweg sicherstellen musst.

5 Für die Installation von Argo Workflow auf einem anderen Betriebssystem, siehe diese Argo-Anleitung.

6 Viele der Standardkomponenten findest du in diesem Kubeflow GitHub Repo.

7 Ein etwas komplexeres Beispiel für bedingte Verarbeitung (mit verschachtelten Bedingungen) findest du auf dieser GitHub-Seite.

Get Kubeflow für maschinelles Lernen now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.