Capítulo 4. Actores remotos
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
En el capítulo anterior de , aprendiste sobre las funciones remotas de Ray, que son útiles para la ejecución en paralelo de funciones sin estado. Pero, ¿qué ocurre si necesitas mantener un estado entre invocaciones? Los ejemplos de este tipo de situaciones abarcan desde un simple contador hasta una red neuronal durante el entrenamiento, pasando por el entorno de un simulador.
Una opción para mantener el estado en estas situaciones es devolver el estado junto con el resultado y pasarlo a la siguiente llamada. Aunque técnicamente esto funcione, no es la mejor solución, debido a la gran cantidad de datos que hay que pasar (especialmente cuando el tamaño del estado empieza a crecer). Ray utiliza actores, de los que hablaremos en este capítulo, para gestionar el estado.
Nota
Al igual que las funciones remotas de Ray, todos los actores de Ray son actores remotos, incluso cuando se ejecutan en la misma máquina.
En pocas palabras, un actor es un proceso informático con una dirección (handle). Esto significa que un actor también puede almacenar cosas en memoria, privadas para el proceso actor. Antes de profundizar en los detalles de la implementación y escalado de los actores Ray, echemos un vistazo a los conceptos que los sustentan. Los actores provienen del patrón de diseño del modelo de actor. Comprender el modelo de actor es clave para gestionar eficazmente el estado y la concurrencia.
Comprender el modelo de actor
El modelo de actor fue introducido por Carl Hewitt en 1973 para tratar la computación concurrente. El corazón de este modelo conceptual es un actor, una primitiva universal de computación concurrente con su estado.
Un actor tiene un trabajo sencillo:
-
Almacenar datos
-
Recibir mensajes de otros actores
-
Pasar mensajes a otros actores
-
Crear actores secundarios adicionales
Los datos que almacena un actor son privados para él y no son visibles desde el exterior; sólo el propio actor puede acceder a ellos y modificarlos. Cambiar el estado del actor requiere enviar mensajes al actor que modifiquen el estado. (Compáralo con el uso de llamadas a métodos en la programación orientada a objetos).
Para garantizar la coherencia del estado de un actor, los actores procesan una solicitud cada vez. Todas las invocaciones a métodos de actores se serializan globalmente para un actor determinado. Para mejorar el rendimiento, la gente suele crear un grupo de actores (suponiendo que puedan fragmentar o replicar el estado del actor).
El modelo de actor se adapta bien a muchos escenarios de sistemas distribuidos. He aquí algunos casos de uso típicos en los que el modelo de actor puede ser ventajoso:
-
Tienes que tratar con un gran estado distribuido que es difícil de sincronizar entre invocaciones.
-
Quieres trabajar con objetos de un solo hilo que no requieran una interacción significativa de componentes externos.
En ambas situaciones, implementarías las partes independientes del trabajo dentro de un actor. Puedes poner cada pieza de estado independiente dentro de su propio actor, y entonces cualquier cambio en el estado entra a través del actor. La mayoría de las implementaciones del sistema de actores evitan los problemas de concurrencia utilizando sólo actores de un único hilo.
Ahora que conoces los principios generales del modelo de actor, veamos más de cerca los actores remotos de Ray .
Creación de un Actor Remoto de Rayos Básico
Ray implementa actores remotos como trabajadores con estado. Cuando creas un nuevo actor remoto, Ray crea un nuevo trabajador y programa los métodos del actor en ese trabajador.
Un ejemplo común de actor es una cuenta bancaria. Veamos cómo implementar una cuenta utilizando actores remotos Ray. Crear un actor remoto Ray es tan sencillo como decorar una clase Python con el decorador @ray.remote
(Ejemplo 4-1).
Ejemplo 4-1. Implementación de un actor remoto Ray
@ray
.
remote
class
Account
:
def
__init__
(
self
,
balance
:
float
,
minimal_balance
:
float
):
self
.
minimal
=
minimal_balance
if
balance
<
minimal_balance
:
raise
Exception
(
"Starting balance is less than minimal balance"
)
self
.
balance
=
balance
def
balance
(
self
)
->
float
:
return
self
.
balance
def
deposit
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot deposit negative amount"
)
self
.
balance
=
self
.
balance
+
amount
return
self
.
balance
def
withdraw
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot withdraw negative amount"
)
balance
=
self
.
balance
-
amount
if
balance
<
self
.
minimal
:
raise
Exception
(
"Withdrawal is not supported by current balance"
)
self
.
balance
=
balance
return
balance
La propia clase de actor Account
es bastante sencilla y tiene cuatro métodos:
- El constructor
-
Crea una cuenta basándose en el saldo inicial y mínimo. También se asegura de que el saldo actual es mayor que el mínimo y lanza una excepción en caso contrario.
balance
-
Devuelve el saldo actual de la cuenta. Como el estado de un actor es privado para el actor, sólo se puede acceder a él a través del método del actor.
deposit
-
Ingresa un importe en la cuenta y devuelve un nuevo saldo.
withdraw
-
Retira una cantidad de la cuenta y devuelve un nuevo saldo. También se asegura de que el saldo restante es mayor que el saldo mínimo predefinido y lanza una excepción en caso contrario.
Ahora que has definido la clase, tienes que utilizar .remote
para crear una instancia de este actor(Ejemplo 4-2).
Ejemplo 4-2. Crear una instancia de tu actor remoto Ray
account_actor
=
Account
.
remote
(
balance
=
100.
,
minimal_balance
=
20.
)
Aquí, account_actor
representa un asa de actor. Estos manejadores desempeñan un papel importante en el ciclo de vida del actor. Los procesos de los actores terminan automáticamente cuando el manejador inicial del actor sale del ámbito de Python (ten en cuenta que, en este caso, se pierde el estado del actor).
Consejo
Puedes crear varios actores distintos a partir de la misma clase. Cada uno tendrá su propio estado independiente.
Al igual que con un ObjectRef
, puedes pasar un manejador de actor como parámetro a otro actor o función remota Ray o código Python.
Observa que el Ejemplo 4-1 utiliza la anotación @ray.remote
para definir una clase Python ordinaria como actor remoto Ray. Alternativamente, en lugar de utilizar una anotación, puedes utilizar el Ejemplo 4-3 para convertir una clase Python en un actor remoto.
Ejemplo 4-3. Creación de una instancia de un actor remoto Ray sin el decorador
Account
=
ray
.
remote
(
Account
)
account_actor
=
Account
.
remote
(
balance
=
100.
,
minimal_balance
=
20.
)
Una vez que tengas un actor remoto, puedes invocarlo a utilizando el Ejemplo 4-4.
Ejemplo 4-4. Invocar a un actor remoto
(
f
"Current balance
{
ray
.
get
(
account_actor
.
balance
.
remote
())
}
"
)
(
f
"New balance
{
ray
.
get
(
account_actor
.
withdraw
.
remote
(
40.
))
}
"
)
(
f
"New balance
{
ray
.
get
(
account_actor
.
deposit
.
remote
(
30.
))
}
"
)
Consejo
Es importante para manejar las excepciones, que en el ejemplo pueden ocurrir tanto en el código del método de depósito como en el de retirada. Para gestionar las excepciones, debes aumentar el Ejemplo 4-4 con las cláusulas try
/except
:
try
:
result
=
ray
.
get
(
account_actor
.
withdraw
.
remote
(
-
40.
))
except
Exception
as
e
:
(
f
"Oops! \
{
e
}
occurred."
)
Esto garantiza que el código interceptará todas las excepciones lanzadas por el código del actor e implementará todas las acciones necesarias.
También puedes crear actores con nombre utilizando el Ejemplo 4-5.
Ejemplo 4-5. Crear un actor con nombre
account_actor
=
Account
.
options
(
name
=
'Account'
)
\.
remote
(
balance
=
100.
,
minimal_balance
=
20.
)
Una vez que el actor tiene un nombre, puedes utilizarlo para obtener el manejador del actor desde cualquier lugar del código:
ray
.
get_actor
(
'Account'
)
Como se ha definido anteriormente, el ciclo de vida del actor por defecto está vinculado a que el manejador del actor esté en el ámbito.
El tiempo de vida de un actor puede desacoplarse de que su manejador esté en scope, lo que permite que un actor persista incluso después de que salga el proceso del controlador. Puedes crear un actor desacoplado especificando el parámetro de tiempo de vida como detached
(Ejemplo 4-6).
Ejemplo 4-6. Hacer un actor independiente
account_actor
=
Account
.
options
(
name
=
'Account'
,
lifetime
=
'detached'
)
\.
remote
(
balance
=
100.
,
minimal_balance
=
20.
)
En teoría, puedes hacer que un actor se desvincule sin especificar su nombre, pero como ray.get_actor
funciona por nombres, los actores desvinculados tienen más sentido con un nombre. Deberías nombrar a tus actores separados para poder acceder a ellos, incluso después de que el manejador del actor esté fuera del ámbito. El propio actor separado puede ser propietario de otras tareas y objetos.
Además, puedes eliminar manualmente actores de desde dentro de un actor, utilizando ray.actor.exit_actor
, o utilizando el manejador de un actor ray.kill(account_actor)
. Esto puede ser útil si sabes que ya no necesitas determinados actores y quieres recuperar los recursos.
Como se muestra aquí, crear un actor Rayo básico y gestionar su ciclo de vida es bastante fácil, pero ¿qué ocurre si el nodo Rayo en el que se está ejecutando el actor se cae por algún motivo?1 La anotación @ray.remote
te permite especificar dos parámetros que controlan el comportamiento en este caso:
max_restarts
-
Especifica el número máximo de veces que debe reiniciarse el actor cuando muere inesperadamente. El valor mínimo válido es
0
(por defecto), que indica que el actor no necesita reiniciarse. Un valor de-1
indica que un actor debe reiniciarse indefinidamente. max_task_retries
-
Especifica el número de veces que se debe reintentar la tarea de un actor si la tarea falla debido a un error del sistema. Si se establece en
-1
, el sistema reintentará la tarea fallida hasta que la tarea tenga éxito, o el actor haya alcanzado su límite demax_restarts
. Si se establece enn > 0
, el sistema reintentará la tarea fallida hasta n veces, después de lo cual la tarea lanzará una excepciónRayActorError
sobreray.get
.
Como se explica con más detalle en el capítulo siguiente y en la documentación sobre tolerancia a fallos de Ray, cuando se reinicia un actor, Ray recreará su estado volviendo a ejecutar su constructor. Por lo tanto, si se cambió un estado durante la ejecución del actor, se perderá. Para preservar dicho estado, un actor tiene que implementar su persistencia personalizada.
En nuestro caso de ejemplo, el estado del actor se pierde en caso de fallo, ya que no hemos utilizado la persistencia de actor. Esto puede estar bien para algunos casos de uso, pero no es aceptable para otros; consulta también la documentación de Ray sobre patrones de diseño. En la siguiente sección, aprenderás a implementar mediante programación la persistencia personalizada del actor .
Implementación de la Persistencia del Actor
En esta implementación de, el estado se guarda como un todo, lo que funciona suficientemente bien si el tamaño del estado es relativamente pequeño y los cambios de estado son relativamente raros. Además, para simplificar nuestro ejemplo, utilizamos la persistencia en disco local. En realidad, para un caso de Rayo distribuido, deberías considerar el uso de un sistema de archivos de red (NFS), Amazon Simple Storage Service (S3) o una base de datos para permitir el acceso a los datos del actor desde cualquier nodo del clúster de Rayo.
En el Ejemplo 4-7 se presenta un actor persistente Account
.2
Ejemplo 4-7. Definir un actor persistente, utilizando la persistencia del sistema de archivos
@ray
.
remote
class
Account
:
def
__init__
(
self
,
balance
:
float
,
minimal_balance
:
float
,
account_key
:
str
,
basedir
:
str
=
'.'
):
self
.
basedir
=
basedir
self
.
key
=
account_key
if
not
self
.
restorestate
():
if
balance
<
minimal_balance
:
raise
Exception
(
"Starting balance is less than minimal balance"
)
self
.
balance
=
balance
self
.
minimal
=
minimal_balance
self
.
storestate
()
def
balance
(
self
)
->
float
:
return
self
.
balance
def
deposit
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot deposit negative amount"
)
self
.
balance
=
self
.
balance
+
amount
self
.
storestate
()
return
self
.
balance
def
withdraw
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot withdraw negative amount"
)
balance
=
self
.
balance
-
amount
if
balance
<
self
.
minimal
:
raise
Exception
(
"Withdrawal is not supported by current balance"
)
self
.
balance
=
balance
self
.
storestate
()
return
balance
def
restorestate
(
self
)
->
bool
:
if
exists
(
self
.
basedir
+
'/'
+
self
.
key
):
with
open
(
self
.
basedir
+
'/'
+
self
.
key
,
"rb"
)
as
f
:
bytes
=
f
.
read
()
state
=
ray
.
cloudpickle
.
loads
(
bytes
)
self
.
balance
=
state
[
'balance'
]
self
.
minimal
=
state
[
'minimal'
]
return
True
else
:
return
False
def
storestate
(
self
):
bytes
=
ray
.
cloudpickle
.
dumps
(
{
'balance'
:
self
.
balance
,
'minimal'
:
self
.
minimal
})
with
open
(
self
.
basedir
+
'/'
+
self
.
key
,
"wb"
)
as
f
:
f
.
write
(
bytes
)
Si comparamos esta implementación con la original del Ejemplo 4-1, notaremos varios cambios importantes:
-
Aquí el constructor tiene dos parámetros adicionales:
account_key
ybasedir
. La clave de la cuenta es un identificador único para la cuenta que también se utiliza como nombre del archivo de persistencia. El parámetrobasedir
indica un directorio base utilizado para almacenar los archivos de persistencia. Cuando se invoca al constructor, primero comprobamos si hay guardado un estado persistente para esta cuenta y, si lo hay, ignoramos el saldo y el saldo mínimo pasados y los restauramos a partir del estado persistente. -
Se añaden dos métodos adicionales a la clase:
store_state
yrestore_state
. Elstore_states
es un método que almacena el estado de un actor en un archivo. La información de estado se representa como un diccionario con claves como nombres de los elementos de estado y valores como los elementos de estado, valores. Estamos utilizando la implementación de Ray del decapado en la nube para convertir este diccionario en la cadena de bytes y luego escribir esta cadena de bytes en el archivo, definido por la clave de la cuenta y el directorio base.(El Capítulo 5 proporciona una discusión detallada del decapado en la nube.) El métodorestore_states
restaura el estado desde un archivo definido por una clave de cuenta y un directorio base. El método lee una cadena binaria del archivo y utiliza la implementación de Ray del decapado en la nube para convertirla al diccionario. Luego utiliza el contenido del diccionario para rellenar el estado. -
Por último, los métodos
deposit
ywithdraw
, que están cambiando el estado, utilizan el métodostore_state
para actualizar la persistencia.
La implementación mostrada en el Ejemplo 4-7 funciona bien, pero nuestra implementación de actor de cuenta contiene ahora demasiado código específico de persistencia y está estrechamente acoplada a la persistencia de archivos. Una solución mejor es separar el código específico de la persistencia en una clase aparte.
Empezaremos creando una clase abstracta que defina los métodos que debe implementar cualquier clase de persistencia(Ejemplo 4-8).
Ejemplo 4-8. Definir una clase base de persistencia
class
BasePersitence
:
def
exists
(
self
,
key
:
str
)
->
bool
:
pass
def
save
(
self
,
key
:
str
,
data
:
dict
):
pass
def
restore
(
self
,
key
:
str
)
->
dict
:
pass
Esta clase define todos los métodos que debe implementar una implementación de persistencia concreta. Con esto, se puede definir una clase de persistencia de archivos que implemente la persistencia base, como se muestra en el Ejemplo 4-9.
Ejemplo 4-9. Definir una clase de persistencia de archivos
class
FilePersistence
(
BasePersitence
):
def
__init__
(
self
,
basedir
:
str
=
'.'
):
self
.
basedir
=
basedir
def
exists
(
self
,
key
:
str
)
->
bool
:
return
exists
(
self
.
basedir
+
'/'
+
key
)
def
save
(
self
,
key
:
str
,
data
:
dict
):
bytes
=
ray
.
cloudpickle
.
dumps
(
data
)
with
open
(
self
.
basedir
+
'/'
+
key
,
"wb"
)
as
f
:
f
.
write
(
bytes
)
def
restore
(
self
,
key
:
str
)
->
dict
:
if
not
self
.
exists
(
key
):
return
None
else
:
with
open
(
self
.
basedir
+
'/'
+
key
,
"rb"
)
as
f
:
bytes
=
f
.
read
()
return
ray
.
cloudpickle
.
loads
(
bytes
)
Esta implementación elimina la mayor parte del código específico de persistencia de nuestra implementación original del Ejemplo 4-7. Ahora es posible simplificar y generalizar la implementación de una cuenta; véase el Ejemplo 4-10.
Ejemplo 4-10. Implementar un actor persistente con persistencia conectable
@ray
.
remote
class
Account
:
def
__init__
(
self
,
balance
:
float
,
minimal_balance
:
float
,
account_key
:
str
,
persistence
:
BasePersitence
):
self
.
persistence
=
persistence
self
.
key
=
account_key
if
not
self
.
restorestate
():
if
balance
<
minimal_balance
:
raise
Exception
(
"Starting balance is less than minimal balance"
)
self
.
balance
=
balance
self
.
minimal
=
minimal_balance
self
.
storestate
()
def
balance
(
self
)
->
float
:
return
self
.
balance
def
deposit
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot deposit negative amount"
)
self
.
balance
=
self
.
balance
+
amount
self
.
storestate
()
return
self
.
balance
def
withdraw
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot withdraw negative amount"
)
balance
=
self
.
balance
-
amount
if
balance
<
self
.
minimal
:
raise
Exception
(
"Withdrawal is not supported by current balance"
)
self
.
balance
=
balance
self
.
storestate
()
return
balance
def
restorestate
(
self
)
->
bool
:
state
=
self
.
persistence
.
restore
(
self
.
key
)
if
state
!=
None
:
self
.
balance
=
state
[
'balance'
]
self
.
minimal
=
state
[
'minimal'
]
return
True
else
:
return
False
def
storestate
(
self
):
self
.
persistence
.
save
(
self
.
key
,
{
'balance'
:
self
.
balance
,
'minimal'
:
self
.
minimal
})
Aquí sólo se muestran los cambios en el código respecto a nuestra implementación original del actor persistente(Ejemplo 4-7). Observa que el constructor toma ahora la clase BasePersistence
, lo que permite cambiar fácilmente la implementación de la persistencia sin cambiar el código del actor. Además, los métodos restore_state
y savestate
se generalizan para trasladar todo el código específico de la persistencia a la clase de persistencia.
Esta implementación es lo suficientemente flexible como para admitir diferentes implementaciones de persistencia, pero si una implementación de persistencia requiere conexiones permanentes a una fuente de persistencia (por ejemplo, una conexión a una base de datos), puede volverse inescalable al mantener simultáneamente demasiadas conexiones. En este caso, podemos implementar la persistencia como un actor adicional. Pero esto requiere escalar este actor. Echemos un vistazo a las opciones que ofrece Ray para escalar actores en .
Escalar actores remotos de rayos
El modelo de actor original de descrito anteriormente en este capítulo suele suponer que los actores son ligeros (por ejemplo, contienen una única pieza de estado) y no requieren escalado o paralelización. En Ray y sistemas similares (incluido Akka), los actores se utilizan a menudo para implementaciones de grano más grueso y pueden requerir escalado.3
Al igual que con las funciones remotas de Ray, puedes escalar actores tanto horizontalmente (a través de procesos/máquinas) con pools, como verticalmente (con más recursos). En "Recursos / Escalado vertical" se explica cómo solicitar más recursos, pero por ahora vamos a centrarnos en el escalado horizontal.
Puedes añadir más procesos para escalar horizontalmente con el pool de actores de Ray, proporcionado por el módulo ray.util
. Esta clase es similar a un pool multiproceso y te permite programar tus tareas sobre un pool fijo de actores.
La reserva de actores utiliza efectivamente un conjunto fijo de actores como una entidad única y gestiona qué actor de la reserva recibe la siguiente solicitud. Ten en cuenta que los actores de la reserva siguen siendo actores individuales y su estado no se fusiona. Por tanto, esta opción de escalado sólo funciona cuando el estado de un actor se crea en el constructor y no cambia durante la ejecución del actor.
Veamos cómo utilizar un pool de actores para mejorar la escalabilidad de nuestra clase cuenta añadiendo un pool de actores en el Ejemplo 4-11.
Ejemplo 4-11. Utilizar un pool de actores para implementar la persistencia
pool
=
ActorPool
([
FilePersistence
.
remote
(),
FilePersistence
.
remote
(),
FilePersistence
.
remote
()])
@ray
.
remote
class
Account
:
def
__init__
(
self
,
balance
:
float
,
minimal_balance
:
float
,
account_key
:
str
,
persistence
:
ActorPool
):
self
.
persistence
=
persistence
self
.
key
=
account_key
if
not
self
.
restorestate
():
if
balance
<
minimal_balance
:
raise
Exception
(
"Starting balance is less than minimal balance"
)
self
.
balance
=
balance
self
.
minimal
=
minimal_balance
self
.
storestate
()
def
balance
(
self
)
->
float
:
return
self
.
balance
def
deposit
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot deposit negative amount"
)
self
.
balance
=
self
.
balance
+
amount
self
.
storestate
()
return
self
.
balance
def
withdraw
(
self
,
amount
:
float
)
->
float
:
if
amount
<
0
:
raise
Exception
(
"Cannot withdraw negative amount"
)
balance
=
self
.
balance
-
amount
if
balance
<
self
.
minimal
:
raise
Exception
(
"Withdrawal is not supported by current balance"
)
self
.
balance
=
balance
self
.
storestate
()
return
balance
def
restorestate
(
self
)
->
bool
:
while
(
self
.
persistence
.
has_next
()):
self
.
persistence
.
get_next
()
self
.
persistence
.
submit
(
lambda
a
,
v
:
a
.
restore
.
remote
(
v
),
self
.
key
)
state
=
self
.
persistence
.
get_next
()
if
state
!=
None
:
(
f
'Restoring state
{
state
}
'
)
self
.
balance
=
state
[
'balance'
]
self
.
minimal
=
state
[
'minimal'
]
return
True
else
:
return
False
def
storestate
(
self
):
self
.
persistence
.
submit
(
lambda
a
,
v
:
a
.
save
.
remote
(
v
),
(
self
.
key
,
{
'balance'
:
self
.
balance
,
'minimal'
:
self
.
minimal
}))
account_actor
=
Account
.
options
(
name
=
'Account'
)
.
remote
(
balance
=
100.
,
minimal_balance
=
20.
,
account_key
=
'1234567'
,
persistence
=
pool
)
Aquí sólo se muestran los cambios en el código respecto a nuestra implementación original. El código comienza creando un pool de tres actores de persistencia de archivos idénticos, y luego este pool se pasa a una implementación de cuenta.
La sintaxis de una ejecución basada en un pool es una función lambda que toma dos parámetros: una referencia a un actor y un valor que debe someterse a la función. La limitación aquí es que el valor es un único objeto. Una de las soluciones para las funciones con varios parámetros es utilizar una tupla que puede contener un número arbitrario de componentes. La función en sí se define como una función remota sobre el método del actor requerido.
La ejecución en el pool es asíncrona (dirige internamente las peticiones a uno de los actores remotos). Esto permite una ejecución más rápida del método store_state
, que no necesita los resultados del almacenamiento de datos. En este caso, la implementación no espera a que se complete el almacenamiento del estado del resultado, sino que simplemente inicia la ejecución. El método restore_state
, en cambio, necesita el resultado de la invocación al pool para proceder. Una implementación de pool gestiona internamente el proceso de esperar a que los resultados de la ejecución estén listos y expone esta funcionalidad a través de la función get_next
(ten en cuenta que se trata de una llamada bloqueante). La implementación del pool gestiona una cola de resultados de ejecución (en el mismo orden que las peticiones). Por lo tanto, siempre que necesitemos obtener un resultado del pool, primero debemos vaciar la cola de resultados del pool para asegurarnos de que obtenemos el resultado correcto.
Además del escalado basado en el multiprocesamiento proporcionado por el conjunto de actores, Ray admite el escalado de la ejecución del actor mediante la concurrencia. Ray ofrece dos tipos de concurrencia dentro de un actor: hilos y ejecución asíncrona.
Cuando utilices la concurrencia dentro de los actores, ten en cuenta que el bloqueo global del intérprete (GIL) de Python sólo permitirá que se ejecute un hilo de código Python a la vez. Python puro no proporcionará verdadero paralelismo. Por otro lado, si invocas código NumPy, Cython, TensorFlow o PyTorch, estas bibliotecas liberarán el GIL al llamar a funciones C/C++. Al solapar el tiempo de espera de E/S o de trabajo en bibliotecas nativas, tanto la ejecución de hilos como la ejecución asíncrona de actores pueden conseguir cierto paralelismo.
Se puede pensar en la biblioteca asyncio como una multitarea cooperativa: tu código o biblioteca necesita señalar explícitamente que está esperando un resultado, y Python puede seguir adelante y ejecutar otra tarea cambiando explícitamente el contexto de ejecución. asyncio funciona teniendo un único proceso ejecutándose a través de un bucle de eventos y cambiando qué tarea está ejecutando cuando una tarea cede/espera. asyncio tiende a tener una sobrecarga menor que la ejecución multihilo y puede ser un poco más fácil de razonar. Los actores Ray, pero no las funciones remotas, se integran con asyncio, permitiéndote escribir métodos de actor asíncronos.
Debes utilizar la ejecución por hilos cuando tu código pase mucho tiempo bloqueándose pero sin ceder el control llamando a await
. Los hilos son gestionados por el sistema operativo, que decide cuándo ejecutar cada hilo. Utilizar la ejecución roscada puede implicar menos cambios en el código, ya que no necesitas indicar explícitamente dónde cede el control tu código. Esto también puede hacer que la ejecución por hilos sea más difícil de razonar.
Debes tener cuidado y utilizar selectivamente los bloqueos cuando accedas o modifiques objetos tanto con hilos como con asyncio. En ambos enfoques, tus objetos comparten la misma memoria. Al utilizar bloqueos, te aseguras de que sólo un hilo o tarea pueda acceder a la memoria específica. Los bloqueos tienen cierta sobrecarga (que aumenta a medida que más procesos o hilos esperan un bloqueo). Como resultado, la concurrencia de un actor es aplicable sobre todo para casos de uso en los que un estado se rellena en un constructor y nunca cambia.
Para crear un actor que utilice asyncio, necesitas definir al menos un método async. En este caso, Ray creará un bucle de eventos asyncio para ejecutar los métodos del actor. Enviar tareas a estos actores es lo mismo, desde la perspectiva del que llama, que enviar tareas a un actor normal. La única diferencia es que cuando la tarea se ejecuta en el actor, se envía a un bucle de eventos asyncio que se ejecuta en un subproceso o grupo de subprocesos en segundo plano, en lugar de ejecutarse directamente en el subproceso principal. (Ten en cuenta que no está permitido utilizar llamadas ray.get
o ray.wait
bloqueantes dentro de un método asíncrono de actor, porque bloquearán la ejecución del bucle de eventos).
El Ejemplo 4-12 presenta un ejemplo de actor asíncrono sencillo.
Ejemplo 4-12. Creación de un actor asíncrono simple
@ray
.
remote
class
AsyncActor
:
async
def
computation
(
self
,
num
):
(
f
'Actor waiting for
{
num
}
sec'
)
for
x
in
range
(
num
):
await
asyncio
.
sleep
(
1
)
(
f
'Actor slept for
{
x
+
1
}
sec'
)
return
num
Como el método computation
está definido como async
, Ray creará un actor asíncrono. Ten en cuenta que, a diferencia de los métodos async
ordinarios, que requieren await
para invocarlos, el uso de actores asíncronos en Ray no requiere ninguna semántica de invocación especial. Además, Ray te permite especificar la concurrencia máxima para la ejecución del actor asíncrono durante la creación del actor:
actor
=
AsyncActor
.
options
(
max_concurrency
=
5
)
.
remote
()
Para crear un actor roscado, debes especificar max_concurrency
durante la creación del actor(Ejemplo 4-13).
Ejemplo 4-13. Crear un actor roscado simple
@ray
.
remote
class
ThreadedActor
:
def
computation
(
self
,
num
):
(
f
'Actor waiting for \
{
num
}
sec'
)
for
x
in
range
(
num
):
sleep
(
1
)
(
f
'Actor slept for \
{
x
+
1
}
sec'
)
return
num
actor
=
ThreadedActor
.
options
(
max_concurrency
=
3
)
.
remote
()
Consejo
Como tanto los actores asíncronos como los roscados utilizan max_concurrency
, el tipo de actor creado puede resultar un poco confuso. Lo que hay que recordar es que si se utiliza max_concurrency
, el actor puede ser asíncrono o roscado. Si al menos uno de los métodos del actor es asíncrono, el actor es asíncrono; en caso contrario, es un roscado.
Entonces, ¿qué enfoque de escalado deberíamos utilizar para nuestra implementación? "Multiprocessing vs. Threading vs. AsyncIO in Python", de Lei Mao, ofrece un buen resumen de las características de los distintos enfoques de (Tabla 4-1).
Enfoque a escala | Función | Criterios de uso |
---|---|---|
Pool de actores |
Múltiples procesos, alta utilización de la CPU |
Limitado por la CPU |
Actor asíncrono |
Proceso único, hilo único, multitarea cooperativa, las tareas deciden cooperativamente sobre la conmutación |
Límite de E/S lenta |
Actor roscado |
Proceso único, múltiples hilos, multitarea preferente, el SO decide sobre el cambio de tareas |
Bibliotecas de E/S rápida vinculadas y no asíncronas que no controlas |
Buenas prácticas de los actores remotos del rayo
Dado que los actores remotos de Ray son efectivamente funciones remotas, todas las buenas prácticas remotas de Ray descritas en el capítulo anterior son aplicables. Además, Ray tiene algunas buenas prácticas específicas para los actores.
Como ya se ha mencionado, Ray ofrece soporte para la tolerancia a fallos de los actores. Específicamente para los actores, puedes especificar max_restarts
para activar automáticamente el reinicio de los actores de Ray. Cuando tu actor o el nodo que aloja ese actor se bloquee, el actor se reconstruirá automáticamente. Sin embargo, esto no te proporciona formas de restaurar estados a nivel de aplicación en tu actor. Considera los enfoques de persistencia del actor, descritos en este capítulo para garantizar también la restauración de los estados a nivel de ejecución.
Si tus aplicaciones tienen variables globales que debes modificar, no las cambies en funciones remotas. En su lugar, utiliza actores para encapsularlas y acceder a ellas a través de los métodos del actor. Esto se debe a que las funciones remotas se ejecutan en procesos diferentes y no comparten el mismo espacio de direcciones. Como resultado, estos cambios no se reflejan en el controlador Ray ni en las funciones remotas.
Uno de los casos de uso habituales en las aplicaciones es la ejecución de la misma función remota muchas veces para distintos conjuntos de datos. Utilizar las funciones remotas directamente puede causar retrasos debido a la creación de nuevos procesos para la función. Este enfoque también puede saturar el clúster Ray con un gran número de procesos. Una opción más controlada es utilizar el pool de actores. En este caso, un pool proporciona un conjunto controlado de trabajadores que están fácilmente disponibles (sin retraso en la creación de procesos) para su ejecución. Como el pool mantiene su cola de peticiones, el modelo de programación de esta opción es idéntico al de iniciar funciones remotas independientes, pero proporciona un entorno de ejecución mejor controlado.
Conclusión
En este capítulo has aprendido a utilizar los actores remotos de Ray para implementar la ejecución con estado en Ray. Has aprendido sobre el modelo de actor y cómo implementar actores remotos Ray. Ten en cuenta que Ray se basa internamente en gran medida en el uso de actores; por ejemplo, para la sincronización multinodo, el streaming (ver Capítulo 6) y la implementación de microservicios (ver Capítulo 7). También se utiliza ampliamente para implementaciones ML; véase, por ejemplo, el uso de actores para implementar un servidor de parámetros.
También aprendiste a mejorar la fiabilidad de un actor implementando la persistencia de un actor y viste un ejemplo sencillo de implementación de la persistencia.
Por último, has aprendido sobre las opciones que ofrece Ray para escalar actores, su implementación y las ventajas y desventajas.
En el próximo capítulo, hablaremos de otros detalles del diseño del Rayo.
1 Las excepciones de Python no se consideran errores del sistema y no provocarán reinicios. En su lugar, la excepción se guardará como resultado de la llamada, y el actor seguirá ejecutándose normalmente.
2 En esta implementación, estamos utilizando la persistencia del sistema de archivos, pero puedes utilizar el mismo enfoque con otros tipos de persistencia, como S3 o bases de datos.
3 Un actor de granularidad gruesa es un único actor que puede contener múltiples piezas de estado. En cambio, en un enfoque de grano fino, cada pieza de estado se representaría como un actor independiente. Esto es similar al concepto de bloqueo de grano grueso.
Get Escalando Python con Ray now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.