Capítulo 4. Canalizaciones Kubeflow
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
En el capítulo anterior describimos Kubeflow Pipelines,el componente de Kubeflow que orquesta las aplicaciones de aprendizaje automático. La orquestación es necesaria porque una implementación típica de aprendizaje automático utiliza una combinación de herramientas para preparar los datos, entrenar el modelo, evaluar el rendimiento e implementarlo. Al formalizar los pasos y su secuenciación en código, las canalizaciones permiten a los usuarios capturar formalmente todos los pasos del procesamiento de datos, garantizando su reproducibilidad y auditabilidad, así como los pasos de entrenamiento e implementación.
Comenzaremos este capítulo echando un vistazo a la interfaz de usuario de Pipelines y mostrando cómo empezar a escribir pipelines sencillos en Python. Exploraremos cómo transferir datos entre etapas, y luego continuaremos adentrándonos en formas de aprovechar aplicaciones existentes como parte de una canalización. También veremos el motor de flujo de trabajo subyacente -Argo Workflows, una herramienta estándar de canalización de Kubernetes- que Kubeflow utiliza para ejecutar canalizaciones. Comprender los fundamentos de Argo Workflows te permitirá obtener una comprensión más profunda de las canalizaciones de Kubeflow y te ayudará en la depuración. A continuación, mostraremos lo que Kubeflow Pipelines añade a Argo.
Concluiremos Kubeflow Pipelines mostrando cómo implementar la ejecución condicional en pipelines y cómo ejecutar la ejecución de pipelines de forma programada. Los componentes de tareas específicas de las canalizaciones se tratarán en sus respectivos capítulos.
Primeros pasos con las tuberías
La plataforma Kubeflow Pipelines se compone de:
-
Una interfaz de usuario para gestionar y seguir las canalizaciones y su ejecución
-
Un motor para programar la ejecución de un pipeline
-
Un SDK para definir, construir e implementar canalizaciones en Python
-
Soporte de cuaderno para utilizar el SDK y la ejecución de pipelines
La forma más fácil de familiarizarte con las canalizaciones es echar un vistazo a los ejemplos preempaquetados.
Explorando las Tuberías de Muestras Preempaquetadas
Para ayudar a los usuarios a entender los pipelines, Kubeflow se instala con unos cuantos pipelines de ejemplo. Puedes encontrarlos preempaquetados en la interfaz web de canalizaciones, como se ve en Figura 4-1. Ten en cuenta que, en el momento de escribir esto, sólo las canalizaciones de ejecución Básica a Condicional son genéricas, mientras que el resto sólo se ejecutarán en Google Kubernetes Engine (GKE). Si intentas ejecutarlas en entornos que no sean GKE, fallarán.
Al hacer clic en una canalización concreta, se mostrará su gráfico de ejecución o fuente, como se ve en la Figura 4-2.
Al hacer clic en la pestaña "Fuente" se mostrará el código compilado de la tubería, que es un archivo Argo YAML (esto se trata con más detalle en "Argo: la base de las tuberías").
En esta área puedes experimentar ejecutando pipelines para conocer mejor su ejecución y las capacidades de la interfaz de usuario de Pipelines.
Para invocar un pipeline concreto, sólo tienes que hacer clic en él; esto hará que aparezca la vista del Pipeline tal y como se presenta en la Figura 4-3.
Para ejecutar la tubería, haz clic en el botón "Crear ejecución" y sigue las instrucciones de que aparecen en pantalla.
Consejo
Al ejecutar un pipeline debes elegir un experimento. Experimento aquí es sólo una agrupación de conveniencia para las ejecuciones del pipeline (ejecuciones). Siempre puedes utilizar el experimento "Predeterminado" creado por la instalación de Kubeflow. Además, elige "Una sola vez" para el tipo de Ejecución para ejecutar el pipeline una sola vez. Hablaremos de la ejecución recurrente en "Ejecutar pipelines de forma programada".
Construir una tubería sencilla en Python
Hemos visto cómo ejecutar Pipelines Kubeflow precompilados, ahora vamos a investigar cómo crear nuestros propios pipelines nuevos. Los Pipelines Kubeflow se almacenan como archivos YAML ejecutados por un programa llamado Argo (ver "Argo: la base de los Pipelines"). Afortunadamente, Kubeflow expone un lenguaje específico del dominio (DSL ) de Python para la autoría de pipelines. El DSL es una representación pitónica de las operaciones que se realizan en el flujo de trabajo de ML y se ha creado pensando específicamente en las cargas de trabajo de ML. El DSL también permite utilizar algunas funciones sencillas de Python como etapas de canalización sin tener que construir explícitamente un contenedor.
Consejo
Los ejemplos del Capítulo 4 se encuentran en los cuadernos del repositorio GitHub de este libro.
Un pipeline es, en su esencia, un gráfico de ejecución de contenedores. Además de especificar qué contenedores deben ejecutarse y en qué orden, también permite al usuario pasar argumentos a toda la canalización y entre los contenedores participantes.
Para cada contenedor (cuando se utiliza el SDK de Python), debemos:
-
Crea el contenedor, ya sea como una simple función de Python, o con cualquier contenedor Docker (más información en el Capítulo 9).
-
Crea una operación que haga referencia a ese contenedor, así como los argumentos de la línea de comandos, los montajes de datos y la variable para pasar el contenedor.
-
Secuencia las operaciones, definiendo cuáles pueden suceder en paralelo y cuáles deben completarse antes de pasar a otro paso.1
-
Compila este pipeline, definido en Python, en un archivo YAML que los Pipelines de Kubeflow puedan consumir.
Los Pipelines son una característica clave de Kubeflow y volverás a verlos a lo largo del libro. En este capítulo vamos a mostrar los ejemplos más sencillos posibles para ilustrar los principios básicos de los Pipelines. Esto no te parecerá "aprendizaje automático" y eso es así por diseño.
Para nuestra primera operación de Kubeflow, vamos a utilizar una técnica conocida como funciones ligeras de Python. Sin embargo, no debemos dejar que la palabra ligero nos engañe. En una función Python ligera, definimos una función Python y luego dejamos que Kubeflow se encargue de empaquetar esa función en un contenedor y crear una operación.
En aras de la simplicidad, vamos a declarar eco a la más simple de las funciones. Es decir, una función que toma una única entrada, un número entero, y devuelve esa entrada.
Empecemos importando kfp
y definiendo nuestra función:
import
kfp
def
simple_echo
(
i
:
int
)
->
int
:
return
i
Advertencia
Ten en cuenta que utilizamos snake_case
, no camelCase
, para los nombres de nuestras funciones. En el momento de escribir esto, existe un error (¿característica?) por el que los nombres en camel case (por ejemplo: nombrar nuestra función simpleEcho
) producirán errores.
A continuación, queremos envolver nuestra función simple_echo
en una operación Kubeflow Pipeline. Hay un pequeño y bonito método para hacerlo: kfp.components.func_to_container_op
. Este método devuelve una función de fábrica con una firma fuertemente tipada:
simpleStronglyTypedFunction
=
kfp
.
components
.
func_to_container_op
(
deadSimpleIntEchoFn
)
Cuando creemos una canalización en el paso siguiente, la función de fábrica construirá una ContainerOp, que ejecutará la función original (echo_fn) en un contenedor:
foo
=
simpleStronglyTypedFunction
(
1
)
type
(
foo
)
Out
[
5
]:
kfp
.
dsl
.
_container_op
.
ContainerOp
Consejo
Si tu código puede ser acelerado por una GPU es fácil marcar una etapa como que utiliza recursos de la GPU; simplemente añade .set_gpu_limit(NUM_GPUS)
a tu ContainerOp
.
Ahora vamos a secuenciar el/los Contenedor(es) (sólo hay uno) en una canalización. Esta canalización tomará un parámetro (el número del que nos haremos eco). La canalización también tiene un poco de metadatos asociados. Aunque hacer eco de números puede ser un uso trivial de los parámetros, en casos de uso del mundo real incluirías variables que quizá quieras ajustar más adelante, como hiperparámetros para algoritmos de aprendizaje automático.
Por último, compilamos nuestro pipeline en un archivo YAML comprimido, que podemos subir a la interfaz de usuario de Pipelines.
@kfp.dsl.pipeline
(
name
=
'Simple Echo'
,
description
=
'This is an echo pipeline. It echoes numbers.'
)
def
echo_pipeline
(
param_1
:
kfp
.
dsl
.
PipelineParam
):
my_step
=
simpleStronglyTypedFunction
(
i
=
param_1
)
kfp
.
compiler
.
Compiler
()
.
compile
(
echo_pipeline
,
'echo-pipeline.zip'
)
Consejo
También es posible ejecutar el pipeline directamente desde el bloc de notas, lo que haremos en el siguiente ejemplo.
Un pipeline con un solo componente no es muy interesante. Para nuestro siguiente ejemplo, personalizaremos los contenedores de nuestras funciones ligeras de Python. Crearemos una nueva canalización que instale e importe bibliotecas Python adicionales, construya a partir de una imagen base especificada y pase la salida entrecontenedores.
Vamos a crear una tubería que divida un número por otro número, y luego sume un tercer número. Primero vamos a crear nuestra sencilla función add
, como se muestra en el Ejemplo 4-1.
Ejemplo 4-1. Una función sencilla de Python
def
add
(
a
:
float
,
b
:
float
)
->
float
:
'''Calculates sum of two arguments'''
return
a
+
b
add_op
=
comp
.
func_to_container_op
(
add
)
A continuación, vamos a crear una función algo más compleja. Además, hagamos que esta función requiera e importe de una biblioteca Python no estándar de , numpy
. Esto debe hacerse dentro de la función. Esto se debe a que las importaciones globales del bloc de notas no se empaquetarán en los contenedores que creemos. Por supuesto, también es importante asegurarse de que nuestro contenedor tiene instaladas las bibliotecas que estamos importando.
Para ello, pasaremos el contenedor concreto que queremos utilizar como imagen base a .func_to_container(
, como en el Ejemplo 4-2.
Ejemplo 4-2. Una función Python menos simple
from
typing
import
NamedTuple
def
my_divmod
(
dividend
:
float
,
divisor
:
float
)
-
>
\
NamedTuple
(
'
MyDivmodOutput
'
,
[
(
'
quotient
'
,
float
)
,
(
'
remainder
'
,
float
)
]
)
:
'''Divides two numbers and calculate the quotient and remainder'''
#Imports inside a component function:
import
numpy
as
np
#This function demonstrates how to use nested functions inside a
# component function:
def
divmod_helper
(
dividend
,
divisor
)
:
return
np
.
divmod
(
dividend
,
divisor
)
(
quotient
,
remainder
)
=
divmod_helper
(
dividend
,
divisor
)
from
collections
import
namedtuple
divmod_output
=
namedtuple
(
'
MyDivmodOutput
'
,
[
'
quotient
'
,
'
remainder
'
]
)
return
divmod_output
(
quotient
,
remainder
)
divmod_op
=
comp
.
func_to_container_op
(
my_divmod
,
base_image
=
'
tensorflow/tensorflow:1.14.0-py3
'
)
Ahora construiremos una canalización. La cadena del Ejemplo 4-3 utiliza las funciones definidas anteriormente, my_divmod
y add
, como etapas.
Ejemplo 4-3. Una tubería sencilla
@dsl.pipeline( name='Calculation pipeline', description='A toy pipeline that performs arithmetic calculations.' ) def calc_pipeline( a='a', b='7', c='17', ): #Passing pipeline parameter and a constant value as operation arguments add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. #Passing a task output reference as operation arguments #For an operation with a single return value, the output # reference can be accessed using `task.output` # or `task.outputs['output_name']` syntax divmod_task = divmod_op(add_task.output, b) #For an operation with multiple return values, the output references # can be accessed using `task.outputs['output_name']` syntax result_task = add_op(divmod_task.outputs['quotient'], c)
Por último, utilizamos el cliente para enviar el pipeline para su ejecución, que devuelve los enlaces a la ejecución y al experimento. Los experimentos agrupan las ejecuciones. Si lo prefieres, también puedes utilizar kfp.compiler.Compiler().compile
y subir el archivo zip como en el primer ejemplo:
client
=
kfp
.
Client
()
#Specify pipeline argument values
# arguments = {'a': '7', 'b': '8'} #whatever makes sense for new version
#Submit a pipeline run
client
.
create_run_from_pipeline_func
(
calc_pipeline
,
arguments
=
arguments
)
Siguiendo el enlace devuelto por create_run_from_pipeline_func
, podemos llegar a la interfaz de usuario web de la ejecución , que muestra el pipeline propiamente dicho y los resultados intermedios, como se ve en la Figura 4-4.
Como hemos visto, lo de ligero en las funciones Python ligeras se refiere a la facilidad de realizar estos pasos en nuestro proceso y no a la potencia de las funciones en sí. Podemos utilizar importaciones personalizadas, imágenes base y la forma de pasar pequeños resultados entre contenedores.
En la siguiente sección, mostraremos cómo pasar archivos de datos más grandes entre contenedores montando volúmenes en los contenedores.
Almacenar datos entre pasos
En el ejemplo anterior, los datos pasados entre contenedores eran pequeños y de tipos primitivos (como numéricos, cadenas, listas y matrices). En la práctica, sin embargo, es probable que pasemos datos mucho más grandes (por ejemplo, conjuntos de datos completos). En Kubeflow, hay dos métodos principales para hacerlo: volúmenes persistentes dentro del clúster Kubernetes y opciones de almacenamiento en la nube (como S3), aunque cada método tiene problemas inherentes.
Los volúmenes persistentes abstraen la capa de almacenamiento. Dependiendo del proveedor, los volúmenes persistentes pueden ser lentos con el aprovisionamiento y tener límites de E/S. Comprueba si tu proveedor admite clases de almacenamiento de lectura-escritura-muchos, lo que permite el acceso al almacenamiento por parte de varios pods, necesario para algunos tipos de paralelismo. Las clases de almacenamiento pueden ser una de las siguientes2
- LecturaEscrituraUnaVez
-
El volumen puede ser montado como lectura-escritura por un solo nodo.
- ReadOnlyMany
-
El volumen puede ser montado como sólo lectura por muchos nodos.
- LeerEscribirMuchos
-
El volumen puede ser montado como lectura-escritura por muchos nodos.
Es posible que el administrador de tu sistema/cluster pueda añadir soporte de lectura-escritura-manía.3 Además, muchos proveedores de la nube incluyen sus propias implementaciones de lectura-escritura-manía, véase por ejemplo el aprovisionamiento dinámico en GKE. pero asegúrate de preguntar si existe un cuello de botella en un único nodo.
VolumeOp
de Kubeflow Pipelines te permite crear un volumen persistente gestionado automáticamente, como se muestra en el Ejemplo 4-4. Para añadir el volumen a tu operación, sólo tienes que llamar a add_pvolumes
con un diccionario de puntos de montaje a volúmenes, por ejemplo, download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})
.
Ejemplo 4-4. Preparación de datos de la lista de correo
dvop
=
dsl
.
VolumeOp
(
name
=
"create_pvc"
,
resource_name
=
"my-pvc-2"
,
size
=
"5Gi"
,
modes
=
dsl
.
VOLUME_MODE_RWO
)
Aunque es menos habitual en los ejemplos de Kubeflow, utilizar una solución de almacenamiento de objetos, en algunos casos, puede ser más adecuado. MinIO proporciona almacenamiento de objetos nativo en la nube funcionando como pasarela a un motor de almacenamiento de objetos existente o por sí mismo.4 Ya vimos cómo configurar MinIO en el Capítulo 3.
El mecanismo file_output
incorporado en Kubeflow transfiere automáticamente el archivo local especificado a MinIO entre los pasos del pipeline por ti. Para utilizar file_output
, escribe tus archivos localmente en tu contenedor y especifica el parámetro en tu ContainerOp
, como se muestra en el Ejemplo 4-5.
Ejemplo 4-5. Ejemplo de salida de archivo
fetch
=
kfp
.
dsl
.
ContainerOp
(
name
=
'download'
,
image
=
'busybox'
,
command
=
[
'sh'
,
'-c'
],
arguments
=
[
'sleep 1;'
'mkdir -p /tmp/data;'
'wget '
+
data_url
+
' -O /tmp/data/results.csv'
],
file_outputs
=
{
'downloaded'
:
'/tmp/data'
})
# This expects a directory of inputs not just a single file
Si no quieres utilizar MinIO, también puedes utilizar directamente el almacenamiento de objetos de tu proveedor, pero esto puede comprometer cierta portabilidad.
La capacidad de montar datos localmente es una tarea esencial en cualquier canal de aprendizaje automático. Aquí hemos esbozado brevemente varios métodos y proporcionado ejemplos de cada uno de ellos.
Introducción a los componentes de las tuberías Kubeflow
Kubeflow Pipelines se basa en Argo Workflows, un motor de flujo de trabajo de código abierto y nativo de contenedores para Kubernetes. En esta sección describiremos cómo funciona Argo, qué hace, y cómo Kubeflow Pipelines complementa a Argo para facilitar su uso a los científicos de datos.
Argo: la base de los oleoductos
Kubeflow instala todos los componentes de Argo. Aunque no es necesario tener Argo instalado en tu ordenador para utilizar las canalizaciones de Kubeflow, disponer de la herramienta de línea de comandos de Argo facilita la comprensión y depuración de tus canalizaciones.
Consejo
Por defecto, Kubeflow configura Argo para que utilice el ejecutor Docker. Si tu plataforma no admite las API de Docker, debes cambiar tu ejecutor por uno compatible. Esto se hace cambiando el valor containerRuntimeExecutor
en el archivo params de Argo. Consulta el Apéndice A para más detalles sobre los cambios. La mayoría de los ejemplos de este libro utilizan el ejecutor Docker, pero pueden adaptarse a otros ejecutores.
En macOS, puedes instalar Argo con Homebrew, como se muestra en el Ejemplo 4-6.5
Ejemplo 4-6. Instalación de Argo
#!/bin/bash
# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64# Make binary executable
chmod +x argo-linux-amd64# Move binary to path
mv ./argo-linux-amd64 ~/bin/argo
Puedes verificar tu instalación de Argo ejecutando los ejemplos de Argo con la herramienta de línea de comandos en el espacio de nombres Kubeflow: sigue estas instrucciones de Argo. Cuando ejecutes los ejemplos de Argo, los pipelines serán visibles con el comando argo
, como en el Ejemplo 4-7.
Ejemplo 4-7. Listado de ejecuciones de Argo
$
argo list -n kubeflow
NAME STATUS AGE DURATION
loops-maps-4mxp5 Succeeded 30m 12s
hello-world-wsxbr Succeeded 39m 15s
Puesto que los pipelines se implementan con Argo, puedes utilizar la misma técnica para comprobarlos también. También puedesobtener información sobre la ejecución de un flujo de trabajo concreto, como se muestra en el Ejemplo 4-8.
Ejemplo 4-8. Obtener detalles de la ejecución de Argo
$
argo
get
hello-world-wsxbr
-n
kubeflow
Name:
hello-world-wsxbr
Namespace:
kubeflow
ServiceAccount:
default
Status:
Succeeded
Created:
Tue
Feb
12
10:05:04
-0600
(
2
minutes
ago
)
Started:
Tue
Feb
12
10:05:04
-0600
(
2
minutes
ago
)
Finished:
Tue
Feb
12
10:05:23
-0600
(
1
minute
ago
)
Duration:
19
seconds
STEP
PODNAME
DURATION
MESSAGE
✔
hello-world-wsxbr
hello-world-wsxbr
18s
También podemos ver los registros de ejecución utilizando el comando del Ejemplo 4-9.
Ejemplo 4-9. Obtener el registro de ejecución de Argo
$
argo logs hello-world-wsxbr -n kubeflow
Esto produce el resultado que se muestra en el Ejemplo 4-10.
Ejemplo 4-10. Registro de ejecución de Argo
< hello world > -------------\
\
\
## .
## ## ## ==
## ## ## ## ===
/""""""""""""""""
___/===
~~~{
~~ ~~~~ ~~~ ~~~~ ~~ ~ /===
- ~~~\_
_____ o __/\
\
__/\_
___\_
_____/
También puedes eliminar un flujo de trabajo concreto; consulta el Ejemplo 4-11.
Ejemplo 4-11. Borrar la ejecución de Argo
$
argo delete hello-world-wsxbr -n kubeflow
Alternativamente, puedes obtener información sobre la ejecución de la tubería utilizando la interfaz de usuario de Argo, como se ve en la Figura 4-5.
También puedes ver los detalles del gráfico de ejecución del flujo haciendo clic en un flujo de trabajo concreto, como se ve en la Figura 4-6.
Para cualquier canalización Kubeflow que ejecutes, también puedes ver esa canalización en la CLI/UI de Argo. Ten en cuenta que, como las canalizaciones ML utilizan el CRD de Argo, también puedes ver el resultado de la ejecución de la canalización en la IU de Argo (como en la Figura 4-7).
Consejo
Actualmente, la comunidad Kubeflow está estudiando activamente tecnologías fundacionales alternativas de para ejecutar tuberías Kubeflow, una de las cuales es Tekton. El artículo de A. Singh et al., " Kubeflow Pipelines with Tekton", ofrece "un diseño inicial, especificaciones y código para permitir que los Kubeflow Pipelines se ejecuten sobre Tekton". La idea básica aquí es crear un formato intermedio que pueda ser producido por tuberías y luego ejecutado utilizando Argo, Tekton u otros tiempos de ejecución. El código inicial para esta implementación se encuentra en este repositorio GitHub de Kubeflow.
Qué añade Kubeflow Pipelines al flujo de trabajo de Argo
Argo subyace a la ejecución del flujo de trabajo; sin embargo, utilizarlo directamente requiere que hagas cosas incómodas. En primer lugar, debes definir tu flujo de trabajo en YAML, lo que puede resultar difícil. En segundo lugar, debes contenerizar tu código, lo que puede resultar tedioso. La principal ventaja de KF Pipelines es que puedes utilizar las API de Python para definir/crear pipelines, lo que automatiza la generación de gran parte de la jerga YAML para las definiciones de flujos de trabajo y es extremadamente fácil para los científicos de datos/desarrolladores de Python. Kubeflow Pipelines también tiene ganchos que añaden bloques de construcción paracomponentes específicos del aprendizaje automático. Estas API no sólo generan el YAML, sino que también pueden simplificar la creación de contenedores y el uso de recursos. Además de las API, Kubeflow añade un programador recurrente y una interfaz de usuario para la configuración y la ejecución.
Construir una tubería utilizando imágenes existentes
Construir etapas de canalización directamente desde Python proporciona un punto de entrada sencillo. Sin embargo, limita nuestra implementacióna Python. Otra característica de Kubeflow Pipelines es la posibilidad de orquestar la ejecución de una implementación multilenguaje aprovechando imágenes Docker precompiladas (véase el Capítulo 9).
Además de nuestras importaciones anteriores, también queremos importar el cliente de Kubernetes , que nos permite utilizar funciones de Kubernetes directamente desde código Python (ver Ejemplo 4-12).
Ejemplo 4-12. Exportar cliente Kubernetes
from
kubernetes
import
client
as
k8s_client
De nuevo, creamos un cliente y un experimento para ejecutar nuestro pipeline. Como ya hemos dicho, los experimentos de agrupan las ejecuciones de las canalizaciones. Sólo puedes crear un experimento determinado una vez, por lo que el Ejemplo 4-13 muestra cómo crear un experimento nuevo o utilizar uno existente.
Ejemplo 4-13. Obtener experimento de tuberías
client
=
kfp
.
Client
()
exp
=
client
.
get_experiment
(
experiment_name
=
'mdupdate'
)
Ahora creamos nuestra canalización(Ejemplo 4-14). Las imágenes utilizadas tienen que ser accesibles, y estamos especificando los nombres completos, para que se resuelvan. Como estos contenedores están precompilados, tenemos que configurarlos para nuestra canalización.
Los contenedores preconstruidos que estamos utilizando tienen su almacenamiento configurado por las variables de entorno MINIO_*
. Así que los configuramos para que utilicen nuestra instalación local de MinIO llamando a add_env_variable
.
Además de las dependencias automáticas que se crean al pasar parámetros entre etapas, también puedes especificar que una etapa requiere una etapa anterior con after
. Esto es muy útil cuando hay un efecto secundario externo, como la actualización de una base de datos.
Ejemplo 4-14. Ejemplo de canal de recomendación
@dsl.pipeline
(
name
=
'Recommender model update'
,
description
=
'Demonstrate usage of pipelines for multi-step model update'
)
def
recommender_pipeline
():
# Load new data
data
=
dsl
.
ContainerOp
(
name
=
'updatedata'
,
image
=
'lightbend/recommender-data-update-publisher:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'http://minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
# Train the model
train
=
dsl
.
ContainerOp
(
name
=
'trainmodel'
,
image
=
'lightbend/ml-tf-recommender:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
train
.
after
(
data
)
# Publish new model
publish
=
dsl
.
ContainerOp
(
name
=
'publishmodel'
,
image
=
'lightbend/recommender-model-publisher:0.2'
)
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_URL'
,
value
=
'http://minio-service.kubeflow.svc.cluster.local:9000'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_KEY'
,
value
=
'minio'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'MINIO_SECRET'
,
value
=
'minio123'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'KAFKA_BROKERS'
,
value
=
'cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'DEFAULT_RECOMMENDER_URL'
,
value
=
'http://recommendermodelserver.kubeflow.svc.cluster.local:8501'
))
\.
add_env_variable
(
k8s_client
.
V1EnvVar
(
name
=
'ALTERNATIVE_RECOMMENDER_URL'
,
value
=
'http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'
))
publish
.
after
(
train
)
Como la definición de la tubería es sólo código, puedes hacerla más compacta utilizando un bucle para establecer los parámetros MinIO en lugar de hacerlo en cada etapa.
Como antes, tenemos que compilar la tubería, explícitamente con compiler.Compiler().compile
o implícitamente concreate_run_from_pipeline_func
. Ahora sigue adelante y ejecuta la tubería (como en la Figura 4-8).
Componentes de la tubería Kubeflow
Además de las operaciones con contenedores que acabamos de comentar, Kubeflow Pipelines también expone operaciones adicionales con componentes. Los componentes exponen diferentes recursos de Kubernetes u operaciones externas (como dataproc
). Los componentes de Kubeflow permiten a los desarrolladores empaquetar herramientas de aprendizaje automático abstrayéndose de las particularidades de los contenedores o CRD utilizados.
Hemos utilizado los bloques de construcción de Kubeflow de forma bastante directa, y hemos utilizado el componente func_to_container
.6
Algunos componentes, como func_to_container
, están disponibles como código Python y pueden importarse de forma normal. Otros componentes se especifican mediante el sistema component.yaml
de Kubeflow y hay que cargarlos. En nuestra opinión, la mejor forma de trabajar con componentes Kubeflow es descargar una etiqueta específica del repositorio, lo que nos permite utilizar load_component_from_file
, como se muestra en el Ejemplo 4-15.
Ejemplo 4-15. Liberación de tuberías
wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz tar -xvf 0.2.5.tar.gz
Advertencia
Existe una función load_component
que toma el nombre de un componente de e intenta resolverlo. No te recomendamos que utilices esta función, ya que utiliza por defecto una ruta de búsqueda que incluye la obtención, desde Github, de la rama maestra de la biblioteca pipelines, que es inestable.
Exploraremos en profundidad los componentes de preparación de datos en el próximo capítulo; sin embargo, veamos rápidamente un componente de obtención de archivos como ejemplo. En nuestro ejemplo de recomendador anterior del capítulo, utilizamos un contenedor especial preconstruido para obtener nuestros datos, ya que no estaban en un volumen persistente. En su lugar, podemos utilizar el componente GCS de Kubeflow google-cloud/storage/download/
para descargar nuestros datos. Suponiendo que hayas descargado la versión de la tubería como en el Ejemplo 4-15, puedes cargar el componente con load_component_from_file
como en el Ejemplo 4-16.
Ejemplo 4-16. Cargar componente de descarga GCS
gcs_download_component
=
kfp
.
components
.
load_component_from_file
(
"pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)
Cuando se carga un componente, devuelve una función que produce una etapa pipeline al ser llamada. La mayoría de los componentes toman parámetros para configurar su comportamiento. Puedes obtener una lista de las opciones de los componentes llamando a help
en el componente cargado, o consultando el component.yaml. El componente de descarga GCS requiere que configuremos lo que estamos descargando con gcs_path
, que se muestra en el Ejemplo 4-17.
Ejemplo 4-17. Cargar el componente de almacenamiento pipeline desde una ruta relativa y un enlace web
dl_op
=
gcs_download_component
(
gcs_path
=
"gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)
# Your path goes here
En el Capítulo 5, exploramos componentes más comunes de la canalización Kubeflow para la preparación de datos y características.
Temas avanzados en tuberías
Todos los ejemplos que hemos mostrado hasta ahora son puramente secuenciales. También hay casos en los que necesitamos la capacidad de comprobar condiciones y cambiar el comportamiento de la tubería en consecuencia.
Ejecución condicional de las etapas de la tubería
Kubeflow Pipelines permite ejecuciones condicionales a través de dsl.Condition
. Veamos un ejemplo muy sencillo, en el que, en función del valor de una variable, se ejecutan diferentes cálculos.
A continuación se presenta un cuaderno sencillo que pone en práctica este ejemplo. Comienza con las importaciones necesarias para ello, en el Ejemplo 4-18.
Ejemplo 4-18. Importar componentes necesarios
import
kfp
from
kfp
import
dsl
from
kfp.components
import
func_to_container_op
,
InputPath
,
OutputPath
Una vez realizadas las importaciones, podemos implementar varias funciones sencillas, como se muestra en el Ejemplo 4-19.
Ejemplo 4-19. Implementación de funciones
@func_to_container_op
def
get_random_int_op
(
minimum
:
int
,
maximum
:
int
)
->
int
:
"""Generate a random number between minimum and maximum (inclusive)."""
import
random
result
=
random
.
randint
(
minimum
,
maximum
)
(
result
)
return
result
@func_to_container_op
def
process_small_op
(
data
:
int
):
"""Process small numbers."""
(
"Processing small result"
,
data
)
return
@func_to_container_op
def
process_medium_op
(
data
:
int
):
"""Process medium numbers."""
(
"Processing medium result"
,
data
)
return
@func_to_container_op
def
process_large_op
(
data
:
int
):
"""Process large numbers."""
(
"Processing large result"
,
data
)
return
Implementamos todas las funciones directamente utilizando Python (como en el ejemplo anterior). La primera función genera un número entero entre 0 y 100, y las tres siguientes constituyen un esqueleto simple para el procesamiento real. La tubería se implementa como en el Ejemplo 4-20.
Ejemplo 4-20. Implementación de tuberías
@dsl.pipeline
(
name
=
'
Conditional execution pipeline
'
,
description
=
'
Shows how to use dsl.Condition().
'
)
def
conditional_pipeline
(
)
:
number
=
get_random_int_op
(
0
,
100
)
.
output
with
dsl
.
Condition
(
number
<
10
)
:
process_small_op
(
number
)
with
dsl
.
Condition
(
number
>
10
and
number
<
50
)
:
process_medium_op
(
number
)
with
dsl
.
Condition
(
number
>
50
)
:
process_large_op
(
number
)
kfp
.
Client
(
)
.
create_run_from_pipeline_func
(
conditional_pipeline
,
arguments
=
{
}
)
Por último, el gráfico de ejecución, como se muestra en la Figura 4-9.
A partir de este gráfico, podemos ver que la tubería realmente se divide en tres ramas y que en esta ejecución se selecciona la ejecución proceso-grande-op. Para validar que esto es correcto, miramos el registro de ejecución, que se muestra en la Figura 4-10.
Aquí podemos ver que el número generado es 67. Este número es mayor que 50, lo que significa que la rama process_large_op debe ejecutarse .7
Ejecutar tuberías según lo previsto
Hemos ejecutado nuestro pipeline manualmente. Esto es bueno para las pruebas, pero a menudo es insuficiente para los entornos de producción. Afortunadamente, puedes ejecutar canalizaciones de forma programada, como se describe en esta página de documentación deKubeflow. En primer lugar, tienes que cargar una definición de canalización y especificar una descripción. Una vez hecho esto, puedes crear una ejecución periódica creando una ejecución y seleccionando un tipo de ejecución "Recurrente", y luego siguiendo las instrucciones de la pantalla, como se ve en la Figura 4-11.
En esta figura estamos configurando una canalización para que se ejecute todos los días.
Advertencia
Al crear una ejecución periódica estamos especificando con qué frecuencia ejecutar una tubería, no cuándo ejecutarla. En la implementación actual, el tiempo de ejecución se define por el momento en que se crea la ejecución. Una vez creada, se ejecuta inmediatamente y luego se ejecuta con la frecuencia definida. Si, por ejemplo, una ejecución diaria se crea a las 10 de la mañana, se ejecutará a las 10 de la mañana todos los días.
Establecer la ejecución periódica de las canalizaciones es una funcionalidad importante, que te permite automatizar completamente la ejecución de las canalizaciones.
Conclusión
Ahora deberías tener las nociones básicas sobre cómo construir, programar y ejecutar algunas canalizaciones sencillas. También has aprendido sobre las herramientas que utilizan las canalizaciones para cuando necesitas depurar. Hemos mostrado cómo integrar el software existente en las canalizaciones, cómo implementar la ejecución condicional dentro de una canalización y cómo ejecutar canalizaciones programadas.
En nuestro próximo capítulo, veremos cómo utilizar canalizaciones para la preparación de datos con algunos ejemplos.
1 A menudo esto puede deducirse automáticamente al pasar el resultado de una etapa del pipeline como entrada a otras. También puedes especificar dependencias adicionales manualmente.
2 Los volúmenes persistentes de Kubernetes pueden ofrecer distintos modos de acceso.
3 La implementación genérica de lectura-escritura-muchos es el servidor NFS.
4 El uso del almacenamiento de acceso nativo en la nube puede ser útil si necesitas garantizar la portabilidad de tu solución a través de varios proveedores de la nube.
5 Para instalar Argo Workflow en otro sistema operativo, consulta estas instrucciones de Argo.
6 Muchos de los componentes estándar están en este repositorio GitHub de Kubeflow.
7 Puedes encontrar un ejemplo algo más complejo de procesamiento condicional (con condiciones anidadas) en este sitio de GitHub.
Get Kubeflow para el aprendizaje automático now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.