Capítulo 4. Aplicación de la programación reactiva a aplicaciones existentes
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
Introducir una nueva biblioteca, tecnología o paradigma en una aplicación, ya sea greenfield o una base de código heredada, debe ser una decisión cuidadosa. RxJava no es una excepción. En este capítulo, revisaremos algunos patrones y arquitecturas que se encuentran en las aplicaciones Java corrientes y veremos cómo puede ayudar Rx. Este proceso no es sencillo y requiere un importante cambio de mentalidad, por lo que nos transformaremos cuidadosamente del estilo imperativo al funcional y reactivo. Muchas bibliotecas de los proyectos Java actuales simplemente añaden hinchazón sin aportar nada a cambio. Sin embargo, verás cómo RxJava no sólo simplifica los proyectos tradicionales, sino qué tipo de ventajas aporta a las plataformas heredadas.
Estoy bastante seguro de que ya estás muy entusiasmado con RxJava. Los operadores incorporados y su simplicidad hacen de Rx una herramienta asombrosamente potente para transformar flujos de eventos. Sin embargo, si mañana vuelves a tu oficina, te darás cuenta de que no hay flujos, ni eventos en tiempo real de la bolsa de valores. Apenas puedes encontrar eventos en tus aplicaciones; es sólo un amasijo de peticiones web, bases de datos y API externas. Estás tan ansioso por probar esta nueva cosa RxJava en algún lugar más allá de Hello world. Sin embargo, parece que simplemente no hay casos de uso en la vida real que justifiquen el uso de Rx. Aun así, RxJava puede ser un importante paso adelante en términos de coherencia arquitectónica y robustez. No necesitas comprometerte con el estilo reactivo de arriba abajo; esto es demasiado arriesgado y requiere demasiado trabajo al principio. Pero Rx puede introducirse en cualquier capa, sin romper una aplicación en su conjunto.
Te llevaremos a través de algunos patrones de aplicación comunes y las formas en que puedes mejorarlos con RxJava de forma no invasiva, centrándonos en la consulta de bases de datos, el almacenamiento en caché, la gestión de errores y las tareas periódicas. Cuanto más RxJava añadas en diversos lugares de tu pila, más coherente será tu arquitectura.
De las colecciones a los observables
A menos que tu plataforma se haya construido recientemente en frameworks de JVM como Play, Akka actors, o quizás Vert.x, probablemente estés en una pila con un contenedor de servlets por un lado, y JDBC o servicios web por otro. Entre ambos, hay un número variable de capas que implementan la lógica de negocio, que no refactorizaremos todas a la vez; en su lugar, empecemos con un ejemplo sencillo. La siguiente clase representa un repositorio trivial que nos abstrae de una base de datos:
class
PersonDao
{
List
<
Person
>
listPeople
()
{
return
query
(
"SELECT * FROM PEOPLE"
);
}
private
List
<
Person
>
query
(
String
sql
)
{
//...
}
}
Dejando a un lado los detalles de implementación, ¿cómo se relaciona esto con Rx? Hasta ahora hemos estado hablando de eventos asíncronos empujados desde sistemas ascendentes o, en el mejor de los casos, cuando alguien se suscribe. ¿Qué relevancia tiene aquí el mundano Dao
?Observable
no es sólo una tubería que empuja eventos aguas abajo. Puedes tratar Observable<T>
como una estructura de datos, dual a Iterable<T>
. Ambas contienen elementos de tipo T
, pero proporcionan una interfaz radicalmente distinta. Por tanto, no debería sorprenderte que puedas sustituir simplemente una por otra:
Observable
<
Person
>
listPeople
()
{
final
List
<
Person
>
people
=
query
(
"SELECT * FROM PEOPLE"
);
return
Observable
.
from
(
people
);
}
En este punto, hemos realizado un cambio de ruptura en la API existente. Dependiendo de lo grande que sea tu sistema, esa incompatibilidad podría ser un problema importante. Por eso, es importante que incorpores RxJava a tu API lo antes posible. Obviamente, estamos trabajando con una aplicación existente, así que no puede ser el caso.
BloqueoObservable: Salir del mundo reactivo
Si estás combinando RxJava con código existente, bloqueante e imperativo, puede que tengas que traducir Observable
a una colección simple. Esta transformación es bastante desagradable, ya que requiere bloquearse en Observable
a la espera de que se complete. Hasta que Observable
no se complete, no podremos crear una colección.BlockingObservable
es un tipo especial que facilita el trabajo con Observable
en un entorno no reactivo.BlockingObservable
debería ser tu última opción cuando trabajes con RxJava, pero es inevitable cuando se combina código bloqueante y no bloqueante.
En el capítulo 3, refactorizamos el método listPeople()
para que devolviera Observable<People>
en lugar de List
.Observable
no es un Iterable
en ningún sentido, por lo que nuestro código ya no compila. Queremos dar pasos de bebé en lugar de una refactorización masiva, así que mantengamos el alcance de los cambios lo más reducido posible. El código del cliente podría tener este aspecto:
List
<
Person
>
people
=
pesonDao
.
listPeople
();
String
json
=
marshal
(
people
);
Podemos imaginar el método marshal()
extrayendo datos de la colección people
y serializándolos a JSON. Eso ya no es así, no podemos simplemente tirar de elementos de Observable
cuando queramos.Observable
se encarga de producir(empujar) elementos y notificar a los suscriptores si los hay. Este cambio radical puede sortearse fácilmente con BlockingObservable
.
Esta práctica clase es totalmente independiente de Observable
y puede obtenerse mediante el método Observable.toBlocking()
. La variante bloqueante de Observable
tiene métodos superficialmente similares, como single()
o subscribe()
. Sin embargo, BlockingObservable
es mucho más conveniente en entornos bloqueantes que no están preparados intrínsecamente para la naturaleza asíncrona de Observable
.
Los operadores de BlockingObservable
suelen bloquearse (esperar) hasta que se completa el Observable
subyacente. Esto contradice fuertemente el concepto principal de Observable
s de que todo es probablemente asíncrono, perezoso y se procesa sobre la marcha. Por ejemplo, Observable.forEach()
recibirá asíncronamente los eventos de Observable
a medida que lleguen, mientras que BlockingObservable.forEach()
se bloqueará hasta que se procesen todos los eventos y se complete el flujo. Además, las excepciones ya no se propagan como valores (eventos), sino que se vuelven a lanzar en el hilo de llamada.
En nuestro caso, queremos volver a transformar Observable<Person>
en List<Person>
para limitar el alcance de la refactorización:
Observable
<
Person
>
peopleStream
=
personDao
.
listPeople
();
Observable
<
List
<
Person
>>
peopleList
=
peopleStream
.
toList
();
BlockingObservable
<
List
<
Person
>>
peopleBlocking
=
peopleList
.
toBlocking
();
List
<
Person
>
people
=
peopleBlocking
.
single
();
He dejado intencionadamente explícitos todos los tipos intermedios para explicar lo que ocurre. Tras la refactorización a Rx, nuestra API devuelveObservable<Person> peopleStream
. Este flujo puede ser potencialmente totalmente reactivo, asíncrono y dirigido por eventos, lo que no se ajusta en absoluto a lo que necesitamos: un List
estático. Como primer paso, convertimos Observable<Person>
en Observable<List<Person>>
. Este operador perezoso almacenará en búfer todos los eventosPerson
y los mantendrá en memoria hasta que se reciba el evento onCompleted()
. En ese momento, se emitirá un único evento de tipo List<Person>
, que contendrá todos los eventos vistos a la vez, como se ilustra en el siguiente diagrama de mármol:
El flujo resultante se completa inmediatamente después de emitir un único elemento List
. De nuevo, este operador es asíncrono; no espera a que lleguen todos los eventos, sino que almacena perezosamente todos los valores. El incómodo aspecto de Observable<List<Person>> peopleList
se convierte entonces enBlockingObservable<List<Person>> peopleBlocking
. BlockingObservable
es una buena idea sólo cuando debes proporcionar una vista estática y bloqueante de tu, por lo demás, asíncrono Observable
. Mientras que Observable.from(List<T>)
convierte la colección normal basada en pull en Observable
, toBlocking()
hace algo totalmente opuesto. Quizá te preguntes por qué necesitamos dos abstracciones para operadores bloqueantes y no bloqueantes. Los autores de RxJava se dieron cuenta de que ser explícito sobre la naturaleza síncrona o asíncrona del operador subyacente es demasiado crucial como para dejarlo para JavaDoc. Tener dos tipos no relacionados garantiza que siempre trabajes con la estructura de datos adecuada. Además, BlockingObservable
es tu arma de último recurso; normalmente, deberías componer y encadenar Observable
s sencillos el mayor tiempo posible. Sin embargo, a efectos de este ejercicio, vamos a escapar de Observable
de inmediato. El último operador single()
abandona por completo los observables y extrae un elemento, y sólo uno, que esperamos recibir deBlockingObservable<T>
. Un operador similar, first()
, devolverá un valor de T
y descartará lo que le quede. single()
, por otro lado, se asegura de que no haya más eventos pendientes en el Observable
subyacente antes de terminar. Esto significa que single()
se bloqueará esperando la devolución de llamada de onCompleted()
. Aquí tienes el mismo fragmento de código que antes, esta vez con todos los operadores encadenados:
List
<
Person
>
people
=
personDao
.
listPeople
()
.
toList
()
.
toBlocking
()
.
single
();
Puede que pienses que nos hemos tomado la molestia de envolver y desenvolver Observable
sin ninguna razón aparente. Recuerda que éste ha sido sólo el primer paso. La siguiente transformación introducirá algo de pereza. Nuestro código, tal y como está ahora, siempre ejecuta query("...")
y lo envuelve con Observable
.
Como ya sabrás a estas alturas, los Observable
(especialmente los fríos) son perezosos por definición. Mientras nadie se suscriba, sólo representan un flujo que nunca tuvo la oportunidad de empezar a emitir valores. La mayoría de las veces puedes llamar a métodos que devuelvan Observable
y, mientras no te suscribas, no se hará ningún trabajo.
Observable
es como un Future
porque promete que un valor aparecerá en el futuro. Pero mientras no lo solicites, un Observable
frío ni siquiera empezará a emitir. Desde esa perspectiva, Observable
es más parecido a java.util.function.Supplier<T>
, generando valores del tipo T
bajo demanda.
Los Observable
en caliente son diferentes porque emiten valores tanto si estás escuchando como si no, pero no los estamos considerando ahora. La mera existencia de Observable
no indica un trabajo en segundo plano ni ningún efecto secundario, a diferencia de Future
, que casi siempre sugiere alguna operación ejecutándose simultáneamente.
Abrazar la pereza
Entonces, ¿cómo hacemos que nuestro Observable
sea perezoso? La técnica más sencilla es envolver un Observable
ansioso con defer()
:
public
Observable
<
Person
>
listPeople
()
{
return
Observable
.
defer
(()
->
Observable
.
from
(
query
(
"SELECT * FROM PEOPLE"
)));
}
Observable.defer()
toma una expresión lambda (una fábrica) que puede producir Observable
. La subyacente Observable
es ansiosa, por lo que queremos posponer su creación. defer()
esperará hasta el último momento posible para crear realmente Observable
; es decir, hasta que alguien se suscriba realmente a ella. Esto tiene algunas implicaciones interesantes. ComoObservable
es perezoso, llamar a listPeople()
no tiene efectos secundarios y casi no afecta al rendimiento. Todavía no se consulta ninguna base de datos. Puedes tratar Observable<Person>
como una promesa, pero sin que se produzca todavía ningún procesamiento en segundo plano. Observa que no hay comportamiento asíncrono en este momento, sólo evaluación perezosa. Esto es similar a cómo los valores en el lenguaje de programación Haskell se evalúan perezosamente sólo cuando es absolutamente necesario.
Si nunca has programado en lenguajes funcionales, puede que estés bastante confuso sobre por qué la pereza es tan importante y rompedora. Resulta que ese comportamiento es bastante útil y puede mejorar bastante la calidad y la libertad de tu implementación. Por ejemplo, ya no tienes que prestar atención a qué recursos se obtienen, cuándo y en qué orden. RxJava los cargará sólo cuando sean absolutamente necesarios.
Como ejemplo, tomemos este trivial mecanismo de retroceso que todos hemos visto tantas veces:
void
bestBookFor
(
Person
person
)
{
Book
book
;
try
{
book
=
recommend
(
person
);
}
catch
(
Exception
e
)
{
book
=
bestSeller
();
}
display
(
book
.
getTitle
());
}
void
display
(
String
title
)
{
//...
}
Probablemente pienses que no hay nada malo en tal construcción. En este ejemplo, intentamos recomendar el mejor libro para una persona determinada, pero en caso de fallo, nos degradamos con elegancia y mostramos el más vendido. Se supone que obtener un best seller es más rápido y puede almacenarse en caché. Pero, ¿y si pudieras añadir la gestión de errores de forma declarativa, de modo que los bloques try
-catch
no estuvieran ocultando la lógica real?
void
bestBookFor
(
Person
person
)
{
Observable
<
Book
>
recommended
=
recommend
(
person
);
Observable
<
Book
>
bestSeller
=
bestSeller
();
Observable
<
Book
>
book
=
recommended
.
onErrorResumeNext
(
bestSeller
);
Observable
<
String
>
title
=
book
.
map
(
Book:
:
getTitle
);
title
.
subscribe
(
this
::
display
);
}
De momento sólo estamos explorando RxJava, por lo que he dejado todos estos valores y tipos intermedios. En la vida real, bestBookFor()
se parecería más a esto:
void
bestBookFor
(
Person
person
)
{
recommend
(
person
)
.
onErrorResumeNext
(
bestSeller
())
.
map
(
Book:
:
getTitle
)
.
subscribe
(
this
::
display
);
}
Este código es maravillosamente conciso y legible. Primero busca una recomendación para person
. En caso de error (onErrorResumeNext
), sigue con un superventas. Independientemente de cuál haya tenido éxito, map
devuelve un valor extrayendo el título y luego lo muestra. onErrorResumeNext()
es un potente operador que intercepta las excepciones que se producen aguas arriba, se las traga y se suscribe a la copia de seguridad proporcionada Observable
. Así es como Rx implementa una cláusula try
-catch
. Dedicaremos mucho más tiempo a la gestión de errores más adelante en este libro (véase "Sustitución declarativa try-catch"). De momento, fíjate en cómo podemos llamar perezosamente a bestSeller()
sin preocuparnos de que la búsqueda de best seller se produzca incluso cuando una recomendación real haya ido bien.
Composición de observables
SELECT * FROM PEOPLE
no es realmente una consulta SQL de última generación. En primer lugar, no debes obtener todas las columnas a ciegas, pero obtener todas las filas es aún más perjudicial. Nuestra antigua API no es capaz de paginar los resultados, visualizando sólo un subconjunto de una tabla. Podría tener este aspecto, de nuevo en una aplicación empresarial tradicional:
List
<
Person
>
listPeople
(
int
page
)
{
return
query
(
"SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?"
,
PAGE_SIZE
,
page
*
PAGE_SIZE
);
}
Esto no es un libro de SQL, así que vamos a dejar a un lado los detalles de implementación. El autor de esta API fue despiadado: no tenemos libertad para elegir ningún rango de registros, sólo podemos operar sobre números de página basados en 0. Sin embargo, en RxJava, por pereza, podemos simular la lectura de toda una base de datos a partir de una página determinada:
import
static
rx
.
Observable
.
defer
;
import
static
rx
.
Observable
.
from
;
Observable
<
Person
>
allPeople
(
int
initialPage
)
{
return
defer
(()
->
from
(
listPeople
(
initialPage
)))
.
concatWith
(
defer
(()
->
allPeople
(
initialPage
+
1
)));
}
Este fragmento de código carga perezosamente la página inicial de registros de la base de datos, por ejemplo 10 elementos. Si nadie se suscribe, ni siquiera se invoca esta primera consulta. Si hay un suscriptor que sólo consume unos pocos elementos iniciales (por ejemplo, allPeople(0).take(3)
), RxJava se dará de baja automáticamente de nuestro flujo y no se ejecutarán más consultas. Entonces, ¿qué ocurre cuando solicitamos, digamos, 11 elementos, pero la primera llamada a listPeople()
sólo devuelve 10? Bueno, RxJava se da cuenta de que el Observable
inicial está agotado pero el consumidor sigue hambriento. Por suerte, ve el operadorconcatWith()
, que básicamente dice: cuando se complete el Observable
de la izquierda, en lugar de propagar la notificación de finalización a los suscriptores, suscríbete al Observable
de la derecha y continúa como si no pasara nada, como se muestra en el siguiente diagrama de mármol:
En otras palabras, concatWith()
puede unir dos Observable
s de modo que cuando el primero finaliza, el segundo toma el relevo. Ena.concatWith(b).subscribe(...)
, el suscriptor recibirá primero todos los eventos de a
, seguidos de todos los eventos de b
. En este caso, el suscriptor recibe primero 10 elementos iniciales, seguidos de otros 10. Sin embargo, fíjate bien, ¡hay una supuesta recursividad infinita en nuestro código!allPeople(initialPage)
llama a allPeople(initialPage + 1)
sin ninguna condición de parada. Esta es una receta para StackOverflowError
en la mayoría de los lenguajes, pero no aquí. De nuevo, llamar aallPeople()
es siempre perezoso, por lo tanto, en el momento en que dejes de escuchar (desuscribirte), esta recursividad habrá terminado. Técnicamente, concatWith()
todavía puede producir StackOverflowError
aquí. Espera a "Respetar la cantidad de datos solicitada", aprenderás cómo tratar la demanda variable de datos entrantes.
La técnica de cargar perezosamente los datos trozo a trozo es bastante útil porque te permite concentrarte en la lógica empresarial, no en la fontanería de bajo nivel. Ya vemos algunas ventajas de aplicar RxJava incluso a pequeña escala. Diseñar una API pensando en Rx no influye en toda la arquitectura, porque siempre podemos recurrir a BlockingObservable
y a las colecciones de Java. Pero es mejor disponer de un amplio abanico de posibilidades que podamos recortar si es necesario.
Paginación perezosa y concatenación
Hay más formas de implementar la paginación perezosa con RxJava. Si lo piensas, la forma más sencilla de cargar datos paginados es cargarlo todo y luego coger lo que necesitemos. Parece una tontería, pero gracias a la paginación perezosa es factible. Primero generamos todos los números de página posibles y luego solicitamos cargar todas y cada una de las páginas individualmente:
Observable
<
List
<
Person
>>
allPages
=
Observable
.
range
(
0
,
Integer
.
MAX_VALUE
)
.
map
(
this
::
listPeople
)
.
takeWhile
(
list
->
!
list
.
isEmpty
());
Si esto no fuera RxJava, el código anterior llevaría una enorme cantidad de tiempo y de memoria, básicamente cargando toda la base de datos en memoria. Pero como Observable
es perezoso, aún no ha aparecido ninguna consulta a la base de datos. Además, si encontramos una página vacía, significa que todas las demás páginas también lo están (hemos llegado al final de la tabla). Por lo tanto, utilizamos takeWhile()
en lugar de filter()
. Para aplanar allPages
a Observable<Person>
podemos utilizar concatMap()
(véase "Preservar el orden utilizando concatMap()"):
Observable
<
Person
>
people
=
allPages
.
concatMap
(
Observable:
:
from
);
concatMap()
requiere una transformación de List<Person>
a Observable<Person>
, ejecutada para cada página. También podemos probar con concatMapIterable()
, que hace lo mismo, pero la transformación debe devolver un Iterable<Person>
para cada valor ascendente (que resulta ser ya Iterable<Person>
):
Observable
<
Person
>
people
=
allPages
.
concatMapIterable
(
page
->
page
);
Independientemente del enfoque que elijas, todas las transformaciones del objeto Person
son perezosas. Siempre que limites el número de registros que quieres procesar (por ejemplo, con people.take(15)
), Observable<Person>
invocará a listPeople()
lo más tarde posible.
Concurrencia imperativa
No suelo ver concurrencia explícita en las aplicaciones empresariales. La mayoría de las veces, una única solicitud es gestionada por un único subproceso. El mismo hilo hace lo siguiente
-
Acepta la conexión TCP/IP
-
Analiza las solicitudes HTTP
-
Llama a un controlador o servlet
-
Bloquea la llamada a la base de datos
-
Resultados de los procesos
-
Codifica la respuesta (por ejemplo, en JSON)
-
Envía bytes sin procesar al cliente
Este modelo por capas afecta a la latencia del usuario cuando el backend realiza varias peticiones independientes, por ejemplo, a la base de datos. Se realizan de forma secuencial, cuando se podrían paralelizar fácilmente. Además, la escalabilidad se ve afectada. Por ejemplo, en Tomcat hay 200 hilos por defecto en los ejecutores que se encargan de gestionar las peticiones. Esto significa que no podemos manejar más de 200 conexiones concurrentes. En caso de una repentina pero breve ráfaga de tráfico, las conexiones entrantes se ponen en cola y el servidor responde con mayor latencia. Sin embargo, esta situación no puede durar eternamente, y Tomcat acabará por empezar a rechazar el tráfico entrante. Dedicaremos gran parte del próximo capítulo (ver "Servidor HTTP no bloqueante con Netty y RxNetty") a cómo solucionar esta deficiencia bastante embarazosa. De momento, quedémonos con la arquitectura tradicional. Ejecutar cada paso de la gestión de solicitudes dentro de un único hilo tiene algunas ventajas, por ejemplo, una mejor localización de la caché y una mínima sobrecarga de sincronización.1 Por desgracia, en las aplicaciones clásicas, como la latencia global es la suma de las latencias de cada capa, un componente que funcione mal puede tener un impacto negativo en la latencia total.2 Además, a veces hay muchos pasos que son independientes entre sí y pueden ejecutarse simultáneamente. Por ejemplo, invocamos varias API externas o ejecutamos varias consultas SQL independientes.
El JDK tiene un soporte bastante bueno para la concurrencia, especialmente desde Java 5 con ExecutorService
y Java 8 con CompletableFuture
. Sin embargo, no se utiliza tan ampliamente como podría. Por ejemplo, veamos el siguiente programa sin ningún tipo de concurrencia:
Flight
lookupFlight
(
String
flightNo
)
{
//...
}
Passenger
findPassenger
(
long
id
)
{
//...
}
Ticket
bookTicket
(
Flight
flight
,
Passenger
passenger
)
{
//...
}
SmtpResponse
sendEmail
(
Ticket
ticket
)
{
//...
}
Y en el lado del cliente:
Flight
flight
=
lookupFlight
(
"LOT 783"
);
Passenger
passenger
=
findPassenger
(
42
);
Ticket
ticket
=
bookTicket
(
flight
,
passenger
);
sendEmail
(
ticket
);
De nuevo, un código de bloqueo bastante típico y clásico, similar al que puedes encontrar en muchas aplicaciones. Pero si te fijas bien desde el punto de vista de la latencia, el fragmento de código anterior tiene cuatro pasos; sin embargo, los dos primeros son independientes entre sí. Sólo el tercer paso (bookTicket()
) necesita los resultados de lookupFlight()
yfindPassenger()
. Existe una oportunidad evidente de aprovechar la concurrencia. Sin embargo, muy pocos desarrolladores seguirán realmente este camino porque requiere incómodos grupos de hilos, Future
s y devoluciones de llamada. ¿Y si la API ya fuera compatible con Rx? Recuerda que puedes simplemente envolver el código bloqueante heredado en Observable
, como hicimos al principio de este capítulo:
Observable
<
Flight
>
rxLookupFlight
(
String
flightNo
)
{
return
Observable
.
defer
(()
->
Observable
.
just
(
lookupFlight
(
flightNo
)));
}
Observable
<
Passenger
>
rxFindPassenger
(
long
id
)
{
return
Observable
.
defer
(()
->
Observable
.
just
(
findPassenger
(
id
)));
}
Semánticamente, los métodos de rx-
hacen exactamente lo mismo y de la misma forma; es decir, son bloqueantes por defecto. Todavía no hemos ganado nada, aparte de una API más verbosa desde la perspectiva del cliente:
Observable
<
Flight
>
flight
=
rxLookupFlight
(
"LOT 783"
);
Observable
<
Passenger
>
passenger
=
rxFindPassenger
(
42
);
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
(
f
,
p
)
->
bookTicket
(
f
,
p
));
ticket
.
subscribe
(
this
::
sendEmail
);
Tanto los programas de bloqueo tradicionales como el que tiene Observable
funcionan exactamente igual. Es más perezoso por defecto, pero el orden de las operaciones es esencialmente el mismo. Primero, creamos Observable<Flight>
, que como ya sabes, no hace nada por defecto. A menos que alguien pida explícitamente un Flight
, este Observable
es sólo un perezoso marcador de posición. Ya aprendimos que ésta es una valiosa propiedad de los Observable
fríos. Lo mismo ocurre con Observable<Passenger>
; tenemos dos marcadores de posición del tipoFlight
y Passenger
, pero aún no se ha realizado ningún efecto secundario. No hay consulta a la base de datos ni llamada al servicio web. Si decidimos detener el procesamiento aquí, no se ha realizado ningún trabajo superfluo.
Para proceder con bookTicket()
, necesitamos instancias concretas tanto de Flight
como de Passenger
. Resulta tentador limitarse a bloquear en estas dos Observable
s utilizando el operador toBlocking()
. Sin embargo, nos gustaría evitar el bloqueo en la medida de lo posible para reducir el consumo de recursos (especialmente de memoria) y permitir una mayor concurrencia. Otra solución poco adecuada es.subscribe()
en las flight
y passenger
Observable
s y esperar de algún modo a que finalicen ambas devoluciones de llamada. Es bastante sencillo cuandoObservable
está bloqueando, pero si las devoluciones de llamada aparecen de forma asíncrona y necesitas sincronizar algún estado global esperando a ambas, esto se convierte rápidamente en una pesadilla. Además, un subscribe()
anidado no es idiomático, y normalmente quieres una única suscripción para un flujo de mensajes (caso de uso). La única razón por la que las devoluciones de llamada funcionan de forma algo decente en JavaScript es porque sólo hay un hilo. La forma idiomática de suscribirse a varios Observable
s al mismo tiempo es zip
y zipWith
. Podrías percibir zip
como una forma de unir dos flujos de datos independientes por pares. Pero mucho más a menudo, zip
se utiliza simplemente para unir dos Observable
s de un solo elemento.ob1.zip(ob2).subscribe(...)
significa esencialmente que se recibe un evento cuando tanto ob1
como ob2
han terminado (emiten un evento por sí solas). Así que siempre que veas zip
, lo más probable es que alguien simplemente esté haciendo un paso de uniónen dos o más Observable
s que tenían caminos de ejecución bifurcados.zip
es una forma de esperar asíncronamente dos o más valores, sin importar cuál aparezca en último lugar.
Así que volvamos a flight.zipWith(passenger, this::bookTicket)
(una sintaxis más corta que utiliza referencia a método en lugar de lambda explícita, como en el ejemplo de código). La razón por la que mantengo toda la información de tipos en lugar de unir expresiones de forma fluida es porque quiero que prestes atención a los tipos de retorno. flight.zipWith(passenger, ...)
no invoca simplemente una llamada de retorno cuando flight
y passenger
han terminado; devuelve un nuevoObservable
que deberías reconocer inmediatamente como un marcador de posición perezoso para datos. Sorprendentemente, en este momento tampoco se ha iniciado ningún cálculo. Simplemente envolvimos unas cuantas estructuras de datos, pero no se desencadenó ningún comportamiento. Mientras nadie se suscriba a Observable<Ticket>
, RxJava no ejecutará ningún código backend. Esto es lo que ocurre finalmente en la última declaración: ticket.subscribe()
solicita explícitamente Ticket
.
¿Dónde suscribirse?
Presta atención a dónde ves subscribe()
en el código del dominio. A menudo, tu lógica empresarial sólo está componiendo Observable
s hasta el final y devolviéndolos a algún tipo de framework o capa de andamiaje. La suscripción real ocurre entre bastidores en un framework web o en algún código de pegamento. No es una mala práctica llamar tú mismo a subscribe()
, pero intenta empujarlo hacia fuera tanto como sea posible.
Para comprender el flujo de ejecución, es útil mirar de abajo arriba. Nos suscribimos a ticket
, por lo que RxJava debe suscribirse de forma transparente tanto a flight
como a passenger
. En este punto ocurre la verdadera lógica. Como ambos Observable
están fríos y aún no hay concurrencia implicada, la primera suscripción a flight
invoca al método de bloqueo lookupFlight()
justo en el hilo de llamada. Cuando lookupFlight()
termina, RxJava puede suscribirse a passenger
. Sin embargo, ya ha recibido una instancia de Flight
de flight
síncrono. rxFindPassenger()
llama a findPassenger()
de forma bloqueante y recibe una instancia de Passenger
. En este momento, los datos vuelven a fluir en sentido descendente. Las instancias de Flight
y Passenger
se combinan utilizando la lambda proporcionada (bookTicket
) y se pasan a ticket.subscribe()
.
Esto parece mucho trabajo teniendo en cuenta que se comporta y funciona esencialmente igual que nuestro código de bloqueo del principio. Pero ahora podemos aplicar la concurrencia de forma declarativa sin cambiar ninguna lógica. Si nuestros métodos de negocio devolvieran Future<Flight>
(oCompletableFuture<Flight>
, en realidad no importa), se habrían tomado dos decisiones por nosotros:
-
La invocación subyacente de
lookupFlight()
ya ha comenzado y no hay lugar para la pereza. No bloqueamos dicho método, pero el trabajo ya ha comenzado. -
No tenemos control alguno sobre la concurrencia, es la implementación del método la que decide si se invoca una tarea
Future
en un grupo de hilos, un nuevo hilo por petición, etc.
RxJava da más control a los usuarios. El hecho de que Observable<Flight>
no se implementara pensando en la concurrencia no significa que no podamos aplicarla más adelante. Los Observable
del mundo real ya suelen ser asíncronos, pero en raras ocasiones hay que añadir concurrencia a un Observable
existente. Los consumidores de nuestra API, no los implementadores, son libres de elegir el mecanismo de subprocesamiento en el caso del Observable
síncrono. Todo esto se consigue utilizando el operador subscribeOn()
:
Observable
<
Flight
>
flight
=
rxLookupFlight
(
"LOT 783"
).
subscribeOn
(
Schedulers
.
io
());
Observable
<
Passenger
>
passenger
=
rxFindPassenger
(
42
).
subscribeOn
(
Schedulers
.
io
());
En cualquier momento antes de suscribirnos, podemos inyectar el operador subscribeOn()
y proporcionar una instancia llamada Scheduler
. En este caso, he utilizado el método de fábrica Schedulers.io()
, pero también podemos utilizar un ExecutorService
personalizado y envolverlo rápidamente con Scheduler
. Cuando se produce la suscripción, la expresión lambda pasada aObservable.create()
se ejecuta dentro del Scheduler
suministrado en lugar de en el hilo del cliente. Todavía no es necesario, pero examinaremos los programadores en profundidad en la sección "¿Qué es un programador?". De momento, trata a un Scheduler
como un grupo de hilos.
¿Cómo cambia Scheduler
el comportamiento en tiempo de ejecución de nuestro programa? Recuerda que el operador zip()
se suscribe a dos o másObservable
s y espera pares (o tuplas). Cuando la suscripción se produce de forma asíncrona, todos los Observable
s ascendentes pueden llamar a su código de bloqueo subyacente de forma concurrente. Si ahora ejecutas tu programa, lookupFlight()
y findPassenger()
comenzarán a ejecutarse inmediata y concurrentemente cuando se invoque a ticket.subscribe()
. Entonces,bookTicket()
se aplicará en cuanto el más lento de los mencionados Observable
s emita un valor.
Hablando de lentitud, también puedes aplicar declarativamente un tiempo de espera, cuando un determinado Observable
no emita ningún valor en el tiempo especificado:
rxLookupFlight
(
"LOT 783"
)
.
subscribeOn
(
Schedulers
.
io
())
.
timeout
(
100
,
TimeUnit
.
MILLISECONDS
)
Como siempre, en caso de errores, éstos se propagan aguas abajo en lugar de lanzarse arbitrariamente. Así que si el método lookupFlight()
tarda más de 100 milisegundos, acabarás con TimeoutException
en lugar de un valor emitido enviado aguas abajo a cada suscriptor. El operador timeout()
se explica exhaustivamente en "Temporización cuando no se producen eventos".
Acabamos con dos métodos que se ejecutan concurrentemente sin mucho esfuerzo, suponiendo que tu API ya esté orientada a Rx. Pero hemos hecho un poco de trampa, ya que bookTicket()
sigue devolviendo Ticket
, lo que definitivamente significa que es bloqueante. Aunque reservar billete fuera extremadamente rápido, merece la pena declararlo como tal, sólo para facilitar la evolución de la API. La evolución podría significar añadir concurrencia o utilizarla en entornos totalmente no bloqueantes (consulta el Capítulo 5).
Recuerda que convertir una API no bloqueante en una bloqueante es tan fácil como llamar a toBlocking()
. Lo contrario suele ser complicado y requiere muchos recursos adicionales. Además, es muy difícil predecir la evolución de métodos como rxBookTicket()
, si alguna vez tocan la red o el sistema de archivos, por no hablar de la base de datos, merece la pena envolverlos con un Observable
que indique la posible latencia a nivel de tipo:
Observable
<
Ticket
>
rxBookTicket
(
Flight
flight
,
Passenger
passenger
)
{
//...
}
Pero ahora zipWith()
devuelve un incómodo Observable<Observable<Ticket>>
y el código ya no compila. Una buena regla general es que siempre que veas un tipo con doble envoltorio (por ejemplo Optional<Optional<...>>
) falta una invocación a flatMap()
en alguna parte. Ese es también el caso aquí. zipWith()
toma un par (o más generalmente una tupla) de eventos, aplica una función tomando estos eventos como argumentos, y pone el resultado en el Observable
descendente tal cual. Por eso vimos primero Observable<Ticket>
pero ahora es Observable<Observable<Ticket>>
, dondeObservable<Ticket>
es el resultado de nuestra función suministrada. Hay dos formas de superar este problema. Una forma es utilizando un par intermedio devuelto porzipWith
:
import
org.apache.commons.lang3.tuple.Pair
;
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
(
Flight
f
,
Passenger
p
)
->
Pair
.
of
(
f
,
p
))
.
flatMap
(
pair
->
rxBookTicket
(
pair
.
getLeft
(),
pair
.
getRight
()));
Si utilizar un Pair
explícito de una biblioteca de terceros no oscureciera lo suficiente el flujo, en realidad funcionaría la referencia a métodos: Pair::of
, pero, de nuevo, decidimos que la información de tipo visible es más valiosa que ahorrar unas pulsaciones. Al fin y al cabo, leemos código mucho más tiempo del que lo escribimos. Una alternativa a un par intermedio es aplicar un flatMap
con una función de identidad:
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
this
::
rxBookTicket
)
.
flatMap
(
obs
->
obs
);
Esta expresión lambda obs -> obs
aparentemente no hace nada, al menos si fuera un operador map()
. Pero recuerda que flatMap()
aplica una función a cada valor dentro de Observable
, por lo que esta función tomaObservable<Ticket>
como argumento en nuestro caso. Después, el resultado no se coloca directamente en el flujo resultante, como con map()
. En su lugar, el valor de retorno (de tipo Observable<T>
) se "aplana", dando lugar a un Observable<T>
en lugar de Observable<Observable<T>>
. Cuando se trata de programadores, el operador flatMap()
se vuelve aún más potente. Puede que percibas flatMap()
como un mero truco sintáctico para evitar un problema de Observable<Observable<...>>
anidado, pero es mucho más fundamental que esto.
Casos de uso de Observable.subscribeOn()
Es tentador pensar que subscribeOn()
es la herramienta adecuada para la concurrencia en RxJava. Este operador funciona, pero no deberías ver con frecuencia el uso de subscribeOn()
(y aún por describir observeOn()
). En la vida real, Observable
s proceden de fuentes asíncronas, por lo que la programación personalizada no es necesaria en absoluto.
Utilizamos subscribeOn()
a lo largo de este capítulo para mostrar explícitamente cómo actualizar las aplicaciones existentes para que utilicen los principios reactivos de forma selectiva. Pero en la práctica, Scheduler
s y subscribeOn()
son armas de último recurso, no algo que se vea habitualmente.
flatMap() como operador de encadenamiento asíncrono
En nuestra aplicación de ejemplo, ahora debemos enviar una lista de Ticket
s por correo electrónico. Pero debemos tener en cuenta lo siguiente:
-
La lista puede ser potencialmente muy larga.
-
Enviar un correo electrónico puede llevar varios milisegundos o incluso segundos.
-
La aplicación debe seguir funcionando con elegancia en caso de fallos, pero informar al final de qué tickets no se entregaron.
El último requisito descarta rápidamente el simpletickets.forEach(this::sendEmail)
, porque lanza una excepción y no continúa con las entradas que aún no se han entregado. En realidad, las excepciones son una desagradable puerta trasera al sistema de tipos y, al igual que las devoluciones de llamada, no son muy amigables cuando quieres gestionarlas de una forma más robusta. Por eso RxJava las modela explícitamente como notificaciones especiales, pero ten paciencia, ya llegaremos a eso. Teniendo en cuenta el requisito de gestión de errores, nuestro código se parece más o menos a esto:
List
<
Ticket
>
failures
=
new
ArrayList
<>();
for
(
Ticket
ticket:
tickets
)
{
try
{
sendEmail
(
ticket
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
);
failures
.
add
(
ticket
);
}
}
Sin embargo, no se abordan los dos primeros requisitos o directrices. No hay ninguna razón para que sigamos enviando correos electrónicos de un hilo secuencialmente. Tradicionalmente, podríamos utilizar un ExecutorService pool
para ello, enviando cada correo electrónico como una tarea independiente:
List
<
Pair
<
Ticket
,
Future
<
SmtpResponse
>>>
tasks
=
tickets
.
stream
()
.
map
(
ticket
->
Pair
.
of
(
ticket
,
sendEmailAsync
(
ticket
)))
.
collect
(
toList
());
List
<
Ticket
>
failures
=
tasks
.
stream
()
.
flatMap
(
pair
->
{
try
{
Future
<
SmtpResponse
>
future
=
pair
.
getRight
();
future
.
get
(
1
,
TimeUnit
.
SECONDS
);
return
Stream
.
empty
();
}
catch
(
Exception
e
)
{
Ticket
ticket
=
pair
.
getLeft
();
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
);
return
Stream
.
of
(
ticket
);
}
})
.
collect
(
toList
());
//------------------------------------
private
Future
<
SmtpResponse
>
sendEmailAsync
(
Ticket
ticket
)
{
return
pool
.
submit
(()
->
sendEmail
(
ticket
));
}
Es una buena cantidad de código con el que todos los programadores Java deberían estar familiarizados. Sin embargo, parece demasiado verboso y accidentalmente complejo. En primer lugar, iteramos sobre tickets
y los enviamos a un grupo de hilos. Para ser precisos, llamamos al método de ayuda sendEmailAsync()
que envía la invocaciónsendEmail()
envuelta en Callable<SmtpResponse>
a un hilopool
. Las instancias aún más precisas de Callable
se colocan primero en una cola ilimitada (por defecto) frente a un grupo de hilos. La falta de mecanismos que ralenticen el envío demasiado rápido de tareas si no se pueden procesar a tiempo condujo a los flujos reactivos y al esfuerzo de contrapresión (véase "Contrapresión").
Como más adelante necesitaremos una instancia de Ticket
en caso de fallo, debemos llevar la cuenta de qué Future
era responsable de quéTicket
, de nuevo en un Pair
. En código de producción real, deberías considerar un contenedor más significativo y dedicado, como un objeto de valor TicketAsyncTask
. Recopilamos todos esos pares y pasamos a la siguiente iteración. En este punto, el grupo de hilos ya está ejecutando varias invocaciones a sendEmail()
simultáneamente, que es precisamente lo que pretendíamos. El segundo bucle recorre todos los Future
s e intenta desreferenciarlos bloqueándolos (get()
) y esperando a que se completen. Si get()
regresa con éxito, nos saltamos dicha Ticket
. Sin embargo, si se produce una excepción, devolvemos la instancia Ticket
que estaba asociada a esta tarea: sabemos que ha fallado y queremos informar de ello más adelante. Stream.flatMap()
nos permite devolver cero o un elemento (o en realidad cualquier número), al contrario que Stream.map()
, que siempre requiere uno.
Quizá te preguntes por qué necesitamos dos bucles en lugar de uno solo como éste:
//WARNING: code is sequential despite utilizing thread pool
List
<
Ticket
>
failures
=
tickets
.
stream
()
.
map
(
ticket
->
Pair
.
of
(
ticket
,
sendEmailAsync
(
ticket
)))
.
flatMap
(
pair
->
{
//...
})
.
collect
(
toList
());
Se trata de un error interesante que es realmente difícil de encontrar si no entiendes cómo funciona Stream
s en Java 8. Dado que los flujos -al igual que Observable
s- son perezosos, evalúan la colección subyacente elemento a elemento y sólo cuando se ha solicitado una operación terminal (por ejemplo,collect(toList())
). Esto significa que una operación map()
que inicie tareas en segundo plano no se ejecuta en todas las entradas inmediatamente, sino que se hace de una en una, utilizando alternativamente una operación flatMap()
. Además, realmente iniciamos un Future
, bloqueamos su espera, iniciamos un segundo Future
, bloqueamos su espera, y así sucesivamente. Se necesita una colección intermedia para forzar la evaluación, no por claridad o legibilidad. Al fin y al cabo, el tipoList<Pair<Ticket, Future<SmtpResponse>>>
es difícilmente más legible.
Eso es mucho trabajo y la posibilidad de error es alta, por lo que no es de extrañar que los desarrolladores sean reacios a aplicar código concurrente a diario. El poco conocido ExecutorCompletionService
del JDK se utiliza a veces cuando hay un conjunto de tareas asíncronas y queremos procesarlas a medida que se completan.
Además, Java 8 trae CompletableFuture
(ver "CompletableFuture y Streams") que es totalmente reactivo y no bloqueante. Pero, ¿cómo puede ayudar aquí RxJava? En primer lugar, supongamos que una API para enviar un correo electrónico ya está adaptada para utilizar RxJava:
import
static
rx
.
Observable
.
fromCallable
;
Observable
<
SmtpResponse
>
rxSendEmail
(
Ticket
ticket
)
{
//unusual synchronous Observable
return
fromCallable
(()
->
sendEmail
())
}
No hay concurrencia de por medio, simplemente se envuelve sendEmail()
dentro de un Observable
. Se trata de un Observable
raro; normalmente se utilizaría subscribeOn()
en la implementación para que el Observable
sea asíncrono por defecto. Llegados a este punto, podemos iterar sobre todos los tickets
como antes:
List
<
Ticket
>
failures
=
Observable
.
from
(
tickets
)
.
flatMap
(
ticket
->
rxSendEmail
(
ticket
)
.
flatMap
(
response
->
Observable
.<
Ticket
>
empty
())
.
doOnError
(
e
->
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
))
.
onErrorReturn
(
err
->
ticket
))
.
toList
()
.
toBlocking
()
.
single
();
Observable.ignorarElementos()
Es fácil ver que el operador interno flatMap()
de nuestro ejemplo ignora response
y devuelve un flujo vacío. En estos casos, flatMap()
es una exageración; el operador ignoreElements()
es mucho más eficiente.ignoreElements()
simplemente ignora todos los valores emitidos y reenvía las notificaciones onCompleted()
o onError()
. Como estamos ignorando la respuesta real y sólo nos ocupamos de los errores, ignoreElements()
funciona muy bien aquí.
Todo lo que nos interesa está dentro del exterior flatMap()
. Si sólo se tratara deflatMap(this::rxSendEmail)
, el código funcionaría; sin embargo, cualquier fallo emitido desde rxSendEmail
terminaría con todo el flujo. Pero queremos "atrapar" todos los errores emitidos y recogerlos para su posterior consumo. Utilizamos un truco similar al de Stream.flatMap()
: si response
se emitió con éxito, lo transformamos en un Observable
vacío. Esto significa básicamente que descartamos las entradas con éxito. Sin embargo, en caso de fallos, devolvemos un ticket
que ha emitido una excepción. Una llamada de retorno adicional adoOnError()
nos permite registrar la excepción; por supuesto, también podemos añadir el registro al operador onErrorReturn()
, pero me pareció más funcional esta separación de preocupaciones.
Para seguir siendo compatibles con las implementaciones anteriores, transformamos Observable
en Observable<List<Ticket>>
, BlockingObservable<List<Ticket>>
, toBlocking()
, y finalmente List<Ticket>
(single()
). Curiosamente, incluso BlockingObservable
sigue siendo perezoso. Un operador toBlocking()
por sí solo no fuerza la evaluación suscribiéndose al flujo subyacente y ni siquiera se bloquea. La suscripción y, por tanto, la iteración y el envío de correos electrónicos se posponen hasta que se invoca single()
.
Observa que si sustituimos el flatMap()
externo por concatMap()
(ver "Formas de combinar flujos: concat(), merge() y switchOnNext()" y "Preservar el orden utilizando concatMap()"), nos encontraremos con un fallo similar al mencionado con el Stream
del JDK.
A diferencia de flatMap()
(o merge
) que se suscriben inmediatamente a todos los flujos internos, concatMap
(o concat
) se suscribe a un Observable
interno tras otro. Y mientras nadie se suscriba a Observable
, ni siquiera se iniciará el trabajo.
Hasta ahora, se ha sustituido un simple bucle for
con un operador try
-catch
por otro menos legible y más complejo Observable
. Sin embargo, para convertir nuestro código secuencial en computación multihilo apenas necesitamos añadir un operador más:
Observable
.
from
(
tickets
)
.
flatMap
(
ticket
->
rxSendEmail
(
ticket
)
.
ignoreElements
()
.
doOnError
(
e
->
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
))
.
onErrorReturn
(
err
->
ticket
)
.
subscribeOn
(
Schedulers
.
io
()))
Es tan poco invasivo que puede resultarte difícil detectarlo. Un operadorsubscribeOn()
adicional hace que cada rxSendMail()
individual se ejecute en un Scheduler
especificado (io()
, en este caso). Éste es uno de los puntos fuertes de RxJava: no tiene reparos en lo que respecta a los subprocesos, ya que por defecto se ejecuta de forma síncrona, pero permite el multihilo sin fisuras y de forma casi transparente. Por supuesto, esto no significa que puedas inyectar programadores de forma segura en lugares arbitrarios. Pero al menos la API es menos verbosa y de más alto nivel. Exploraremos los programadores con mucho más detalle más adelante en "Multihilo en RxJava". De momento, recuerda que Observable
s es síncrono por defecto; sin embargo, podemos cambiarlo fácilmente y aplicar la concurrencia en los lugares donde menos se esperaba. Esto es especialmente valioso en aplicaciones heredadas existentes, que puedes optimizar sin mucha complicación.
Para terminar, si estás implementando Observable
s desde cero, hacerlos asíncronos por defecto es más idiomático. Eso significa colocar subscribeOn()
directamente dentro de rxSendEmail()
en lugar de externamente. De lo contrario, corres el riesgo de envolver flujos ya asíncronos con otra capa más de programadores.
Por supuesto, si el productor que hay detrás de Observable
ya es asíncrono, es incluso mejor, porque tu flujo no se vincula a ningún hilo en particular. Además, debes posponer la suscripción a un Observable
lo más tarde posible, normalmente cerca del marco web de nuestro mundo exterior. Esto cambia significativamente tu forma de pensar. Toda tu lógica empresarial es perezosa hasta que alguien quiera ver realmente los resultados.3
Sustituir Callbacks por Streams
Las API tradicionales son bloqueantes la mayor parte del tiempo, lo que significa que te obligan a esperar los resultados de forma sincrónica. Este enfoque funciona relativamente bien, al menos antes de que conocieras RxJava. Pero una API bloqueante es especialmente problemática cuando hay que enviar datos desde el productor de la API a los consumidores: éste es un ámbito en el que RxJava brilla de verdad. Hay numerosos ejemplos de estos casos y los diseñadores de API adoptan diversos enfoques. Normalmente, necesitamos proporcionar algún tipo de llamada de retorno que la API invoque, a menudo llamados escuchadores de eventos. Uno de los escenarios más comunes de este tipo esJava Message Service (JMS). Consumir JMS suele implicar implementar una clase a la que el servidor o contenedor de aplicaciones notifica cada mensaje entrante. Podemos sustituir con relativa facilidad tales escuchadores por un componible Observable
, que es mucho más robusto y versátil. El oyente tradicional tiene un aspecto similar al de esta clase, que aquí utiliza el soporte JMS enSpring framework, pero nuestra solución es agnóstica con respecto a la tecnología:
@Component
class
JmsConsumer
{
@JmsListener
(
destination
=
"orders"
)
public
void
newOrder
(
Message
message
)
{
//...
}
}
Cuando se recibe un mensaje JMS message
, la clase JmsConsumer
debe decidir qué hacer con él. Normalmente, se invoca cierta lógica empresarial dentro de un consumidor de mensajes. Cuando un nuevo componente quiere recibir notificaciones sobre dichos mensajes, debe modificar JmsConsumer
adecuadamente. En otras palabras, imagina Observable<Message>
al que puede suscribirse cualquiera. Además, hay disponible todo un universo de operadores RxJava, que permiten capacidades de mapeo, filtrado y combinación. La forma más sencilla de pasar de una API push basada en llamadas de retorno a Observable
es utilizarSubject
s. Cada vez que se entrega un nuevo mensaje JMS, empujamos ese mensaje a un PublishSubject
que parece unObservable
caliente ordinario desde el exterior:
private
final
PublishSubject
<
Message
>
subject
=
PublishSubject
.
create
();
@JmsListener
(
destination
=
"orders"
,
concurrency
=
"1"
)
public
void
newOrder
(
Message
msg
)
{
subject
.
onNext
(
msg
);
}
Observable
<
Message
>
observe
()
{
return
subject
;
}
Ten en cuenta que Observable<Message>
es caliente; empieza a emitir mensajes JMS en cuanto se consumen. Si no hay nadie suscrito en ese momento, los mensajes simplemente se pierden. ReplaySubject
es una alternativa, pero como almacena en caché todos los eventos desde el inicio de la aplicación, no es adecuada para procesos de larga duración. En caso de que tengas un suscriptor que deba recibir absolutamente todos los mensajes, asegúrate de que se suscribe antes de que se inicialice el listener de mensajes JMS. Además, nuestro listener de mensajes tiene un parámetro concurrency="1"
para asegurar que Subject
no sea invocado desde múltiples hilos. Como alternativa, puedes utilizar Subject.toSerialized()
.
Como nota al margen, Subject
s son más fáciles de poner en marcha, pero se sabe que pueden dar problemas al cabo de un tiempo. En este caso concreto, podemos sustituir fácilmente Subject
por el más idiomático RxJava Observable
, que utiliza directamente create()
:
public
Observable
<
Message
>
observe
(
ConnectionFactory
connectionFactory
,
Topic
topic
)
{
return
Observable
.
create
(
subscriber
->
{
try
{
subscribeThrowing
(
subscriber
,
connectionFactory
,
topic
);
}
catch
(
JMSException
e
)
{
subscriber
.
onError
(
e
);
}
});
}
private
void
subscribeThrowing
(
Subscriber
<?
super
Message
>
subscriber
,
ConnectionFactory
connectionFactory
,
Topic
orders
)
throws
JMSException
{
Connection
connection
=
connectionFactory
.
createConnection
();
Session
session
=
connection
.
createSession
(
true
,
AUTO_ACKNOWLEDGE
);
MessageConsumer
consumer
=
session
.
createConsumer
(
orders
);
consumer
.
setMessageListener
(
subscriber:
:
onNext
);
subscriber
.
add
(
onUnsubscribe
(
connection
));
connection
.
start
();
}
private
Subscription
onUnsubscribe
(
Connection
connection
)
{
return
Subscriptions
.
create
(()
->
{
try
{
connection
.
close
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Can't close"
,
e
);
}
});
}
La API JMS proporciona dos formas de recibir mensajes de un intermediario: sincrónica, mediante el método receive()
, que se bloquea, y no bloqueante, mediante MessageListener
. La API no bloqueante es beneficiosa por muchas razones; por ejemplo, consume menos recursos como hilos y memoria de pila. Además, se alinea perfectamente con el estilo de programación Rx. En lugar de crear una instancia de MessageListener
y llamar a nuestro suscriptor desde dentro de ella, podemos utilizar esta sintaxis escueta con referencia a métodos:
consumer
.
setMessageListener
(
subscriber:
:
onNext
)
Además, debemos ocuparnos de la limpieza de los recursos y de la gestión adecuada de los errores. Esta minúscula capa de transformación nos permite consumir fácilmente mensajes JMS sin preocuparnos de los aspectos internos de la API. Aquí tienes un ejemplo utilizando el popular broker de mensajería ActiveMQ ejecutándose localmente:
import
org.apache.activemq.ActiveMQConnectionFactory
;
import
org.apache.activemq.command.ActiveMQTopic
;
ConnectionFactory
connectionFactory
=
new
ActiveMQConnectionFactory
(
"tcp://localhost:61616"
);
Observable
<
String
>
txtMessages
=
observe
(
connectionFactory
,
new
ActiveMQTopic
(
"orders"
))
.
cast
(
TextMessage
.
class
)
.
flatMap
(
m
->
{
try
{
return
Observable
.
just
(
m
.
getText
());
}
catch
(
JMSException
e
)
{
return
Observable
.
error
(
e
);
}
});
JMS, al igual que JDBC, tiene fama de utilizar en gran medida JMSException
comprobado, incluso cuando se llama a getText()
en un TextMessage
. Para gestionar adecuadamente los errores (para más detalles, consulta "Gestión de errores" ) utilizamos flatMap()
y envolvemos las excepciones.
A partir de ahí, puedes tratar los mensajes JMS que fluyen como cualquier otro flujo asíncrono y no bloqueante. Y, por cierto, utilizamos el operador cast()
que, de forma optimista, convierte los eventos ascendentes a un tipo determinado, fallando con onError()
, en caso contrario.cast()
es básicamente un operador especializado map()
que se comporta como map(x -> (TextMessage)x)
.
Sondear periódicamente los cambios
La peor API de bloqueo con la que puedes trabajar requiere sondear los cambios. No proporciona ningún mecanismo para empujar los cambios hacia ti, ni siquiera con devoluciones de llamada o bloqueando indefinidamente. El único mecanismo que proporciona esta API es preguntar por el estado actual, y depende de ti averiguar si difiere del estado anterior o no. RxJava tiene unos cuantos operadores realmente potentes que puedes aplicar para adaptar una determinada API al estilo Rx. El primer caso que quiero que consideres es un método sencillo que proporciona un único valor que representa el estado, por ejemplolong getOrderBookLength()
. Para realizar un seguimiento de los cambios, debemos llamar a este método con suficiente frecuencia y capturar las diferencias. Puedes conseguirlo en RxJava con un operador de composición muy básico:
Observable
.
interval
(
10
,
TimeUnit
.
MILLISECONDS
)
.
map
(
x
->
getOrderBookLength
())
.
distinctUntilChanged
()
Primero producimos un valor sintético long
cada 10 milisegundos, que sirve como contador de tic-tac básico. Por cada valor de este tipo (es decir, cada 10 milisegundos), llamamos a getOrderBookLength()
. Sin embargo, el método mencionado no cambia tan a menudo, y no queremos inundar a nuestros suscriptores con montones de cambios de estado irrelevantes. Por suerte, podemos decir simplemente distinctUntilChanged()
y RxJava omitirá de forma transparente long
los valores devueltos por getOrderBookLength()
que no hayan cambiado desde la última invocación, como se demuestra en el siguiente diagrama de mármol:
Podemos aplicar este patrón aún más. Imagina que estás pendiente de los cambios en el sistema de archivos o en las tablas de la base de datos. El único mecanismo a tu disposición es tomar una instantánea actual de los archivos o registros de la base de datos. Estás construyendo una API que notificará a los clientes cada novedad. Obviamente, puedes utilizar java.nio.file.WatchService
o disparadores de base de datos, pero tómalo como un ejemplo didáctico. Esta vez, de nuevo, empezamos tomando periódicamente una instantánea del estado actual:
Observable
<
Item
>
observeNewItems
()
{
return
Observable
.
interval
(
1
,
TimeUnit
.
SECONDS
)
.
flatMapIterable
(
x
->
query
())
.
distinct
();
}
List
<
Item
>
query
()
{
//take snapshot of file system directory
//or database table
}
El operador distinct()
mantiene un registro de todos los elementos que han pasado por él (véase también "Eliminar duplicados utilizando distinct() y distinctUntilChanged()"). Si el mismo elemento aparece por segunda vez, simplemente se ignora. Por eso podemos empujar la misma lista de Item
s cada segundo. La primera vez se envían a todos los suscriptores. Sin embargo, cuando aparece exactamente la misma lista un segundo después, ya se han visto todos los elementos y, por tanto, se descartan. Si en algún momento la lista devuelta por query()
contiene unItem
de más, distinct()
lo dejará pasar, pero lo descartará la próxima vez. Este sencillo patrón nos permite sustituir un montón de invocaciones a Thread.sleep()
y el almacenamiento manual en caché por el sondeo periódico.Es aplicable en muchos ámbitos, como el sondeo del Protocolo de Transferencia de Archivos (FTP), el raspado web, etc.
Multihilo en RxJava
Hay APIs de terceros que se bloquean y sencillamente no podemos hacer nada al respecto. Puede que no dispongamos del código fuente y que reescribirlo suponga demasiado riesgo. En ese caso, debemos aprender a lidiar con el código bloqueante en lugar de luchar contra él.
Uno de los rasgos distintivos de RxJava es la concurrencia declarativa, en contraposición a la concurrencia imperativa. La creación y gestión manual de hilos es cosa del pasado (compárese con "Thread Pool de Conexiones") la mayoría de nosotros ya utilizamos pools de hilos gestionados (por ejemplo, con ExecutorService
). Pero RxJava va un paso más allá: Observable
puede ser no bloqueante, igual que CompletableFuture
en Java 8 (véase "CompletableFuture y Streams"), pero a diferencia del otro, también es perezoso. A menos que te suscribas, un Observable
que se comporte bien no realizará ninguna acción. Pero el poder de Observable
va incluso más allá.
Un Observable
asíncrono es el que llama a los métodos de devolución de llamada de tu Subscriber
s (como onNext()
) desde un hilo diferente. ¿Recuerdas "Dominar Observable.create()", en el que exploramos cuándo subscribe()
se bloquea, esperando a que lleguen todas las notificaciones? En la vida real, la mayoría de los Observable
s proceden de fuentes que son asíncronas por naturaleza. El capítulo 5 está enteramente dedicado a tales Observable
s. Pero incluso nuestro sencillo ejemplo JMS de "Sustituir las devoluciones de llamada por flujos", que utiliza una API integrada y no bloqueante de la especificación JMS (interfazMessageListener
). Esto no lo impone ni lo sugiere el sistema de tipos, pero muchos Observable
s son asíncronos desde el principio, y debes asumirlo. Un método subscribe()
bloqueante ocurre muy raramente, cuando una lambda dentro de Observable.create()
no está respaldada por ningún proceso o flujo asíncrono. Sin embargo, por defecto (con create()
) todo ocurre en el hilo del cliente (el que se suscribió). Si pulsas onNext()
directamente dentro de tu llamada de retorno a create()
, no hay multihilo ni concurrencia de ningún tipo.
Al encontrarnos con un Observable
tan inusual, podemos seleccionar declarativamente el llamado Scheduler
que se utilizará para emitir valores. En el caso de CompletableFuture
, no tenemos control sobre los hilos subyacentes, la API tomó la decisión y, en el peor de los casos, es imposible anularla. RxJava rara vez toma esas decisiones por sí solo y opta por el valor predeterminado seguro: hilo de cliente y sin multihilo de por medio. A efectos de este capítulo, utilizaremos una "biblioteca" de registro realmente sencilla4 que imprimirá un mensaje junto con el hilo actual y el número de milisegundos transcurridos desde el inicio del programa utilizando System.currentTimeMillis()
:
void
log
(
Object
label
)
{
System
.
out
.
println
(
System
.
currentTimeMillis
()
-
start
+
"\t| "
+
Thread
.
currentThread
().
getName
()
+
"\t| "
+
label
);
}
¿Qué es un Programador?
RxJava es agnóstico respecto a la concurrencia y, de hecho, no la introduce por sí mismo. Sin embargo, algunas abstracciones para tratar con hilos están expuestas al usuario final. Además, ciertos operadores no pueden funcionar correctamente sin concurrencia; consulta "Otros usos de los programadores" para conocer algunos de ellos. Por suerte, la clase Scheduler
, la única a la que debes prestar atención, es bastante sencilla. En principio, funciona de forma similar a ScheduledExecutorService
de java.util.concurrent
-ejecuta bloques arbitrarios de código, posiblemente en el futuro. Sin embargo, para cumplir el contrato Rx, ofrece algunas abstracciones más finas, de las que puedes ver más en la sección avanzada "Visión general de los detalles de implementación del programador".
Los programadores se utilizan junto con los operadores subscribeOn()
y observeOn()
, así como al crear determinados tipos de Observable
s. Un programador sólo crea instancias de Worker
s que se encargan de programar y ejecutar código. Cuando RxJava necesita programar algún código, primero pide a Scheduler
que le proporcione un Worker
y utiliza este último para programar las tareas posteriores. Más adelante encontrarás ejemplos de esta API, pero primero familiarízate con los programadores integrados disponibles:
Schedulers.newThread()
-
Este programador simplemente inicia un nuevo hilo cada vez que se solicita a través de
subscribeOn()
oobserveOn()
.newThread()
casi nunca es una buena opción, no sólo por la latencia que implica iniciar un hilo, sino también porque este hilo no se reutiliza. El espacio de la pila debe asignarse por adelantado (normalmente en torno a un megabyte, como controla el parámetro-Xss
de la JVM) y el sistema operativo debe iniciar un nuevo hilo nativo. Cuando termina laWorker
, el hilo simplemente termina. Este planificador sólo puede ser útil cuando las tareas son de grano grueso: tardan mucho tiempo en completarse, pero son muy pocas, por lo que es poco probable que los hilos se reutilicen en absoluto. Véase también "Hilo por conexión". En la práctica, seguirSchedulers.io()
es casi siempre una opción mejor. Schedulers.io()
-
Este planificador es similar a
newThread()
, pero los subprocesos ya iniciados se reciclan y posiblemente puedan gestionar futuras peticiones. Esta implementación funciona de forma similar aThreadPoolExecutor
dejava.util.concurrent
con un conjunto ilimitado de hilos. Cada vez que se solicita un nuevoWorker
, o bien se inicia un nuevo hilo (y después se mantiene inactivo durante algún tiempo) o se reutiliza el que está inactivo.El nombre
io()
no es una coincidencia. Considera la posibilidad de utilizar este programador para tareas ligadas a E/S que requieren muy pocos recursos de CPU. Sin embargo, suelen tardar bastante tiempo, esperando a la red o al disco. Por lo tanto, es una buena idea tener un conjunto relativamente grande de hilos. Aun así, ten cuidado con los recursos ilimitados de cualquier tipo: en caso de dependencias externas lentas o que no respondan, como los servicios web, el programadorio()
podría iniciar un número enorme de hilos, provocando que tu propia aplicación tampoco responda. Consulta " Gestión de fallos con Hystrix" para más detalles sobre cómo abordar este problema. Schedulers.computation()
-
Debes utilizar un programador de cálculo cuando las tareas estén totalmente ligadas a la CPU; es decir, que requieran potencia de cálculo y no tengan código de bloqueo (lectura del disco, red, suspensión, espera de bloqueo, etc.) Dado que se supone que cada tarea ejecutada en este programador debe utilizar completamente un núcleo de la CPU, ejecutar más tareas de este tipo en paralelo que núcleos disponibles no aportaría mucho valor. Por lo tanto, el planificador
computation()
limita por defecto el número de hilos que se ejecutan en paralelo al valor deavailableProcessors()
, que se encuentra en la clase de utilidadRuntime.getRuntime()
.Si por alguna razón necesitas un número de hilos distinto del predeterminado, siempre puedes utilizar la propiedad del sistema
rx.scheduler.max-computation-threads
. Tomando menos hilos te aseguras de que siempre haya uno o más núcleos de CPU ociosos, e incluso bajo una carga pesada, el conjunto de hilos decomputation()
no satura tu servidor. No es posible tener más hilos de cálculo que núcleos.computation()
El programador utiliza una cola ilimitada delante de cada hilo, de modo que si la tarea está programada pero todos los núcleos están ocupados, se ponen en cola. En caso de pico de carga, este programador mantendrá limitado el número de hilos. Sin embargo, la cola delante de cada hilo seguirá creciendo.Por suerte, los operadores incorporados, especialmente
observeOn()
que vamos a descubrir en "Concurrencia declarativa con observeOn()" garantizan que esteScheduler
no se sobrecargue. Schedulers.from(Executor executor)
-
Scheduler
s son internamente más complejas queExecutor
s dejava.util.concurrent
, por lo que se necesitaba una abstracción aparte. Pero como conceptualmente son bastante similares, no es de extrañar que exista una envoltura que pueda convertirExecutor
enScheduler
utilizando el método de fábricafrom()
:
import
com.google.common.util.concurrent.ThreadFactoryBuilder
;
import
rx.Scheduler
;
import
rx.schedulers.Schedulers
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.ThreadPoolExecutor
;
//...
ThreadFactory
threadFactory
=
new
ThreadFactoryBuilder
()
.
setNameFormat
(
"MyPool-%d"
)
.
build
();
Executor
executor
=
new
ThreadPoolExecutor
(
10
,
//corePoolSize
10
,
//maximumPoolSize
0L
,
TimeUnit
.
MILLISECONDS
,
//keepAliveTime, unit
new
LinkedBlockingQueue
<>(
1000
),
//workQueue
threadFactory
);
Scheduler
scheduler
=
Schedulers
.
from
(
executor
);
Estoy utilizando intencionadamente esta sintaxis verbosa para crear ExecutorService
en lugar de la versión más sencilla:
import
java.util.concurrent.Executors
;
//...
ExecutorService
executor
=
Executors
.
newFixedThreadPool
(
10
);
Aunque tentadora, la clase de fábrica Executors
codifica de forma rígida varios valores predeterminados que son poco prácticos o incluso peligrosos en aplicaciones empresariales. Por ejemplo, utiliza LinkedBlockingQueue
sin límites que pueden crecer infinitamente, lo que resulta en OutOfMemoryError
para casos en los que hay un gran número de tareas pendientes. Además, por defecto ThreadFactory
utiliza nombres de hilos sin sentido como pool-5-thread-3
. Nombrar los hilos correctamente es una herramienta inestimable a la hora de crear perfiles o analizar los volcados de hilos. Implementar ThreadFactory
desde cero es un poco engorroso, así que utilizamos ThreadFactoryBuilder de Guava. Si te interesa afinar aún más y utilizar adecuadamente los thread pools, consulta "Thread Pool de Conexiones" y "Gestión de fallos con Hystrix". Crear planificadores a partir de Executor
que configuramos conscientemente es aconsejable para proyectos con mucha carga. Sin embargo, como RxJava no tiene control sobre los hilos creados independientemente en un Executor
, no puede fijar hilos (es decir, intentar mantener el trabajo de la misma tarea en el mismo hilo para mejorar la localidad de la caché). Este Scheduler
apenas se asegura de que un único Scheduler.Worker
(ver "Visión general de los detalles de implementación del programador") procese los eventos secuencialmente.
Schedulers.immediate()
-
Schedulers.immediate()
es un planificador especial que invoca una tarea dentro del subproceso del cliente de forma bloqueante, en lugar de asíncrona. Utilizarlo no tiene sentido a menos que alguna parte de tu API requiera proporcionar un planificador, mientras que estás absolutamente bien con el comportamiento por defecto deObservable
, que no implica ningún tipo de roscado. De hecho, suscribirse a unObservable
(más sobre esto en un segundo) a través deimmediate()
Scheduler
suele tener el mismo efecto que no suscribirse con ningún programador en particular. En general, evita este programador, bloquea el subproceso de llamada y es de uso limitado. Schedulers.trampoline()
-
El programador
trampoline()
es muy similar aimmediate()
porque también programa tareas en el mismo hilo, bloqueando de forma efectiva. Sin embargo, a diferencia deimmediate()
, la tarea siguiente se ejecuta cuando finalizan todas las tareas programadas anteriormente.immediate()
invoca una tarea determinada inmediatamente, mientras quetrampoline()
espera a que finalice la tarea actual. Trampolín es un patrón de la programación funcional que permite implementar la recursividad sin hacer crecer infinitamente la pila de llamadas. Esto se explica mejor con un ejemplo, primero conimmediate()
. Por cierto, fíjate en que no interactuamos directamente con una instancia deScheduler
, sino que primero creamos unWorker
. Esto tiene sentido, como verás rápidamente en "Visión general de los detalles de implementación del programador".Scheduler
scheduler
=
Schedulers
.
immediate
();
Scheduler
.
Worker
worker
=
scheduler
.
createWorker
();
log
(
"Main start"
);
worker
.
schedule
(()
->
{
log
(
" Outer start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Inner start"
);
sleepOneSecond
();
log
(
" Inner end"
);
});
log
(
" Outer end"
);
});
log
(
"Main end"
);
worker
.
unsubscribe
();
El resultado es el esperado; en realidad, podrías sustituir
schedule()
por una simple invocación a un método:1044 | main | Main start 1094 | main | Outer start 2097 | main | Inner start 3097 | main | Inner end 3100 | main | Outer end 3100 | main | Main end
Dentro del bloque
Outer
tenemos el bloqueschedule()
Inner
que se invoca inmediatamente, interrumpiendo la tareaOuter
. CuandoInner
termina, el control vuelve aOuter
. De nuevo, esto es simplemente una forma enrevesada de invocar una tarea de forma bloqueante indirectamente a través deimmediate()
Scheduler
. Pero, ¿qué ocurre si sustituimosSchedulers.immediate()
porSchedulers.trampoline()
? El resultado es muy diferente:1030 | main | Main start 1096 | main | Outer start 2101 | main | Outer end 2101 | main | Inner start 3101 | main | Inner end 3101 | main | Main end
¿Ves cómo
Outer
consigue terminar antes incluso de que empieceInner
? Esto se debe a que la tareaInner
estaba en cola dentro de la tareatrampoline()
Scheduler
, que ya estaba ocupada por la tareaOuter
. CuandoOuter
terminó, comenzó la primera tarea de la cola (Inner
). Podemos ir aún más lejos para asegurarnos de que entiendes la diferencia:log
(
"Main start"
);
worker
.
schedule
(()
->
{
log
(
" Outer start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Middle start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Inner start"
);
sleepOneSecond
();
log
(
" Inner end"
);
});
log
(
" Middle end"
);
});
log
(
" Outer end"
);
});
log
(
"Main end"
);
El
Worker
deimmediate()
Scheduler
produce lo siguiente:1029 | main | Main start 1091 | main | Outer start 2093 | main | Middle start 3095 | main | Inner start 4096 | main | Inner end 4099 | main | Middle end 4099 | main | Outer end 4099 | main | Main end
Frente al trabajador
trampoline()
:1041 | main | Main start 1095 | main | Outer start 2099 | main | Outer end 2099 | main | Middle start 3101 | main | Middle end 3101 | main | Inner start 4102 | main | Inner end 4102 | main | Main end
Schedulers.test()
-
Este
Scheduler
sólo se utiliza para pruebas, y nunca lo verás en código de producción. Su principal ventaja es la capacidad de avanzar arbitrariamente el reloj, simulando el paso del tiempo.TestScheduler
se describe en gran medida en "Programadores en pruebas unitarias".Scheduler
s por sí solos no son muy interesantes. Si quieres descubrir cómo funcionan internamente y cómo implementar el tuyo propio, consulta la siguiente sección.
Resumen de los detalles de implementación del programador
Nota
Esta sección es totalmente opcional, siéntete libre de saltar directamente a "Suscripción declarativa con subscribeOn()" si no te interesan los detalles de la implementación.
Scheduler
no sólo desacopla las tareas y su ejecución (normalmente ejecutándolas en otro hilo), sino que también abstrae el reloj, como aprenderemos en "Tiempo virtual". La API de Scheduler
es un poco más sencilla en comparación con, por ejemplo, ScheduledExecutorService
:
abstract
class
Scheduler
{
abstract
Worker
createWorker
();
long
now
();
abstract
static
class
Worker
implements
Subscription
{
abstract
Subscription
schedule
(
Action0
action
);
abstract
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
);
long
now
();
}
}
Cuando RxJava quiere programar una tarea (presumiblemente, pero no necesariamente en segundo plano), primero debe solicitar una instancia de Worker
. Es el Worker
el que permite programar la tarea sin ningún retraso o en algún momento en el tiempo. Tanto Scheduler
como Worker
tienen una fuente de tiempo anulable (métodonow()
) que utiliza para determinar cuándo debe ejecutarse una tarea determinada. Ingenuamente, puedes pensar en un Scheduler
como en un pool de hilos y en un Worker
como en un hilo dentro de ese pool.
La separación entre Scheduler
y Worker
es necesaria para aplicar fácilmente algunas de las directrices que impone el contrato Rx, como invocar el método de Subscriber
de forma secuencial, no concurrente. Worker
El contrato de Rx establece precisamente eso: dos tareas programadas en el mismo Worker
nunca se ejecutarán simultáneamente. Sin embargo, Worker
s independientes del mismo Scheduler
pueden ejecutar tareas simultáneamente sin problemas.
En lugar de repasar la API, vamos a analizar el código fuente de un Scheduler
existente, concretamente HandlerScheduler
, que se encuentra en el proyecto RxAndroid. Este Scheduler
simplemente ejecuta todas las tareas programadas en un hilo de la interfaz de usuario de Android. La actualización de la interfaz de usuario sólo se permite desde ese hilo (para más detalles, consulta "Desarrollo Android con RxJava" ). Esto es similar al Hilo de Despacho de Eventos (EDT) que se encuentra en Swing, donde la mayoría de las actualizaciones de ventanas y componentes deben ejecutarse dentro de un hilo dedicado (EDT). Como era de esperar, también existe el proyecto RxSwing5 para ello.
El fragmento de código que sigue es una clase despojada e incompleta de RxAndroid sólo con fines educativos:
package
rx
.
android
.
schedulers
;
import
android.os.Handler
;
import
android.os.Looper
;
import
rx.Scheduler
;
import
rx.Subscription
;
import
rx.functions.Action0
;
import
rx.internal.schedulers.ScheduledAction
;
import
rx.subscriptions.Subscriptions
;
import
java.util.concurrent.TimeUnit
;
public
final
class
SimplifiedHandlerScheduler
extends
Scheduler
{
@Override
public
Worker
createWorker
()
{
return
new
HandlerWorker
();
}
static
class
HandlerWorker
extends
Worker
{
private
final
Handler
handler
=
new
Handler
(
Looper
.
getMainLooper
());
@Override
public
void
unsubscribe
()
{
//Implementation coming soon...
}
@Override
public
boolean
isUnsubscribed
()
{
//Implementation coming soon...
return
false
;
}
@Override
public
Subscription
schedule
(
final
Action0
action
)
{
return
schedule
(
action
,
0
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
)
{
ScheduledAction
scheduledAction
=
new
ScheduledAction
(
action
);
handler
.
postDelayed
(
scheduledAction
,
unit
.
toMillis
(
delayTime
));
scheduledAction
.
add
(
Subscriptions
.
create
(()
->
handler
.
removeCallbacks
(
scheduledAction
)));
return
scheduledAction
;
}
}
}
Los detalles de la API de Android no son importantes por el momento. Lo que ocurre aquí es que cada vez que programamos algo en un HandlerWorker
, el bloque de código se pasa a un método especial postDelayed()
que lo ejecuta en un hilo dedicado de Android. Sólo hay uno de estos hilos, por lo que los eventos se serializan no sólo dentro de Worker
s, sino también entre ellos.
Antes de pasar action
para que se ejecute, lo envolvemos con ScheduledAction
, que implementa tanto Runnable
como Subscription
. RxJava es perezoso siempre que puede; esto también se aplica a la programación de tareas. Si por alguna razón decides que un determinado action
no debe ejecutarse después de todo (esto tiene sentido cuando la acción se programó en el futuro, no inmediatamente), simplemente ejecuta unsubscribe()
en el Subscription
devuelto por schedule()
. Es responsabilidad del Worker
manejar adecuadamente la desuscripción (al menos, el mejor esfuerzo).
El código cliente también puede decidir unsubscribe()
desde Worker
en su totalidad. Esto debería anular la suscripción de todas las tareas en cola, así como liberar el Worker
para que el hilo subyacente pueda reutilizarse potencialmente más adelante. El siguiente fragmento de código mejora el SimplifiedHandlerScheduler
añadiendo el flujo de desuscripción Worker
(sólo se incluyen los métodos modificados):
private
CompositeSubscription
compositeSubscription
=
new
CompositeSubscription
();
@Override
public
void
unsubscribe
()
{
compositeSubscription
.
unsubscribe
();
}
@Override
public
boolean
isUnsubscribed
()
{
return
compositeSubscription
.
isUnsubscribed
();
}
@Override
public
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
)
{
if
(
compositeSubscription
.
isUnsubscribed
())
{
return
Subscriptions
.
unsubscribed
();
}
final
ScheduledAction
scheduledAction
=
new
ScheduledAction
(
action
);
scheduledAction
.
addParent
(
compositeSubscription
);
compositeSubscription
.
add
(
scheduledAction
);
handler
.
postDelayed
(
scheduledAction
,
unit
.
toMillis
(
delayTime
));
scheduledAction
.
add
(
Subscriptions
.
create
(()
->
handler
.
removeCallbacks
(
scheduledAction
)));
return
scheduledAction
;
}
En "Control de los receptores mediante la suscripción y el suscriptor<T>", exploramos la interfaz Subscription
, pero no llegamos a examinar los detalles de su implementación. CompositeSubscription
es una de las muchas implementaciones disponibles, que en sí misma no es más que un contenedor para hijos Subscription
(un patrón de diseño compuesto ). Darse de baja de CompositeSubscription
significa darse de baja de todos los hijos. También puedes añadir y eliminar los hijos gestionados por CompositeSubscription
.
En nuestro Scheduler
personalizado, CompositeSubscription
se utiliza para rastrear todos los Subscription
s de las invocaciones anteriores a schedule()
(ver compositeSubscription.add(scheduledAction)
). Por otra parte, el ScheduledAction
hijo necesita conocer a su padre (ver: addParent()
) para poder eliminarse a sí mismo cuando la acción finalice o se cancele. De lo contrario, Worker
acumularía hijos Subscription
s obsoletos para siempre. Cuando el código cliente decide que ya no necesita una instancia de HandlerWorker
, se da de baja de ella. La baja se propaga a todos los Subscription
s hijos pendientes (si los hay).
Ésta ha sido una breve introducción a Scheduler
s en RxJava. Los detalles de su funcionamiento interno no son tan útiles en el trabajo diario; de hecho, están diseñados para que el uso de RxJava sea más intuitivo y predecible. Dicho esto, veamos rápidamente cómo Scheduler
s resuelve muchos problemas de concurrencia en Rx.
Suscripción declarativa con subscribeOn()
En "Dominar Observable.create()" vimos que subscribe()
utiliza por defecto el hilo del cliente. Para recapitular, aquí tienes la suscripción más sencilla que se te pueda ocurrir en la que no haya ningún tipo de subproceso implicado:
Observable
<
String
>
simple
()
{
return
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onCompleted
();
});
}
//...
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
final
Observable
<
String
>
obs2
=
obs
.
map
(
x
->
x
)
.
filter
(
x
->
true
);
log
(
"Transformed"
);
obs2
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Fíjate en dónde se colocan las sentencias de registro y estudia detenidamente la salida, sobre todo en lo que respecta a qué hilo invocó la sentencia print:
33 | main | Starting 120 | main | Created 128 | main | Transformed 133 | main | Subscribed 133 | main | Got A 133 | main | Got B 133 | main | Completed 134 | main | Exiting
Presta atención: el orden de las declaraciones es absolutamente predecible. En primer lugar, cada línea de código del fragmento de código anterior se ejecuta en el hilo main
, no hay grupos de hilos ni emisión asíncrona de eventos implicados. En segundo lugar, el orden de ejecución puede no estar del todo claro a primera vista.
Cuando se inicia el programa, imprime Starting
, lo cual es comprensible. Después de crear una instancia de Observable<String>
, vemos el mensaje Created
. Observa que Subscribed
aparece más tarde, cuando realmente nos suscribimos. Sin la invocación a subscribe()
, el bloque de código dentro de Observable.create()
nunca se ejecuta. Además, incluso los operadores map()
y filter()
no tienen efectos secundarios visibles, observa cómo el mensaje Transformed
se imprime incluso antes que Subscribed
.
Después, recibimos todos los eventos emitidos y la notificación de finalización. Por último, se imprime la declaración Exiting
y el programa puede volver. Ésta es una observación interesante: se supone quesubscribe()
registra una llamada de retorno cuando aparecen eventos de forma asíncrona. Esta es la suposición que deberías hacer por defecto. Sin embargo, en este caso no hay hilos implicados y subscribe()
en realidad está bloqueando. ¿Cómo es posible?
Existe una conexión inherente pero oculta entre subscribe()
y create()
. Cada vez que llamas a subscribe()
en un Observable
, se invoca su método de devolución de llamada OnSubscribe
(envolviendo la expresión lambda que pasaste a create()
). Recibe tu Subscriber
como argumento. Por defecto, esto ocurre en el mismo hilo y es bloqueante, así que cualquier cosa que hagas dentro de create()
bloqueará subscribe()
. Si tu método create()
duerme unos segundos, subscribe()
se bloqueará. Además, si hay operadores entre Observable.create()
y tu Subscriber
(lambda que actúa como devolución de llamada), todos estos operadores se invocan en nombre de la hebra que invocó subscribe()
. RxJava no inyecta por defecto ninguna facilidad de concurrencia entre Observable
y Subscriber
. La razón es que Observable
suele estar respaldada por otros mecanismos de concurrencia, como bucles de eventos o hebras personalizadas, por lo que Rx te permite tomar el control total en lugar de imponer ninguna convención.
Esta observación prepara el terreno para el operador subscribeOn()
. Al insertar subscribeOn()
en cualquier lugar entre un Observable
original y subscribe()
, seleccionas declarativamente Scheduler
donde se invocará el método de devolución de llamada OnSubscribe
. No importa lo que hagas dentro de create()
, este trabajo se descarga a un Scheduler
independiente y tu invocación a subscribe()
ya no se bloquea:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
35 | main | Starting 112 | main | Created 123 | main | Exiting 123 | Sched-A-0 | Subscribed 124 | Sched-A-0 | Got A 124 | Sched-A-0 | Got B 124 | Sched-A-0 | Completed
¿Ves cómo el hilo main
sale antes incluso de que Observable
empiece a emitir ningún valor? Técnicamente, el orden de los mensajes de registro ya no es tan predecible porque hay dos hilos ejecutándose simultáneamente: main
Sched-A-0
, que se suscribió y quiere salir, y , que emite eventos en cuanto alguien se suscribe. Tanto el hilo schedulerA
como el Sched-A-0
proceden de los siguientes programadores de ejemplo que construimos con fines ilustrativos:
import
static
java
.
util
.
concurrent
.
Executors
.
newFixedThreadPool
;
ExecutorService
poolA
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-A-%d"
));
Scheduler
schedulerA
=
Schedulers
.
from
(
poolA
);
ExecutorService
poolB
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-B-%d"
));
Scheduler
schedulerB
=
Schedulers
.
from
(
poolB
);
ExecutorService
poolC
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-C-%d"
));
Scheduler
schedulerC
=
Schedulers
.
from
(
poolC
);
private
ThreadFactory
threadFactory
(
String
pattern
)
{
return
new
ThreadFactoryBuilder
()
.
setNameFormat
(
pattern
)
.
build
();
}
Estos programadores se utilizarán en todos los ejemplos, pero son bastante fáciles de recordar. Tres programadores independientes, cada uno de los cuales gestiona 10 hilos de un ExecutorService
. Para que la salida sea más agradable, cada grupo de hilos tiene un patrón de nombres distinto.
Antes de empezar, debes comprender que en las aplicaciones maduras, en términos de adopción de Rx, subscribeOn()
se utiliza muy raramente. Normalmente, Observable
s proceden de fuentes que son naturalmente asíncronas (como RxNetty, véase "Servidor HTTP no bloqueante con Netty y RxNetty") o aplican la programación por sí mismas (como Hystrix, véase "Gestión de fallos con Hystrix"). Debes tratar subscribeOn()
sólo en casos especiales, cuando se sepa que el Observable
subyacente es síncrono (siendocreate()
bloqueante). Sin embargo, subscribeOn()
sigue siendo una solución mucho mejor que la creación manual de hilos dentro de create()
:
//Don't do this
Observable
<
String
>
obs
=
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
Runnable
code
=
()
->
{
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onCompleted
();
};
new
Thread
(
code
,
"Async"
).
start
();
});
El código anterior mezcla dos conceptos: la producción de eventos y la elección de la estrategia de concurrencia. Observable
sólo debe ser responsable de la lógica de producción, mientras que sólo el código del cliente puede tomar una decisión sensata sobre la concurrencia. Recuerda que Observable
es perezoso pero también inmutable, en el sentido de que subscribeOn()
sólo afecta a los suscriptores posteriores, si alguien se suscribe exactamente al mismo Observable
sin subscribeOn()
por medio, no habrá concurrencia por defecto.
Ten en cuenta que en este capítulo nos centramos en las aplicaciones existentes y en introducir RxJava gradualmente. El operador subscribeOn()
es bastante útil en tales circunstancias; sin embargo, después de comprender las extensiones reactivas y empezar a utilizarlas a gran escala, el valor de subscribeOn()
disminuye. En pilas de software totalmente reactivas, como las que se encuentran por ejemplo en Netflix , subscribeOn()
casi nunca se utiliza, y sin embargo todos los Observable
s son asíncronos.
La mayoría de las veces, Observable
s proceden de fuentes asíncronas y se tratan como asíncronas por defecto. Por tanto, el uso de subscribeOn()
es muy limitado, sobre todo cuando se adaptan API o bibliotecas existentes. En el Capítulo 5, escribimos aplicaciones verdaderamente asíncronas sin subscribeOn()
y Scheduler
s explícitas por completo.
subscribeOn() Concurrencia y comportamiento
Hay varios matices sobre cómo funciona subscribeOn()
. En primer lugar, el lector curioso se preguntará qué ocurre si aparecen dos invocaciones del subscribeOn()
entre Observable
y subscribe()
. La respuesta es sencilla: gana el subscribeOn()
más cercano al Observable
original. Esto tiene importantes implicaciones prácticas. Si estás diseñando una API y utilizas subscribeOn()
internamente, el código del cliente no tiene forma de anular el Scheduler
elegido. Esto puede ser una decisión de diseño consciente; al fin y al cabo, el diseñador de la API puede saber mejor que nadie qué Scheduler
es el adecuado. Por otro lado, proporcionar una versión sobrecargada de dicha API que permita anular el Scheduler
elegido es siempre una buena idea.
Estudiemos cómo se comporta subscribeOn()
:
log
(
"Starting"
);
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
//many other operators
.
subscribeOn
(
schedulerB
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
La salida sólo muestra los hilos de schedulerA
:
17 | main | Starting 73 | main | Created 83 | main | Exiting 84 | Sched-A-0 | Subscribed 84 | Sched-A-0 | Got A 84 | Sched-A-0 | Got B 84 | Sched-A-0 | Completed
Curiosamente, la suscripción en schedulerB
no se ignora por completo en favor de schedulerA
.schedulerB
se sigue utilizando durante un breve periodo de tiempo, pero apenas programa nuevas acciones en schedulerA
, que realiza todo el trabajo. Así pues, los múltiples subscribeOn()
no sólo se ignoran, sino que introducen una pequeña sobrecarga.
Hablando de operadores, hemos dicho que el método create()
utilizado cuando hay un nuevo Subscriber
se ejecuta dentro del programador proporcionado (si lo hay). Pero, ¿qué hilo ejecuta todas estas transformaciones que se producen entre create()
y subscribe()
? Ya sabemos que cuando todos los operadores se ejecutan por defecto en el mismo hilo (planificador), no hay concurrencia por defecto:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
this
::
log
)
.
map
(
x
->
x
+
'1'
)
.
doOnNext
(
this
::
log
)
.
map
(
x
->
x
+
'2'
)
.
subscribeOn
(
schedulerA
)
.
doOnNext
(
this
::
log
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Salpicamos ocasionalmente la cadena de operadores con doOnNext()
para ver qué hilo tiene el control en ese momento. Recuerda que la posición de subscribeOn()
no es relevante, puede estar justo después de Observable
o justo antes de subscribe()
. El resultado no es sorprendente:
20 | main | Starting 104 | main | Created 123 | main | Exiting 124 | Sched-A-0 | Subscribed 124 | Sched-A-0 | A 124 | Sched-A-0 | A1 124 | Sched-A-0 | A12 124 | Sched-A-0 | Got A12 124 | Sched-A-0 | B 124 | Sched-A-0 | B1 124 | Sched-A-0 | B12 125 | Sched-A-0 | Got B12
Observa cómo se invoca a create()
y produce los eventos A
y B
. Estos eventos viajan secuencialmente a través del hilo del programador para llegar finalmente a Subscriber
. Muchos recién llegados a RxJava creen que el uso de Scheduler
con un gran número de hilos bifurcará automáticamente el procesamiento de los eventos de forma concurrente y unirá de algún modo todos los resultados al final. Esto no es así. RxJava crea una única instancia de Worker
(ver: "Visión general de los detalles de implementación del programador") para toda la cadena, sobre todo para garantizar el procesamiento secuencial de los eventos.
Esto significa que si uno de tus operadores es especialmente lento -por ejemplo, map()
leyendo datos del disco para transformar los eventos que pasan- esta costosa operación se invocará dentro del mismo hilo. Un solo operador averiado puede ralentizar todo el canal, desde la producción hasta el consumo. Esto es un antipatrón en RxJava, los operadores deben ser no bloqueantes, rápidos y tan puros como sea posible.
De nuevo, flatMap()
viene al rescate. En lugar de bloquearnos dentro de map()
, podemos invocar a flatMap()
y recoger asíncronamente todos los resultados. Por tanto, flatMap()
y merge()
son los operadores cuando queremos conseguir un verdadero paralelismo. Pero incluso con flatMap()
no es obvio. Imagina una tienda de comestibles (llamémosla "RxGroceries") que proporciona una API para comprar productos:
class
RxGroceries
{
Observable
<
BigDecimal
>
purchase
(
String
productName
,
int
quantity
)
{
return
Observable
.
fromCallable
(()
->
doPurchase
(
productName
,
quantity
));
}
BigDecimal
doPurchase
(
String
productName
,
int
quantity
)
{
log
(
"Purchasing "
+
quantity
+
" "
+
productName
);
//real logic here
log
(
"Done "
+
quantity
+
" "
+
productName
);
return
priceForProduct
;
}
}
Obviamente, la implementación de doPurchase()
es irrelevante aquí, sólo imagina que tarda algo de tiempo y recursos en completarse. Simulamos la lógica de negocio añadiendo una suspensión artificial de un segundo, ligeramente superior si quantity
es mayor. El bloqueo de Observable
s como el devuelto por purchase()
es inusual en una aplicación real, pero vamos a mantenerlo así con fines educativos. Al comprar varios bienes nos gustaría paralelizar todo lo posible y calcular el precio total de todos los bienes al final. El primer intento es infructuoso:
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
subscribeOn
(
schedulerA
)
//BROKEN!!!
.
map
(
prod
->
rxGroceries
.
doPurchase
(
prod
,
1
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
El resultado es correcto, es un Observable
con un único valor: el precio total, calculado con reduce()
. Para cada producto, invocamos doPurchase()
con quantity
uno. Sin embargo, a pesar de utilizar schedulerA
respaldado por un pool de hilos de 10, el código es totalmente secuencial:
144 | Sched-A-0 | Purchasing 1 bread 1144 | Sched-A-0 | Done 1 bread 1146 | Sched-A-0 | Purchasing 1 butter 2146 | Sched-A-0 | Done 1 butter 2146 | Sched-A-0 | Purchasing 1 milk 3147 | Sched-A-0 | Done 1 milk 3147 | Sched-A-0 | Purchasing 1 tomato 4147 | Sched-A-0 | Done 1 tomato 4147 | Sched-A-0 | Purchasing 1 cheese 5148 | Sched-A-0 | Done 1 cheese
Observa cómo cada producto bloquea el procesamiento de los siguientes. Cuando se realiza la compra de pan, la de mantequilla comienza inmediatamente, pero no antes. Curiosamente, ni siquiera sustituir map()
por flatMap()
ayuda, y el resultado es exactamente el mismo:
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
subscribeOn
(
schedulerA
)
.
flatMap
(
prod
->
rxGroceries
.
purchase
(
prod
,
1
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
El código no funciona de forma concurrente porque sólo hay un único flujo de eventos, que por diseño debe ejecutarse secuencialmente. De lo contrario, tu Subscriber
tendría que ser consciente de las notificaciones concurrentes (onNext()
, onComplete()
, etc.), por lo que es un compromiso justo. Por suerte, la solución idiomática es muy parecida. Los principales productos emisores de Observable
no pueden paralelizarse. Sin embargo, para cada producto, creamos un nuevo Observable
independiente, devuelto por purchase()
. Como son independientes, podemos programar con seguridad cada uno de ellos de forma concurrente:
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
flatMap
(
prod
->
rxGroceries
.
purchase
(
prod
,
1
)
.
subscribeOn
(
schedulerA
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
¿Puedes ver dónde está subscribeOn()
? El Observable
principal no está haciendo realmente nada, por lo que no es necesario un grupo de hilos especial. Sin embargo, cada subflujo creado dentro de flatMap()
recibe un schedulerA
. Cada vez que se utiliza subscribeOn()
al Scheduler
tiene la oportunidad de devolver un nuevo Worker
, y por tanto un hilo separado (simplificando un poco):
113 | Sched-A-1 | Purchasing 1 butter 114 | Sched-A-0 | Purchasing 1 bread 125 | Sched-A-2 | Purchasing 1 milk 125 | Sched-A-3 | Purchasing 1 tomato 126 | Sched-A-4 | Purchasing 1 cheese 1126 | Sched-A-2 | Done 1 milk 1126 | Sched-A-0 | Done 1 bread 1126 | Sched-A-1 | Done 1 butter 1128 | Sched-A-3 | Done 1 tomato 1128 | Sched-A-4 | Done 1 cheese
Por último, hemos conseguido una verdadera concurrencia. Cada operación de compra comienza ahora al mismo tiempo y todas terminan finalmente. El operador flatMap()
está cuidadosamente diseñado e implementado para que recoja todos los eventos de todos los flujos independientes y los empuje hacia abajo secuencialmente. Sin embargo, como ya aprendimos en "Orden de los eventos tras flatMap()", ya no podemos confiar en el orden de los eventos aguas abajo: ni empiezan ni terminan en el mismo orden en que fueron emitidos (la secuencia original empezaba en pan). Cuando los eventos llegan al operador reduce()
, ya son secuenciales y se comportan bien.
A estas alturas, deberías alejarte poco a poco del modelo clásico Thread
y comprender cómo funciona Schedulers
. Pero si te resulta difícil, he aquí una sencilla analogía:
-
Observable
sin ningúnScheduler
funciona como un programa monohilo con llamadas a métodos bloqueantes que se pasan datos entre sí. -
Observable
con un únicosubscribeOn()
es como iniciar una gran tarea en segundo planoThread
. El programa dentro de eseThread
sigue siendo secuencial, pero al menos se ejecuta en segundo plano. -
Observable
usandoflatMap()
donde cadaObservable
interno tienesubscribeOn()
funciona comoForkJoinPool
dejava.util.concurrent
, donde cada subcorriente es una bifurcación de ejecución yflatMap()
es una etapa de unión segura.
Por supuesto, los consejos anteriores sólo se aplican a Observable
s bloqueantes, que rara vez se ven en aplicaciones reales. Si tus Observable
s subyacentes ya son asíncronos, conseguir concurrencia es cuestión de entender cómo se combinan y cuándo se produce la suscripción. Por ejemplo, merge()
en dos flujos se suscribirá a ambos de forma concurrente, mientras que el operador concat()
espera a que termine el primer flujo antes de suscribirse al segundo.
Agrupar peticiones utilizando groupBy()
¿Te has dado cuenta de que RxGroceries.purchase()
toma productName
y quantity
aunque la cantidad fuera siempre una? ¿Y si nuestra lista de la compra tuviera algunos productos varias veces, indicando una mayor demanda? La primera implementación ingenua simplemente envía la misma solicitud -por ejemplo, de huevos- varias veces, cada vez pidiendo uno. Afortunadamente, podemos enviar por lotes estas peticiones de forma declarativa utilizando groupBy()
-y esto sigue funcionando con la concurrencia declarativa:
import
org.apache.commons.lang3.tuple.Pair
;
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"egg"
,
"milk"
,
"tomato"
,
"cheese"
,
"tomato"
,
"egg"
,
"egg"
)
.
groupBy
(
prod
->
prod
)
.
flatMap
(
grouped
->
grouped
.
count
()
.
map
(
quantity
->
{
String
productName
=
grouped
.
getKey
();
return
Pair
.
of
(
productName
,
quantity
);
}))
.
flatMap
(
order
->
store
.
purchase
(
order
.
getKey
(),
order
.
getValue
())
.
subscribeOn
(
schedulerA
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
Este código es bastante complejo, así que antes de revelar el resultado, vamos a repasarlo rápidamente. En primer lugar, agrupamos los productos simplemente por su nombre, así la función de identidad prod -> prod
. A cambio obtenemos un incómodo Observable<GroupedObservable<String, String>>
. No hay nada malo en ello. A continuación, flatMap()
recibe cada GroupedObservable<String, String>
, que representa todos los productos del mismo nombre. Así, por ejemplo, habrá un ["egg", "egg", "egg"]
Observable
allí con una clave "egg"
, también. Si groupBy()
utilizara una función de clave diferente, como prod.length()
, la misma secuencia tendría una clave 3
.
En este punto, dentro de flatMap()
necesitamos construir un Observable
de tipo Pair<String, Integer>
que represente cada producto único y su cantidad. Tanto count()
como map()
devuelven un Observable
, así que todo encaja perfectamente. En segundo lugar flatMap()
recibe order
de tipo Pair<String, Integer>
y realiza una compra, esta vez la cantidad puede ser mayor. El resultado parece perfecto; fíjate en que los pedidos más grandes son ligeramente más lentos, pero aun así es mucho más rápido que tener varias peticiones repetidas:
164 | Sched-A-0 | Purchasing 1 bread 165 | Sched-A-1 | Purchasing 1 butter 166 | Sched-A-2 | Purchasing 3 egg 166 | Sched-A-3 | Purchasing 1 milk 166 | Sched-A-4 | Purchasing 2 tomato 166 | Sched-A-5 | Purchasing 1 cheese 1151 | Sched-A-0 | Done 1 bread 1178 | Sched-A-1 | Done 1 butter 1180 | Sched-A-5 | Done 1 cheese 1183 | Sched-A-3 | Done 1 milk 1253 | Sched-A-4 | Done 2 tomato 1354 | Sched-A-2 | Done 3 egg
Si crees que tu sistema puede beneficiarse de la agrupación por lotes de una forma u otra, consulta "Agrupar y colapsar comandos".
Concurrencia declarativa con observeOn()
Lo creas o no, la concurrencia en RxJava puede describirse mediante dos operadores: los ya mencionados subscribeOn()
y observeOn()
. Parecen muy similares y resultan confusos para los recién llegados, pero su semántica es en realidad bastante clara y razonable.
subscribeOn()
permite elegir qué Scheduler
se utilizará para invocar OnSubscribe
(expresión lambda dentro de create()
). Por lo tanto, cualquier código dentro de create()
se empuja a un hilo diferente -por ejemplo, para evitar bloquear el hilo principal. A la inversa, observeOn()
controla qué Scheduler
se utiliza para invocar Subscriber
s posteriores que se producen después de observeOn()
. Por ejemplo, la llamada a create()
se produce en io()
Scheduler
(a través de subscribeOn(io())
) para evitar bloquear la interfaz de usuario.
Sin embargo, la actualización de los widgets de la interfaz de usuario debe ocurrir en el hilo de UI (tanto Swing como Android tienen esta restricción), así que usamos observeOn()
por ejemplo con AndroidSchedulers.mainThread()
antes de que los operadores o suscriptores cambien la UI. De esta forma podemos usar un Scheduler
para manejar create()
y todos los operadores hasta el primer observeOn()
, pero otro(s) para aplicar transformaciones. Esto se explica mejor con un ejemplo:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
x
->
log
(
"Found 1: "
+
x
))
.
observeOn
(
schedulerA
)
.
doOnNext
(
x
->
log
(
"Found 2: "
+
x
))
.
subscribe
(
x
->
log
(
"Got 1: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
observeOn()
se produce en algún punto de la cadena de tuberías, y esta vez, a diferencia de subscribeOn()
, la posición de observeOn()
es bastante importante. No importa qué Scheduler
estuviera ejecutando operadores por encima de observeOn()
(si los hubiera), todo lo que hay por debajo utiliza el suministrado Scheduler
. En este ejemplo, no hay subscribeOn()
, por lo que se aplica el valor por defecto (sin concurrencia):
23 | main | Starting 136 | main | Created 163 | main | Subscribed 163 | main | Found 1: A 163 | main | Found 1: B 163 | main | Exiting 163 | Sched-A-0 | Found 2: A 164 | Sched-A-0 | Got 1: A 164 | Sched-A-0 | Found 2: B 164 | Sched-A-0 | Got 1: B 164 | Sched-A-0 | Completed
Todos los operadores por encima de observeOn
se ejecutan dentro del hilo cliente, que resulta ser el predeterminado en RxJava. Pero por debajo de observeOn()
, los operadores se ejecutan dentro del Scheduler
suministrado. Esto se hará aún más evidente cuando tanto subscribeOn()
como varios observeOn()
se produzcan dentro de la cadena:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
x
->
log
(
"Found 1: "
+
x
))
.
observeOn
(
schedulerB
)
.
doOnNext
(
x
->
log
(
"Found 2: "
+
x
))
.
observeOn
(
schedulerC
)
.
doOnNext
(
x
->
log
(
"Found 3: "
+
x
))
.
subscribeOn
(
schedulerA
)
.
subscribe
(
x
->
log
(
"Got 1: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
¿Puedes predecir el resultado? Recuerda que todo lo que esté por debajo de observeOn()
se ejecuta dentro del Scheduler
suministrado , por supuesto hasta que se encuentre otro observeOn()
. Además subscribeOn()
puede ocurrir en cualquier lugar entre Observable
y subscribe()
, pero esta vez sólo afecta a los operadores hasta el primer observeOn()
:
21 | main | Starting 98 | main | Created 108 | main | Exiting 129 | Sched-A-0 | Subscribed 129 | Sched-A-0 | Found 1: A 129 | Sched-A-0 | Found 1: B 130 | Sched-B-0 | Found 2: A 130 | Sched-B-0 | Found 2: B 130 | Sched-C-0 | Found 3: A 130 | Sched-C-0 | Got: A 130 | Sched-C-0 | Found 3: B 130 | Sched-C-0 | Got: B 130 | Sched-C-0 | Completed
La suscripción se produce en schedulerA
porque es lo que especificamos en subscribeOn()
. También se ejecutó el operador "Found 1"
dentro de ese Scheduler
porque está antes del primer observeOn()
. Más adelante, la situación se vuelve más interesante. observeOn()
cambia el actual Scheduler
por schedulerB
, y "Found 2"
utiliza éste, en su lugar. El último observeOn(schedulerC)
afecta tanto al operador "Found 3"
como a Subscriber
. Recuerda que Subscriber
funciona en el contexto del último Scheduler
encontrado.
subscribeOn()
y observeOn()
funcionan realmente bien juntos cuando quieres desacoplar físicamente al productor (Observable.create()
) y al consumidor (Subscriber
). Por defecto, no existe tal desacoplamiento, y RxJava simplemente utiliza el mismo hilo. subscribeOn()
no es suficiente, simplemente elegimos un hilo diferente.observeOn()
es mejor, pero entonces bloqueamos el hilo del cliente en caso de Observable
s síncronos.
Como la mayoría de los operadores son no bloqueantes y las expresiones lambda utilizadas en su interior suelen ser cortas y baratas, normalmente sólo hay un subscribeOn()
y un observeOn()
en la cadena de operadores. subscribeOn()
puede colocarse cerca del Observable
original para mejorar la legibilidad, mientras que observeOn()
está cerca de subscribe()
para que sólo Subscriber
utilice ese Scheduler
especial, los demás operadores dependen del Scheduler
de subscribeOn()
.
Aquí tienes un programa más avanzado que aprovecha estos dos operadores:
log
(
"Starting"
);
Observable
<
String
>
obs
=
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onNext
(
"C"
);
subscriber
.
onNext
(
"D"
);
subscriber
.
onCompleted
();
});
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
.
flatMap
(
record
->
store
(
record
).
subscribeOn
(
schedulerB
))
.
observeOn
(
schedulerC
)
.
subscribe
(
x
->
log
(
"Got: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Donde store()
es una simple operación anidada:
Observable
<
UUID
>
store
(
String
s
)
{
return
Observable
.
create
(
subscriber
->
{
log
(
"Storing "
+
s
);
//hard work
subscriber
.
onNext
(
UUID
.
randomUUID
());
subscriber
.
onCompleted
();
});
}
La producción de eventos ocurre en schedulerA
, pero cada evento se procesa de forma independiente utilizando schedulerB
para mejorar la concurrencia, una técnica que aprendimos en "Concurrencia y comportamiento de subscribeOn()". La suscripción al final ocurre en otro schedulerC
. Estamos bastante seguros de que ya entiendes qué Scheduler
/hilo ejecutará qué acción, pero por si acaso (se han añadido líneas vacías para mayor claridad):
26 | main | Starting 93 | main | Created 121 | main | Exiting 122 | Sched-A-0 | Subscribed 124 | Sched-B-0 | Storing A 124 | Sched-B-1 | Storing B 124 | Sched-B-2 | Storing C 124 | Sched-B-3 | Storing D 1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce 1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8 1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a 1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587 1137 | Sched-C-1 | Completed
observeOn()
es especialmente importante para las aplicaciones con una UI para la que no queremos bloquear el hilo de dispersión de eventos de la UI. En Android (ver "Desarrollo Android con RxJava") o Swing, algunas acciones como la actualización de la UI deben ejecutarse dentro de un hilo específico. Pero hacer demasiado en ese hilo hace que tu UI no responda.
En estos casos, colocas observeOn()
cerca de subscribe()
para que el código dentro de la suscripción se invoque en el contexto de un Scheduler
concreto (como el hilo de la UI). Sin embargo, otras transformaciones, incluso las más baratas, deben ejecutarse fuera del hilo de la UI. En el servidor, observeOn()
se utiliza raramente porque la verdadera fuente de concurrencia está incorporada en la mayoría de los Observable
s. Esto nos lleva a una conclusión interesante: RxJava controla la concurrencia con sólo dos operadores (subscribeOn()
y observeOn()
), pero cuanto más utilices las extensiones reactivas, menos frecuentemente las verás en el código de producción.
Otros usos de los programadores
Hay numerosos operadores que utilizan por defecto algún Scheduler
. Normalmente, se utiliza Schedulers.computation()
si no se proporciona ninguno-JavaDoc siempre lo deja claro. Por ejemplo, el operador delay()
toma eventos aguas arriba y los empuja aguas abajo después de un tiempo determinado. Obviamente, no puede mantener el hilo original durante ese periodo, por lo que debe utilizar un Scheduler
diferente:
Observable
.
just
(
'A'
,
'B'
)
.
delay
(
1
,
SECONDS
,
schedulerA
)
.
subscribe
(
this
::
log
);
Sin suministrar un schedulerA
personalizado, todos los operadores por debajo de delay()
utilizarían el computation()
Scheduler
. No hay nada intrínsecamente malo en ello; sin embargo, si tu Subscriber
se bloquea en E/S, consumiría un Worker
del programador computation()
compartido globalmente, lo que posiblemente afectaría a todo el sistema. Otros operadores importantes que admiten Scheduler
personalizados son: interval()
, range()
, timer()
, repeat()
, skip()
, take()
, timeout()
, y varios otros que aún no se han introducido. Si no proporcionas un planificador a dichos operadores, se utiliza computation()
Scheduler
, que es un valor predeterminado seguro en la mayoría de los casos.
Dominar los programadores es esencial para escribir código escalable y seguro utilizando RxJava. La diferencia entre subscribeOn()
y observeOn()
es especialmente importante con cargas elevadas, en las que cada tarea debe ejecutarse exactamente cuando esperamos. En las aplicaciones verdaderamente reactivas, en las que todas las operaciones de larga duración son asíncronas, se necesitan muy pocos hilos y, por tanto, Scheduler
s. Pero siempre hay alguna API o dependencia que requiere código de bloqueo.
Por último, pero no por ello menos importante, debemos estar seguros de que Scheduler
s utilizados aguas abajo pueden seguir el ritmo de la carga generada por Scheduler
s aguas arriba. Pero este peligro se explicará con todo detalle en el Capítulo 6.
Resumen
En este capítulo se han descrito varios patrones de las aplicaciones tradicionales que pueden sustituirse con RxJava. Espero que a estas alturas comprendas que el comercio de alta frecuencia o el streaming de publicaciones de las redes sociales no son los únicos casos de uso de RxJava. De hecho, casi cualquier API puede sustituirse sin problemas por Observable
. Incluso si no quieres o no necesitas la potencia de las extensiones reactivas en este momento, te permitirá evolucionar la implementación sin introducir cambios incompatibles con el pasado. Además, es el cliente el que acaba cosechando todas las posibilidades que ofrece RxJava, como la pereza, la concurrencia declarativa o el encadenamiento asíncrono. Aún mejor, gracias a la conversión sin fisuras deObservable
a BlockingObservable
, los clientes tradicionales pueden consumir tu API como quieran, y tú siempre puedes proporcionar una sencilla capa puente.
Deberías tener bastante confianza con RxJava y comprender las ventajas de aplicarlo incluso en sistemas heredados. Sin duda, trabajar conObservable
s reactivos es más desafiante y tiene una curva de aprendizaje algo empinada, pero las ventajas y posibilidades de crecimiento simplemente no pueden exagerarse. ¿Imaginas que pudiéramos escribir aplicaciones enteras utilizando extensiones reactivas, de arriba abajo? Como un proyecto totalmente nuevo para el que tuviéramos control sobre cada API, interfaz y sistema externo. El Capítulo 5 tratará sobre cómo puedes escribir una aplicación así y cuáles son sus implicaciones.
1 De hecho, RxJava intenta permanecer en el mismo hilo mediante la afinidad de hilos en el modelo de bucle de eventos para aprovechar también esto.
2 Ver también "Patrón de mamparo y Fail-Fast"
3 Compáralo con la evaluación perezosa de expresiones en Haskell.
4 Obviamente, para cualquier proyecto real, utilizarás un sistema de registro de nivel de producción como Logback o Log4J 2.
Get Programación Reactiva con RxJava now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.