Capítulo 4. Aplicación de la programación reactiva a aplicaciones existentes

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Introducir una nueva biblioteca, tecnología o paradigma en una aplicación, ya sea greenfield o una base de código heredada, debe ser una decisión cuidadosa. RxJava no es una excepción. En este capítulo, revisaremos algunos patrones y arquitecturas que se encuentran en las aplicaciones Java corrientes y veremos cómo puede ayudar Rx. Este proceso no es sencillo y requiere un importante cambio de mentalidad, por lo que nos transformaremos cuidadosamente del estilo imperativo al funcional y reactivo. Muchas bibliotecas de los proyectos Java actuales simplemente añaden hinchazón sin aportar nada a cambio. Sin embargo, verás cómo RxJava no sólo simplifica los proyectos tradicionales, sino qué tipo de ventajas aporta a las plataformas heredadas.

Estoy bastante seguro de que ya estás muy entusiasmado con RxJava. Los operadores incorporados y su simplicidad hacen de Rx una herramienta asombrosamente potente para transformar flujos de eventos. Sin embargo, si mañana vuelves a tu oficina, te darás cuenta de que no hay flujos, ni eventos en tiempo real de la bolsa de valores. Apenas puedes encontrar eventos en tus aplicaciones; es sólo un amasijo de peticiones web, bases de datos y API externas. Estás tan ansioso por probar esta nueva cosa RxJava en algún lugar más allá de Hello world. Sin embargo, parece que simplemente no hay casos de uso en la vida real que justifiquen el uso de Rx. Aun así, RxJava puede ser un importante paso adelante en términos de coherencia arquitectónica y robustez. No necesitas comprometerte con el estilo reactivo de arriba abajo; esto es demasiado arriesgado y requiere demasiado trabajo al principio. Pero Rx puede introducirse en cualquier capa, sin romper una aplicación en su conjunto.

Te llevaremos a través de algunos patrones de aplicación comunes y las formas en que puedes mejorarlos con RxJava de forma no invasiva, centrándonos en la consulta de bases de datos, el almacenamiento en caché, la gestión de errores y las tareas periódicas. Cuanto más RxJava añadas en diversos lugares de tu pila, más coherente será tu arquitectura.

De las colecciones a los observables

A menos que tu plataforma se haya construido recientemente en frameworks de JVM como Play, Akka actors, o quizás Vert.x, probablemente estés en una pila con un contenedor de servlets por un lado, y JDBC o servicios web por otro. Entre ambos, hay un número variable de capas que implementan la lógica de negocio, que no refactorizaremos todas a la vez; en su lugar, empecemos con un ejemplo sencillo. La siguiente clase representa un repositorio trivial que nos abstrae de una base de datos:

class PersonDao {

    List<Person> listPeople() {
        return query("SELECT * FROM PEOPLE");
    }

    private List<Person> query(String sql) {
        //...
    }

}

Dejando a un lado los detalles de implementación, ¿cómo se relaciona esto con Rx? Hasta ahora hemos estado hablando de eventos asíncronos empujados desde sistemas ascendentes o, en el mejor de los casos, cuando alguien se suscribe. ¿Qué relevancia tiene aquí el mundano Dao?Observable no es sólo una tubería que empuja eventos aguas abajo. Puedes tratar Observable<T> como una estructura de datos, dual a Iterable<T>. Ambas contienen elementos de tipo T, pero proporcionan una interfaz radicalmente distinta. Por tanto, no debería sorprenderte que puedas sustituir simplemente una por otra:

Observable<Person> listPeople() {
    final List<Person> people = query("SELECT * FROM PEOPLE");
    return Observable.from(people);
}

En este punto, hemos realizado un cambio de ruptura en la API existente. Dependiendo de lo grande que sea tu sistema, esa incompatibilidad podría ser un problema importante. Por eso, es importante que incorpores RxJava a tu API lo antes posible. Obviamente, estamos trabajando con una aplicación existente, así que no puede ser el caso.

BloqueoObservable: Salir del mundo reactivo

Si estás combinando RxJava con código existente, bloqueante e imperativo, puede que tengas que traducir Observable a una colección simple. Esta transformación es bastante desagradable, ya que requiere bloquearse en Observable a la espera de que se complete. Hasta que Observable no se complete, no podremos crear una colección.BlockingObservable es un tipo especial que facilita el trabajo con Observable en un entorno no reactivo.BlockingObservable debería ser tu última opción cuando trabajes con RxJava, pero es inevitable cuando se combina código bloqueante y no bloqueante.

En el capítulo 3, refactorizamos el método listPeople() para que devolviera Observable<People> en lugar de List.Observable no es un Iterable en ningún sentido, por lo que nuestro código ya no compila. Queremos dar pasos de bebé en lugar de una refactorización masiva, así que mantengamos el alcance de los cambios lo más reducido posible. El código del cliente podría tener este aspecto:

List<Person> people = pesonDao.listPeople();
String json = marshal(people);

Podemos imaginar el método marshal() extrayendo datos de la colección peopley serializándolos a JSON. Eso ya no es así, no podemos simplemente tirar de elementos de Observable cuando queramos.Observable se encarga de producir(empujar) elementos y notificar a los suscriptores si los hay. Este cambio radical puede sortearse fácilmente con BlockingObservable. Esta práctica clase es totalmente independiente de Observable y puede obtenerse mediante el método Observable.toBlocking(). La variante bloqueante de Observable tiene métodos superficialmente similares, como single() o subscribe(). Sin embargo, BlockingObservable es mucho más conveniente en entornos bloqueantes que no están preparados intrínsecamente para la naturaleza asíncrona de Observable. Los operadores de BlockingObservable suelen bloquearse (esperar) hasta que se completa el Observable subyacente. Esto contradice fuertemente el concepto principal de Observables de que todo es probablemente asíncrono, perezoso y se procesa sobre la marcha. Por ejemplo, Observable.forEach() recibirá asíncronamente los eventos de Observable a medida que lleguen, mientras que BlockingObservable.forEach() se bloqueará hasta que se procesen todos los eventos y se complete el flujo. Además, las excepciones ya no se propagan como valores (eventos), sino que se vuelven a lanzar en el hilo de llamada.

En nuestro caso, queremos volver a transformar Observable<Person> en List<Person> para limitar el alcance de la refactorización:

Observable<Person> peopleStream = personDao.listPeople();
Observable<List<Person>> peopleList = peopleStream.toList();
BlockingObservable<List<Person>> peopleBlocking = peopleList.toBlocking();
List<Person> people = peopleBlocking.single();

He dejado intencionadamente explícitos todos los tipos intermedios para explicar lo que ocurre. Tras la refactorización a Rx, nuestra API devuelveObservable<Person> peopleStream. Este flujo puede ser potencialmente totalmente reactivo, asíncrono y dirigido por eventos, lo que no se ajusta en absoluto a lo que necesitamos: un List estático. Como primer paso, convertimos Observable<Person>en Observable<List<Person>>. Este operador perezoso almacenará en búfer todos los eventosPerson y los mantendrá en memoria hasta que se reciba el evento onCompleted(). En ese momento, se emitirá un único evento de tipo List<Person>, que contendrá todos los eventos vistos a la vez, como se ilustra en el siguiente diagrama de mármol:

image

El flujo resultante se completa inmediatamente después de emitir un único elemento List. De nuevo, este operador es asíncrono; no espera a que lleguen todos los eventos, sino que almacena perezosamente todos los valores. El incómodo aspecto de Observable<List<Person>> peopleList se convierte entonces enBlockingObservable<List<Person>> peopleBlocking. BlockingObservable es una buena idea sólo cuando debes proporcionar una vista estática y bloqueante de tu, por lo demás, asíncrono Observable. Mientras que Observable.from(List<T>) convierte la colección normal basada en pull en Observable, toBlocking() hace algo totalmente opuesto. Quizá te preguntes por qué necesitamos dos abstracciones para operadores bloqueantes y no bloqueantes. Los autores de RxJava se dieron cuenta de que ser explícito sobre la naturaleza síncrona o asíncrona del operador subyacente es demasiado crucial como para dejarlo para JavaDoc. Tener dos tipos no relacionados garantiza que siempre trabajes con la estructura de datos adecuada. Además, BlockingObservable es tu arma de último recurso; normalmente, deberías componer y encadenar Observables sencillos el mayor tiempo posible. Sin embargo, a efectos de este ejercicio, vamos a escapar de Observable de inmediato. El último operador single() abandona por completo los observables y extrae un elemento, y sólo uno, que esperamos recibir deBlockingObservable<T>. Un operador similar, first(), devolverá un valor de T y descartará lo que le quede. single(), por otro lado, se asegura de que no haya más eventos pendientes en el Observable subyacente antes de terminar. Esto significa que single() se bloqueará esperando la devolución de llamada de onCompleted(). Aquí tienes el mismo fragmento de código que antes, esta vez con todos los operadores encadenados:

List<Person> people = personDao
    .listPeople()
    .toList()
    .toBlocking()
    .single();

Puede que pienses que nos hemos tomado la molestia de envolver y desenvolver Observable sin ninguna razón aparente. Recuerda que éste ha sido sólo el primer paso. La siguiente transformación introducirá algo de pereza. Nuestro código, tal y como está ahora, siempre ejecuta query("...") y lo envuelve con Observable. Como ya sabrás a estas alturas, los Observable(especialmente los fríos) son perezosos por definición. Mientras nadie se suscriba, sólo representan un flujo que nunca tuvo la oportunidad de empezar a emitir valores. La mayoría de las veces puedes llamar a métodos que devuelvan Observable y, mientras no te suscribas, no se hará ningún trabajo. Observable es como un Future porque promete que un valor aparecerá en el futuro. Pero mientras no lo solicites, un Observable frío ni siquiera empezará a emitir. Desde esa perspectiva, Observable es más parecido a java.util.function.Supplier<T>, generando valores del tipo T bajo demanda. Los Observableen caliente son diferentes porque emiten valores tanto si estás escuchando como si no, pero no los estamos considerando ahora. La mera existencia de Observable no indica un trabajo en segundo plano ni ningún efecto secundario, a diferencia de Future, que casi siempre sugiere alguna operación ejecutándose simultáneamente.

Abrazar la pereza

Entonces, ¿cómo hacemos que nuestro Observable sea perezoso? La técnica más sencilla es envolver un Observable ansioso con defer():

public Observable<Person> listPeople() {
    return Observable.defer(() ->
        Observable.from(query("SELECT * FROM PEOPLE")));
}

Observable.defer() toma una expresión lambda (una fábrica) que puede producir Observable. La subyacente Observable es ansiosa, por lo que queremos posponer su creación. defer() esperará hasta el último momento posible para crear realmente Observable; es decir, hasta que alguien se suscriba realmente a ella. Esto tiene algunas implicaciones interesantes. ComoObservable es perezoso, llamar a listPeople() no tiene efectos secundarios y casi no afecta al rendimiento. Todavía no se consulta ninguna base de datos. Puedes tratar Observable<Person> como una promesa, pero sin que se produzca todavía ningún procesamiento en segundo plano. Observa que no hay comportamiento asíncrono en este momento, sólo evaluación perezosa. Esto es similar a cómo los valores en el lenguaje de programación Haskell se evalúan perezosamente sólo cuando es absolutamente necesario.

Si nunca has programado en lenguajes funcionales, puede que estés bastante confuso sobre por qué la pereza es tan importante y rompedora. Resulta que ese comportamiento es bastante útil y puede mejorar bastante la calidad y la libertad de tu implementación. Por ejemplo, ya no tienes que prestar atención a qué recursos se obtienen, cuándo y en qué orden. RxJava los cargará sólo cuando sean absolutamente necesarios.

Como ejemplo, tomemos este trivial mecanismo de retroceso que todos hemos visto tantas veces:

void bestBookFor(Person person) {
    Book book;
    try {
        book = recommend(person);
    } catch (Exception e) {
        book = bestSeller();
    }
    display(book.getTitle());
}

void display(String title) {
    //...
}

Probablemente pienses que no hay nada malo en tal construcción. En este ejemplo, intentamos recomendar el mejor libro para una persona determinada, pero en caso de fallo, nos degradamos con elegancia y mostramos el más vendido. Se supone que obtener un best seller es más rápido y puede almacenarse en caché. Pero, ¿y si pudieras añadir la gestión de errores de forma declarativa, de modo que los bloques try-catch no estuvieran ocultando la lógica real?

void bestBookFor(Person person) {
    Observable<Book> recommended = recommend(person);
    Observable<Book> bestSeller = bestSeller();
    Observable<Book> book = recommended.onErrorResumeNext(bestSeller);
    Observable<String> title = book.map(Book::getTitle);
    title.subscribe(this::display);
}

De momento sólo estamos explorando RxJava, por lo que he dejado todos estos valores y tipos intermedios. En la vida real, bestBookFor() se parecería más a esto:

void bestBookFor(Person person) {
    recommend(person)
            .onErrorResumeNext(bestSeller())
            .map(Book::getTitle)
            .subscribe(this::display);
}

Este código es maravillosamente conciso y legible. Primero busca una recomendación para person. En caso de error (onErrorResumeNext), sigue con un superventas. Independientemente de cuál haya tenido éxito, map devuelve un valor extrayendo el título y luego lo muestra. onErrorResumeNext() es un potente operador que intercepta las excepciones que se producen aguas arriba, se las traga y se suscribe a la copia de seguridad proporcionada Observable. Así es como Rx implementa una cláusula try-catch. Dedicaremos mucho más tiempo a la gestión de errores más adelante en este libro (véase "Sustitución declarativa try-catch"). De momento, fíjate en cómo podemos llamar perezosamente a bestSeller() sin preocuparnos de que la búsqueda de best seller se produzca incluso cuando una recomendación real haya ido bien.

Composición de observables

SELECT * FROM PEOPLE no es realmente una consulta SQL de última generación. En primer lugar, no debes obtener todas las columnas a ciegas, pero obtener todas las filas es aún más perjudicial. Nuestra antigua API no es capaz de paginar los resultados, visualizando sólo un subconjunto de una tabla. Podría tener este aspecto, de nuevo en una aplicación empresarial tradicional:

List<Person> listPeople(int page) {
    return query(
            "SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?",
            PAGE_SIZE,
            page * PAGE_SIZE
    );
}

Esto no es un libro de SQL, así que vamos a dejar a un lado los detalles de implementación. El autor de esta API fue despiadado: no tenemos libertad para elegir ningún rango de registros, sólo podemos operar sobre números de página basados en 0. Sin embargo, en RxJava, por pereza, podemos simular la lectura de toda una base de datos a partir de una página determinada:

import static rx.Observable.defer;
import static rx.Observable.from;


Observable<Person> allPeople(int initialPage) {
    return defer(() -> from(listPeople(initialPage)))
            .concatWith(defer(() ->
                    allPeople(initialPage + 1)));
}

Este fragmento de código carga perezosamente la página inicial de registros de la base de datos, por ejemplo 10 elementos. Si nadie se suscribe, ni siquiera se invoca esta primera consulta. Si hay un suscriptor que sólo consume unos pocos elementos iniciales (por ejemplo, allPeople(0).take(3)), RxJava se dará de baja automáticamente de nuestro flujo y no se ejecutarán más consultas. Entonces, ¿qué ocurre cuando solicitamos, digamos, 11 elementos, pero la primera llamada a listPeople() sólo devuelve 10? Bueno, RxJava se da cuenta de que el Observableinicial está agotado pero el consumidor sigue hambriento. Por suerte, ve el operadorconcatWith(), que básicamente dice: cuando se complete el Observable de la izquierda, en lugar de propagar la notificación de finalización a los suscriptores, suscríbete al Observable de la derecha y continúa como si no pasara nada, como se muestra en el siguiente diagrama de mármol:

image

En otras palabras, concatWith() puede unir dos Observables de modo que cuando el primero finaliza, el segundo toma el relevo. Ena.concatWith(b).subscribe(...), el suscriptor recibirá primero todos los eventos de a, seguidos de todos los eventos de b. En este caso, el suscriptor recibe primero 10 elementos iniciales, seguidos de otros 10. Sin embargo, fíjate bien, ¡hay una supuesta recursividad infinita en nuestro código!allPeople(initialPage) llama a allPeople(initialPage + 1) sin ninguna condición de parada. Esta es una receta para StackOverflowError en la mayoría de los lenguajes, pero no aquí. De nuevo, llamar aallPeople() es siempre perezoso, por lo tanto, en el momento en que dejes de escuchar (desuscribirte), esta recursividad habrá terminado. Técnicamente, concatWith() todavía puede producir StackOverflowError aquí. Espera a "Respetar la cantidad de datos solicitada", aprenderás cómo tratar la demanda variable de datos entrantes.

La técnica de cargar perezosamente los datos trozo a trozo es bastante útil porque te permite concentrarte en la lógica empresarial, no en la fontanería de bajo nivel. Ya vemos algunas ventajas de aplicar RxJava incluso a pequeña escala. Diseñar una API pensando en Rx no influye en toda la arquitectura, porque siempre podemos recurrir a BlockingObservable y a las colecciones de Java. Pero es mejor disponer de un amplio abanico de posibilidades que podamos recortar si es necesario.

Paginación perezosa y concatenación

Hay más formas de implementar la paginación perezosa con RxJava. Si lo piensas, la forma más sencilla de cargar datos paginados es cargarlo todo y luego coger lo que necesitemos. Parece una tontería, pero gracias a la paginación perezosa es factible. Primero generamos todos los números de página posibles y luego solicitamos cargar todas y cada una de las páginas individualmente:

Observable<List<Person>> allPages = Observable
            .range(0, Integer.MAX_VALUE)
            .map(this::listPeople)
            .takeWhile(list -> !list.isEmpty());

Si esto no fuera RxJava, el código anterior llevaría una enorme cantidad de tiempo y de memoria, básicamente cargando toda la base de datos en memoria. Pero como Observable es perezoso, aún no ha aparecido ninguna consulta a la base de datos. Además, si encontramos una página vacía, significa que todas las demás páginas también lo están (hemos llegado al final de la tabla). Por lo tanto, utilizamos takeWhile() en lugar de filter(). Para aplanar allPages a Observable<Person> podemos utilizar concatMap() (véase "Preservar el orden utilizando concatMap()"):

Observable<Person> people = allPages.concatMap(Observable::from);

concatMap() requiere una transformación de List<Person> a Observable<Person>, ejecutada para cada página. También podemos probar con concatMapIterable(), que hace lo mismo, pero la transformación debe devolver un Iterable<Person> para cada valor ascendente (que resulta ser ya Iterable<Person> ):

Observable<Person> people = allPages.concatMapIterable(page -> page);

Independientemente del enfoque que elijas, todas las transformaciones del objeto Person son perezosas. Siempre que limites el número de registros que quieres procesar (por ejemplo, con people.take(15)), Observable<Person> invocará a listPeople() lo más tarde posible.

Concurrencia imperativa

No suelo ver concurrencia explícita en las aplicaciones empresariales. La mayoría de las veces, una única solicitud es gestionada por un único subproceso. El mismo hilo hace lo siguiente

  • Acepta la conexión TCP/IP

  • Analiza las solicitudes HTTP

  • Llama a un controlador o servlet

  • Bloquea la llamada a la base de datos

  • Resultados de los procesos

  • Codifica la respuesta (por ejemplo, en JSON)

  • Envía bytes sin procesar al cliente

Este modelo por capas afecta a la latencia del usuario cuando el backend realiza varias peticiones independientes, por ejemplo, a la base de datos. Se realizan de forma secuencial, cuando se podrían paralelizar fácilmente. Además, la escalabilidad se ve afectada. Por ejemplo, en Tomcat hay 200 hilos por defecto en los ejecutores que se encargan de gestionar las peticiones. Esto significa que no podemos manejar más de 200 conexiones concurrentes. En caso de una repentina pero breve ráfaga de tráfico, las conexiones entrantes se ponen en cola y el servidor responde con mayor latencia. Sin embargo, esta situación no puede durar eternamente, y Tomcat acabará por empezar a rechazar el tráfico entrante. Dedicaremos gran parte del próximo capítulo (ver "Servidor HTTP no bloqueante con Netty y RxNetty") a cómo solucionar esta deficiencia bastante embarazosa. De momento, quedémonos con la arquitectura tradicional. Ejecutar cada paso de la gestión de solicitudes dentro de un único hilo tiene algunas ventajas, por ejemplo, una mejor localización de la caché y una mínima sobrecarga de sincronización.1 Por desgracia, en las aplicaciones clásicas, como la latencia global es la suma de las latencias de cada capa, un componente que funcione mal puede tener un impacto negativo en la latencia total.2 Además, a veces hay muchos pasos que son independientes entre sí y pueden ejecutarse simultáneamente. Por ejemplo, invocamos varias API externas o ejecutamos varias consultas SQL independientes.

El JDK tiene un soporte bastante bueno para la concurrencia, especialmente desde Java 5 con ExecutorService y Java 8 con CompletableFuture. Sin embargo, no se utiliza tan ampliamente como podría. Por ejemplo, veamos el siguiente programa sin ningún tipo de concurrencia:

Flight lookupFlight(String flightNo) {
    //...
}

Passenger findPassenger(long id) {
    //...
}

Ticket bookTicket(Flight flight, Passenger passenger) {
    //...
}

SmtpResponse sendEmail(Ticket ticket) {
    //...
}

Y en el lado del cliente:

Flight flight = lookupFlight("LOT 783");
Passenger passenger = findPassenger(42);
Ticket ticket = bookTicket(flight, passenger);
sendEmail(ticket);

De nuevo, un código de bloqueo bastante típico y clásico, similar al que puedes encontrar en muchas aplicaciones. Pero si te fijas bien desde el punto de vista de la latencia, el fragmento de código anterior tiene cuatro pasos; sin embargo, los dos primeros son independientes entre sí. Sólo el tercer paso (bookTicket()) necesita los resultados de lookupFlight() yfindPassenger(). Existe una oportunidad evidente de aprovechar la concurrencia. Sin embargo, muy pocos desarrolladores seguirán realmente este camino porque requiere incómodos grupos de hilos, Futures y devoluciones de llamada. ¿Y si la API ya fuera compatible con Rx? Recuerda que puedes simplemente envolver el código bloqueante heredado en Observable, como hicimos al principio de este capítulo:

Observable<Flight> rxLookupFlight(String flightNo) {
    return Observable.defer(() ->
            Observable.just(lookupFlight(flightNo)));
}

Observable<Passenger> rxFindPassenger(long id) {
    return Observable.defer(() ->
            Observable.just(findPassenger(id)));
}

Semánticamente, los métodos de rx- hacen exactamente lo mismo y de la misma forma; es decir, son bloqueantes por defecto. Todavía no hemos ganado nada, aparte de una API más verbosa desde la perspectiva del cliente:

Observable<Flight> flight = rxLookupFlight("LOT 783");
Observable<Passenger> passenger = rxFindPassenger(42);
Observable<Ticket> ticket =
        flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
ticket.subscribe(this::sendEmail);

Tanto los programas de bloqueo tradicionales como el que tiene Observable funcionan exactamente igual. Es más perezoso por defecto, pero el orden de las operaciones es esencialmente el mismo. Primero, creamos Observable<Flight>, que como ya sabes, no hace nada por defecto. A menos que alguien pida explícitamente un Flight, este Observable es sólo un perezoso marcador de posición. Ya aprendimos que ésta es una valiosa propiedad de los Observablefríos. Lo mismo ocurre con Observable<Passenger>; tenemos dos marcadores de posición del tipoFlight y Passenger, pero aún no se ha realizado ningún efecto secundario. No hay consulta a la base de datos ni llamada al servicio web. Si decidimos detener el procesamiento aquí, no se ha realizado ningún trabajo superfluo.

Para proceder con bookTicket(), necesitamos instancias concretas tanto de Flight como de Passenger. Resulta tentador limitarse a bloquear en estas dos Observables utilizando el operador toBlocking(). Sin embargo, nos gustaría evitar el bloqueo en la medida de lo posible para reducir el consumo de recursos (especialmente de memoria) y permitir una mayor concurrencia. Otra solución poco adecuada es.subscribe() en las flight y passenger Observable s y esperar de algún modo a que finalicen ambas devoluciones de llamada. Es bastante sencillo cuandoObservable está bloqueando, pero si las devoluciones de llamada aparecen de forma asíncrona y necesitas sincronizar algún estado global esperando a ambas, esto se convierte rápidamente en una pesadilla. Además, un subscribe() anidado no es idiomático, y normalmente quieres una única suscripción para un flujo de mensajes (caso de uso). La única razón por la que las devoluciones de llamada funcionan de forma algo decente en JavaScript es porque sólo hay un hilo. La forma idiomática de suscribirse a varios Observables al mismo tiempo es zip y zipWith. Podrías percibir zip como una forma de unir dos flujos de datos independientes por pares. Pero mucho más a menudo, zip se utiliza simplemente para unir dos Observables de un solo elemento.ob1.zip(ob2).subscribe(...) significa esencialmente que se recibe un evento cuando tanto ob1 como ob2 han terminado (emiten un evento por sí solas). Así que siempre que veas zip, lo más probable es que alguien simplemente esté haciendo un paso de uniónen dos o más Observables que tenían caminos de ejecución bifurcados.zip es una forma de esperar asíncronamente dos o más valores, sin importar cuál aparezca en último lugar.

Así que volvamos a flight.zipWith(passenger, this::bookTicket) (una sintaxis más corta que utiliza referencia a método en lugar de lambda explícita, como en el ejemplo de código). La razón por la que mantengo toda la información de tipos en lugar de unir expresiones de forma fluida es porque quiero que prestes atención a los tipos de retorno. flight.zipWith(passenger, ...) no invoca simplemente una llamada de retorno cuando flight y passenger han terminado; devuelve un nuevoObservable que deberías reconocer inmediatamente como un marcador de posición perezoso para datos. Sorprendentemente, en este momento tampoco se ha iniciado ningún cálculo. Simplemente envolvimos unas cuantas estructuras de datos, pero no se desencadenó ningún comportamiento. Mientras nadie se suscriba a Observable<Ticket>, RxJava no ejecutará ningún código backend. Esto es lo que ocurre finalmente en la última declaración: ticket.subscribe() solicita explícitamente Ticket.

¿Dónde suscribirse?

Presta atención a dónde ves subscribe() en el código del dominio. A menudo, tu lógica empresarial sólo está componiendo Observables hasta el final y devolviéndolos a algún tipo de framework o capa de andamiaje. La suscripción real ocurre entre bastidores en un framework web o en algún código de pegamento. No es una mala práctica llamar tú mismo a subscribe(), pero intenta empujarlo hacia fuera tanto como sea posible.

Para comprender el flujo de ejecución, es útil mirar de abajo arriba. Nos suscribimos a ticket, por lo que RxJava debe suscribirse de forma transparente tanto a flight como a passenger. En este punto ocurre la verdadera lógica. Como ambos Observableestán fríos y aún no hay concurrencia implicada, la primera suscripción a flight invoca al método de bloqueo lookupFlight() justo en el hilo de llamada. Cuando lookupFlight() termina, RxJava puede suscribirse a passenger. Sin embargo, ya ha recibido una instancia de Flight de flight síncrono. rxFindPassenger() llama a findPassenger() de forma bloqueante y recibe una instancia de Passenger. En este momento, los datos vuelven a fluir en sentido descendente. Las instancias de Flight y Passenger se combinan utilizando la lambda proporcionada (bookTicket) y se pasan a ticket.subscribe().

Esto parece mucho trabajo teniendo en cuenta que se comporta y funciona esencialmente igual que nuestro código de bloqueo del principio. Pero ahora podemos aplicar la concurrencia de forma declarativa sin cambiar ninguna lógica. Si nuestros métodos de negocio devolvieran Future<Flight> (oCompletableFuture<Flight>, en realidad no importa), se habrían tomado dos decisiones por nosotros:

  • La invocación subyacente de lookupFlight() ya ha comenzado y no hay lugar para la pereza. No bloqueamos dicho método, pero el trabajo ya ha comenzado.

  • No tenemos control alguno sobre la concurrencia, es la implementación del método la que decide si se invoca una tarea Future en un grupo de hilos, un nuevo hilo por petición, etc.

RxJava da más control a los usuarios. El hecho de que Observable<Flight>no se implementara pensando en la concurrencia no significa que no podamos aplicarla más adelante. Los Observabledel mundo real ya suelen ser asíncronos, pero en raras ocasiones hay que añadir concurrencia a un Observable existente. Los consumidores de nuestra API, no los implementadores, son libres de elegir el mecanismo de subprocesamiento en el caso del Observable síncrono. Todo esto se consigue utilizando el operador subscribeOn():

Observable<Flight> flight =
    rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
Observable<Passenger> passenger =
    rxFindPassenger(42).subscribeOn(Schedulers.io());

En cualquier momento antes de suscribirnos, podemos inyectar el operador subscribeOn() y proporcionar una instancia llamada Scheduler. En este caso, he utilizado el método de fábrica Schedulers.io(), pero también podemos utilizar un ExecutorService personalizado y envolverlo rápidamente con Scheduler. Cuando se produce la suscripción, la expresión lambda pasada aObservable.create() se ejecuta dentro del Scheduler suministrado en lugar de en el hilo del cliente. Todavía no es necesario, pero examinaremos los programadores en profundidad en la sección "¿Qué es un programador?". De momento, trata a un Scheduler como un grupo de hilos.

¿Cómo cambia Scheduler el comportamiento en tiempo de ejecución de nuestro programa? Recuerda que el operador zip() se suscribe a dos o másObservables y espera pares (o tuplas). Cuando la suscripción se produce de forma asíncrona, todos los Observables ascendentes pueden llamar a su código de bloqueo subyacente de forma concurrente. Si ahora ejecutas tu programa, lookupFlight() y findPassenger() comenzarán a ejecutarse inmediata y concurrentemente cuando se invoque a ticket.subscribe(). Entonces,bookTicket() se aplicará en cuanto el más lento de los mencionados Observables emita un valor.

Hablando de lentitud, también puedes aplicar declarativamente un tiempo de espera, cuando un determinado Observable no emita ningún valor en el tiempo especificado:

rxLookupFlight("LOT 783")
    .subscribeOn(Schedulers.io())
    .timeout(100, TimeUnit.MILLISECONDS)

Como siempre, en caso de errores, éstos se propagan aguas abajo en lugar de lanzarse arbitrariamente. Así que si el método lookupFlight() tarda más de 100 milisegundos, acabarás con TimeoutException en lugar de un valor emitido enviado aguas abajo a cada suscriptor. El operador timeout() se explica exhaustivamente en "Temporización cuando no se producen eventos".

Acabamos con dos métodos que se ejecutan concurrentemente sin mucho esfuerzo, suponiendo que tu API ya esté orientada a Rx. Pero hemos hecho un poco de trampa, ya que bookTicket() sigue devolviendo Ticket, lo que definitivamente significa que es bloqueante. Aunque reservar billete fuera extremadamente rápido, merece la pena declararlo como tal, sólo para facilitar la evolución de la API. La evolución podría significar añadir concurrencia o utilizarla en entornos totalmente no bloqueantes (consulta el Capítulo 5). Recuerda que convertir una API no bloqueante en una bloqueante es tan fácil como llamar a toBlocking(). Lo contrario suele ser complicado y requiere muchos recursos adicionales. Además, es muy difícil predecir la evolución de métodos como rxBookTicket(), si alguna vez tocan la red o el sistema de archivos, por no hablar de la base de datos, merece la pena envolverlos con un Observable que indique la posible latencia a nivel de tipo:

Observable<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
    //...
}

Pero ahora zipWith() devuelve un incómodo Observable<Observable<Ticket>> y el código ya no compila. Una buena regla general es que siempre que veas un tipo con doble envoltorio (por ejemplo Optional<Optional<...>>) falta una invocación a flatMap() en alguna parte. Ese es también el caso aquí. zipWith() toma un par (o más generalmente una tupla) de eventos, aplica una función tomando estos eventos como argumentos, y pone el resultado en el Observable descendente tal cual. Por eso vimos primero Observable<Ticket>pero ahora es Observable<Observable<Ticket>>, dondeObservable<Ticket> es el resultado de nuestra función suministrada. Hay dos formas de superar este problema. Una forma es utilizando un par intermedio devuelto porzipWith:

import org.apache.commons.lang3.tuple.Pair;

Observable<Ticket> ticket = flight
        .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
        .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));

Si utilizar un Pair explícito de una biblioteca de terceros no oscureciera lo suficiente el flujo, en realidad funcionaría la referencia a métodos: Pair::of, pero, de nuevo, decidimos que la información de tipo visible es más valiosa que ahorrar unas pulsaciones. Al fin y al cabo, leemos código mucho más tiempo del que lo escribimos. Una alternativa a un par intermedio es aplicar un flatMap con una función de identidad:

Observable<Ticket> ticket = flight
        .zipWith(passenger, this::rxBookTicket)
        .flatMap(obs -> obs);

Esta expresión lambda obs -> obs aparentemente no hace nada, al menos si fuera un operador map(). Pero recuerda que flatMap() aplica una función a cada valor dentro de Observable, por lo que esta función tomaObservable<Ticket> como argumento en nuestro caso. Después, el resultado no se coloca directamente en el flujo resultante, como con map(). En su lugar, el valor de retorno (de tipo Observable<T>) se "aplana", dando lugar a un Observable<T> en lugar de Observable<Observable<T>>. Cuando se trata de programadores, el operador flatMap() se vuelve aún más potente. Puede que percibas flatMap() como un mero truco sintáctico para evitar un problema de Observable<Observable<...>> anidado, pero es mucho más fundamental que esto.

Casos de uso de Observable.subscribeOn()

Es tentador pensar que subscribeOn() es la herramienta adecuada para la concurrencia en RxJava. Este operador funciona, pero no deberías ver con frecuencia el uso de subscribeOn() (y aún por describir observeOn()). En la vida real, Observables proceden de fuentes asíncronas, por lo que la programación personalizada no es necesaria en absoluto. Utilizamos subscribeOn() a lo largo de este capítulo para mostrar explícitamente cómo actualizar las aplicaciones existentes para que utilicen los principios reactivos de forma selectiva. Pero en la práctica, Schedulers y subscribeOn() son armas de último recurso, no algo que se vea habitualmente.

flatMap() como operador de encadenamiento asíncrono

En nuestra aplicación de ejemplo, ahora debemos enviar una lista de Tickets por correo electrónico. Pero debemos tener en cuenta lo siguiente:

  1. La lista puede ser potencialmente muy larga.

  2. Enviar un correo electrónico puede llevar varios milisegundos o incluso segundos.

  3. La aplicación debe seguir funcionando con elegancia en caso de fallos, pero informar al final de qué tickets no se entregaron.

El último requisito descarta rápidamente el simpletickets.forEach(this::sendEmail), porque lanza una excepción y no continúa con las entradas que aún no se han entregado. En realidad, las excepciones son una desagradable puerta trasera al sistema de tipos y, al igual que las devoluciones de llamada, no son muy amigables cuando quieres gestionarlas de una forma más robusta. Por eso RxJava las modela explícitamente como notificaciones especiales, pero ten paciencia, ya llegaremos a eso. Teniendo en cuenta el requisito de gestión de errores, nuestro código se parece más o menos a esto:

List<Ticket> failures = new ArrayList<>();
for(Ticket ticket: tickets) {
    try {
        sendEmail(ticket);
    } catch (Exception e) {
        log.warn("Failed to send {}", ticket, e);
        failures.add(ticket);
    }
}

Sin embargo, no se abordan los dos primeros requisitos o directrices. No hay ninguna razón para que sigamos enviando correos electrónicos de un hilo secuencialmente. Tradicionalmente, podríamos utilizar un ExecutorService pool para ello, enviando cada correo electrónico como una tarea independiente:

List<Pair<Ticket, Future<SmtpResponse>>> tasks = tickets
    .stream()
    .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
    .collect(toList());

List<Ticket> failures = tasks.stream()
    .flatMap(pair -> {
        try {
            Future<SmtpResponse> future = pair.getRight();
            future.get(1, TimeUnit.SECONDS);
            return Stream.empty();
        } catch (Exception e) {
            Ticket ticket = pair.getLeft();
            log.warn("Failed to send {}", ticket, e);
            return Stream.of(ticket);
        }
    })
    .collect(toList());

//------------------------------------

private Future<SmtpResponse> sendEmailAsync(Ticket ticket) {
    return pool.submit(() -> sendEmail(ticket));
}

Es una buena cantidad de código con el que todos los programadores Java deberían estar familiarizados. Sin embargo, parece demasiado verboso y accidentalmente complejo. En primer lugar, iteramos sobre tickets y los enviamos a un grupo de hilos. Para ser precisos, llamamos al método de ayuda sendEmailAsync() que envía la invocaciónsendEmail() envuelta en Callable<SmtpResponse> a un hilopool. Las instancias aún más precisas de Callable se colocan primero en una cola ilimitada (por defecto) frente a un grupo de hilos. La falta de mecanismos que ralenticen el envío demasiado rápido de tareas si no se pueden procesar a tiempo condujo a los flujos reactivos y al esfuerzo de contrapresión (véase "Contrapresión").

Como más adelante necesitaremos una instancia de Ticket en caso de fallo, debemos llevar la cuenta de qué Future era responsable de quéTicket, de nuevo en un Pair. En código de producción real, deberías considerar un contenedor más significativo y dedicado, como un objeto de valor TicketAsyncTask. Recopilamos todos esos pares y pasamos a la siguiente iteración. En este punto, el grupo de hilos ya está ejecutando varias invocaciones a sendEmail() simultáneamente, que es precisamente lo que pretendíamos. El segundo bucle recorre todos los Futures e intenta desreferenciarlos bloqueándolos (get()) y esperando a que se completen. Si get() regresa con éxito, nos saltamos dicha Ticket. Sin embargo, si se produce una excepción, devolvemos la instancia Ticket que estaba asociada a esta tarea: sabemos que ha fallado y queremos informar de ello más adelante. Stream.flatMap() nos permite devolver cero o un elemento (o en realidad cualquier número), al contrario que Stream.map(), que siempre requiere uno.

Quizá te preguntes por qué necesitamos dos bucles en lugar de uno solo como éste:

//WARNING: code is sequential despite utilizing thread pool
List<Ticket> failures = tickets
        .stream()
        .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
        .flatMap(pair -> {
            //...
        })
        .collect(toList());

Se trata de un error interesante que es realmente difícil de encontrar si no entiendes cómo funciona Streams en Java 8. Dado que los flujos -al igual que Observables- son perezosos, evalúan la colección subyacente elemento a elemento y sólo cuando se ha solicitado una operación terminal (por ejemplo,collect(toList())). Esto significa que una operación map() que inicie tareas en segundo plano no se ejecuta en todas las entradas inmediatamente, sino que se hace de una en una, utilizando alternativamente una operación flatMap(). Además, realmente iniciamos un Future, bloqueamos su espera, iniciamos un segundo Future, bloqueamos su espera, y así sucesivamente. Se necesita una colección intermedia para forzar la evaluación, no por claridad o legibilidad. Al fin y al cabo, el tipoList<Pair<Ticket, Future<SmtpResponse>>> es difícilmente más legible.

Eso es mucho trabajo y la posibilidad de error es alta, por lo que no es de extrañar que los desarrolladores sean reacios a aplicar código concurrente a diario. El poco conocido ExecutorCompletionService del JDK se utiliza a veces cuando hay un conjunto de tareas asíncronas y queremos procesarlas a medida que se completan. Además, Java 8 trae CompletableFuture (ver "CompletableFuture y Streams") que es totalmente reactivo y no bloqueante. Pero, ¿cómo puede ayudar aquí RxJava? En primer lugar, supongamos que una API para enviar un correo electrónico ya está adaptada para utilizar RxJava:

import static rx.Observable.fromCallable;

Observable<SmtpResponse> rxSendEmail(Ticket ticket) {
    //unusual synchronous Observable
    return fromCallable(() -> sendEmail())
}

No hay concurrencia de por medio, simplemente se envuelve sendEmail() dentro de un Observable. Se trata de un Observable raro; normalmente se utilizaría subscribeOn() en la implementación para que el Observable sea asíncrono por defecto. Llegados a este punto, podemos iterar sobre todos los tickets como antes:

List<Ticket> failures = Observable.from(tickets)
    .flatMap(ticket ->
        rxSendEmail(ticket)
            .flatMap(response -> Observable.<Ticket>empty())
            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
            .onErrorReturn(err -> ticket))
    .toList()
    .toBlocking()
    .single();

Observable.ignorarElementos()

Es fácil ver que el operador interno flatMap() de nuestro ejemplo ignora response y devuelve un flujo vacío. En estos casos, flatMap() es una exageración; el operador ignoreElements() es mucho más eficiente.ignoreElements() simplemente ignora todos los valores emitidos y reenvía las notificaciones onCompleted() o onError(). Como estamos ignorando la respuesta real y sólo nos ocupamos de los errores, ignoreElements() funciona muy bien aquí.

Todo lo que nos interesa está dentro del exterior flatMap(). Si sólo se tratara deflatMap(this::rxSendEmail), el código funcionaría; sin embargo, cualquier fallo emitido desde rxSendEmail terminaría con todo el flujo. Pero queremos "atrapar" todos los errores emitidos y recogerlos para su posterior consumo. Utilizamos un truco similar al de Stream.flatMap(): si response se emitió con éxito, lo transformamos en un Observable vacío. Esto significa básicamente que descartamos las entradas con éxito. Sin embargo, en caso de fallos, devolvemos un ticket que ha emitido una excepción. Una llamada de retorno adicional adoOnError() nos permite registrar la excepción; por supuesto, también podemos añadir el registro al operador onErrorReturn(), pero me pareció más funcional esta separación de preocupaciones.

Para seguir siendo compatibles con las implementaciones anteriores, transformamos Observable en Observable<List<Ticket>>, BlockingObservable<List<Ticket>>, toBlocking(), y finalmente List<Ticket> (single()). Curiosamente, incluso BlockingObservable sigue siendo perezoso. Un operador toBlocking() por sí solo no fuerza la evaluación suscribiéndose al flujo subyacente y ni siquiera se bloquea. La suscripción y, por tanto, la iteración y el envío de correos electrónicos se posponen hasta que se invoca single().

Observa que si sustituimos el flatMap() externo por concatMap() (ver "Formas de combinar flujos: concat(), merge() y switchOnNext()" y "Preservar el orden utilizando concatMap()"), nos encontraremos con un fallo similar al mencionado con el Stream del JDK. A diferencia de flatMap() (o merge) que se suscriben inmediatamente a todos los flujos internos, concatMap (o concat) se suscribe a un Observable interno tras otro. Y mientras nadie se suscriba a Observable, ni siquiera se iniciará el trabajo.

Hasta ahora, se ha sustituido un simple bucle for con un operador try-catch por otro menos legible y más complejo Observable. Sin embargo, para convertir nuestro código secuencial en computación multihilo apenas necesitamos añadir un operador más:

Observable
        .from(tickets)
        .flatMap(ticket ->
                rxSendEmail(ticket)
                        .ignoreElements()
                        .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                        .onErrorReturn(err -> ticket)
                        .subscribeOn(Schedulers.io()))

Es tan poco invasivo que puede resultarte difícil detectarlo. Un operadorsubscribeOn() adicional hace que cada rxSendMail() individual se ejecute en un Scheduler especificado (io(), en este caso). Éste es uno de los puntos fuertes de RxJava: no tiene reparos en lo que respecta a los subprocesos, ya que por defecto se ejecuta de forma síncrona, pero permite el multihilo sin fisuras y de forma casi transparente. Por supuesto, esto no significa que puedas inyectar programadores de forma segura en lugares arbitrarios. Pero al menos la API es menos verbosa y de más alto nivel. Exploraremos los programadores con mucho más detalle más adelante en "Multihilo en RxJava". De momento, recuerda que Observables es síncrono por defecto; sin embargo, podemos cambiarlo fácilmente y aplicar la concurrencia en los lugares donde menos se esperaba. Esto es especialmente valioso en aplicaciones heredadas existentes, que puedes optimizar sin mucha complicación.

Para terminar, si estás implementando Observables desde cero, hacerlos asíncronos por defecto es más idiomático. Eso significa colocar subscribeOn() directamente dentro de rxSendEmail() en lugar de externamente. De lo contrario, corres el riesgo de envolver flujos ya asíncronos con otra capa más de programadores. Por supuesto, si el productor que hay detrás de Observable ya es asíncrono, es incluso mejor, porque tu flujo no se vincula a ningún hilo en particular. Además, debes posponer la suscripción a un Observable lo más tarde posible, normalmente cerca del marco web de nuestro mundo exterior. Esto cambia significativamente tu forma de pensar. Toda tu lógica empresarial es perezosa hasta que alguien quiera ver realmente los resultados.3

Sustituir Callbacks por Streams

Las API tradicionales son bloqueantes la mayor parte del tiempo, lo que significa que te obligan a esperar los resultados de forma sincrónica. Este enfoque funciona relativamente bien, al menos antes de que conocieras RxJava. Pero una API bloqueante es especialmente problemática cuando hay que enviar datos desde el productor de la API a los consumidores: éste es un ámbito en el que RxJava brilla de verdad. Hay numerosos ejemplos de estos casos y los diseñadores de API adoptan diversos enfoques. Normalmente, necesitamos proporcionar algún tipo de llamada de retorno que la API invoque, a menudo llamados escuchadores de eventos. Uno de los escenarios más comunes de este tipo esJava Message Service (JMS). Consumir JMS suele implicar implementar una clase a la que el servidor o contenedor de aplicaciones notifica cada mensaje entrante. Podemos sustituir con relativa facilidad tales escuchadores por un componible Observable, que es mucho más robusto y versátil. El oyente tradicional tiene un aspecto similar al de esta clase, que aquí utiliza el soporte JMS enSpring framework, pero nuestra solución es agnóstica con respecto a la tecnología:

@Component
class JmsConsumer {

    @JmsListener(destination = "orders")
    public void newOrder(Message message) {
        //...
    }
}

Cuando se recibe un mensaje JMS message, la clase JmsConsumer debe decidir qué hacer con él. Normalmente, se invoca cierta lógica empresarial dentro de un consumidor de mensajes. Cuando un nuevo componente quiere recibir notificaciones sobre dichos mensajes, debe modificar JmsConsumer adecuadamente. En otras palabras, imagina Observable<Message> al que puede suscribirse cualquiera. Además, hay disponible todo un universo de operadores RxJava, que permiten capacidades de mapeo, filtrado y combinación. La forma más sencilla de pasar de una API push basada en llamadas de retorno a Observable es utilizarSubjects. Cada vez que se entrega un nuevo mensaje JMS, empujamos ese mensaje a un PublishSubject que parece unObservable caliente ordinario desde el exterior:

private final PublishSubject<Message> subject = PublishSubject.create();

@JmsListener(destination = "orders", concurrency="1")
public void newOrder(Message msg) {
    subject.onNext(msg);
}

Observable<Message> observe() {
    return subject;
}

Ten en cuenta que Observable<Message> es caliente; empieza a emitir mensajes JMS en cuanto se consumen. Si no hay nadie suscrito en ese momento, los mensajes simplemente se pierden. ReplaySubject es una alternativa, pero como almacena en caché todos los eventos desde el inicio de la aplicación, no es adecuada para procesos de larga duración. En caso de que tengas un suscriptor que deba recibir absolutamente todos los mensajes, asegúrate de que se suscribe antes de que se inicialice el listener de mensajes JMS. Además, nuestro listener de mensajes tiene un parámetro concurrency="1" para asegurar que Subject no sea invocado desde múltiples hilos. Como alternativa, puedes utilizar Subject.toSerialized().

Como nota al margen, Subjects son más fáciles de poner en marcha, pero se sabe que pueden dar problemas al cabo de un tiempo. En este caso concreto, podemos sustituir fácilmente Subject por el más idiomático RxJava Observable, que utiliza directamente create():

public Observable<Message> observe(
    ConnectionFactory connectionFactory,
    Topic topic) {
    return Observable.create(subscriber -> {
        try {
            subscribeThrowing(subscriber, connectionFactory, topic);
        } catch (JMSException e) {
            subscriber.onError(e);
        }
    });
}

private void subscribeThrowing(
        Subscriber<? super Message> subscriber,
        ConnectionFactory connectionFactory,
        Topic orders) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(true, AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(orders);
    consumer.setMessageListener(subscriber::onNext);
    subscriber.add(onUnsubscribe(connection));
    connection.start();
}

private Subscription onUnsubscribe(Connection connection) {
    return Subscriptions.create(() -> {
        try {
            connection.close();
        } catch (Exception e) {
            log.error("Can't close", e);
        }
    });
}

La API JMS proporciona dos formas de recibir mensajes de un intermediario: sincrónica, mediante el método receive(), que se bloquea, y no bloqueante, mediante MessageListener. La API no bloqueante es beneficiosa por muchas razones; por ejemplo, consume menos recursos como hilos y memoria de pila. Además, se alinea perfectamente con el estilo de programación Rx. En lugar de crear una instancia de MessageListener y llamar a nuestro suscriptor desde dentro de ella, podemos utilizar esta sintaxis escueta con referencia a métodos:

consumer.setMessageListener(subscriber::onNext)

Además, debemos ocuparnos de la limpieza de los recursos y de la gestión adecuada de los errores. Esta minúscula capa de transformación nos permite consumir fácilmente mensajes JMS sin preocuparnos de los aspectos internos de la API. Aquí tienes un ejemplo utilizando el popular broker de mensajería ActiveMQ ejecutándose localmente:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
        observe(connectionFactory, new ActiveMQTopic("orders"))
        .cast(TextMessage.class)
        .flatMap(m -> {
            try {
                return Observable.just(m.getText());
            } catch (JMSException e) {
                return Observable.error(e);
            }
        });

JMS, al igual que JDBC, tiene fama de utilizar en gran medida JMSException comprobado, incluso cuando se llama a getText() en un TextMessage. Para gestionar adecuadamente los errores (para más detalles, consulta "Gestión de errores" ) utilizamos flatMap() y envolvemos las excepciones. A partir de ahí, puedes tratar los mensajes JMS que fluyen como cualquier otro flujo asíncrono y no bloqueante. Y, por cierto, utilizamos el operador cast() que, de forma optimista, convierte los eventos ascendentes a un tipo determinado, fallando con onError(), en caso contrario.cast() es básicamente un operador especializado map() que se comporta como map(x -> (TextMessage)x).

Sondear periódicamente los cambios

La peor API de bloqueo con la que puedes trabajar requiere sondear los cambios. No proporciona ningún mecanismo para empujar los cambios hacia ti, ni siquiera con devoluciones de llamada o bloqueando indefinidamente. El único mecanismo que proporciona esta API es preguntar por el estado actual, y depende de ti averiguar si difiere del estado anterior o no. RxJava tiene unos cuantos operadores realmente potentes que puedes aplicar para adaptar una determinada API al estilo Rx. El primer caso que quiero que consideres es un método sencillo que proporciona un único valor que representa el estado, por ejemplolong getOrderBookLength(). Para realizar un seguimiento de los cambios, debemos llamar a este método con suficiente frecuencia y capturar las diferencias. Puedes conseguirlo en RxJava con un operador de composición muy básico:

Observable
        .interval(10, TimeUnit.MILLISECONDS)
        .map(x -> getOrderBookLength())
        .distinctUntilChanged()

Primero producimos un valor sintético long cada 10 milisegundos, que sirve como contador de tic-tac básico. Por cada valor de este tipo (es decir, cada 10 milisegundos), llamamos a getOrderBookLength(). Sin embargo, el método mencionado no cambia tan a menudo, y no queremos inundar a nuestros suscriptores con montones de cambios de estado irrelevantes. Por suerte, podemos decir simplemente distinctUntilChanged() y RxJava omitirá de forma transparente longlos valores devueltos por getOrderBookLength() que no hayan cambiado desde la última invocación, como se demuestra en el siguiente diagrama de mármol:

distinct

Podemos aplicar este patrón aún más. Imagina que estás pendiente de los cambios en el sistema de archivos o en las tablas de la base de datos. El único mecanismo a tu disposición es tomar una instantánea actual de los archivos o registros de la base de datos. Estás construyendo una API que notificará a los clientes cada novedad. Obviamente, puedes utilizar java.nio.file.WatchService o disparadores de base de datos, pero tómalo como un ejemplo didáctico. Esta vez, de nuevo, empezamos tomando periódicamente una instantánea del estado actual:

Observable<Item> observeNewItems() {
    return Observable
            .interval(1, TimeUnit.SECONDS)
            .flatMapIterable(x -> query())
            .distinct();
}

List<Item> query() {
    //take snapshot of file system directory
    //or database table
}

El operador distinct() mantiene un registro de todos los elementos que han pasado por él (véase también "Eliminar duplicados utilizando distinct() y distinctUntilChanged()"). Si el mismo elemento aparece por segunda vez, simplemente se ignora. Por eso podemos empujar la misma lista de Items cada segundo. La primera vez se envían a todos los suscriptores. Sin embargo, cuando aparece exactamente la misma lista un segundo después, ya se han visto todos los elementos y, por tanto, se descartan. Si en algún momento la lista devuelta por query() contiene unItem de más, distinct() lo dejará pasar, pero lo descartará la próxima vez. Este sencillo patrón nos permite sustituir un montón de invocaciones a Thread.sleep() y el almacenamiento manual en caché por el sondeo periódico.Es aplicable en muchos ámbitos, como el sondeo del Protocolo de Transferencia de Archivos (FTP), el raspado web, etc.

Multihilo en RxJava

Hay APIs de terceros que se bloquean y sencillamente no podemos hacer nada al respecto. Puede que no dispongamos del código fuente y que reescribirlo suponga demasiado riesgo. En ese caso, debemos aprender a lidiar con el código bloqueante en lugar de luchar contra él.

Uno de los rasgos distintivos de RxJava es la concurrencia declarativa, en contraposición a la concurrencia imperativa. La creación y gestión manual de hilos es cosa del pasado (compárese con "Thread Pool de Conexiones") la mayoría de nosotros ya utilizamos pools de hilos gestionados (por ejemplo, con ExecutorService). Pero RxJava va un paso más allá: Observable puede ser no bloqueante, igual que CompletableFuture en Java 8 (véase "CompletableFuture y Streams"), pero a diferencia del otro, también es perezoso. A menos que te suscribas, un Observable que se comporte bien no realizará ninguna acción. Pero el poder de Observable va incluso más allá.

Un Observable asíncrono es el que llama a los métodos de devolución de llamada de tu Subscribers (como onNext()) desde un hilo diferente. ¿Recuerdas "Dominar Observable.create()", en el que exploramos cuándo subscribe() se bloquea, esperando a que lleguen todas las notificaciones? En la vida real, la mayoría de los Observables proceden de fuentes que son asíncronas por naturaleza. El capítulo 5 está enteramente dedicado a tales Observables. Pero incluso nuestro sencillo ejemplo JMS de "Sustituir las devoluciones de llamada por flujos", que utiliza una API integrada y no bloqueante de la especificación JMS (interfazMessageListener ). Esto no lo impone ni lo sugiere el sistema de tipos, pero muchos Observables son asíncronos desde el principio, y debes asumirlo. Un método subscribe() bloqueante ocurre muy raramente, cuando una lambda dentro de Observable.create() no está respaldada por ningún proceso o flujo asíncrono. Sin embargo, por defecto (con create()) todo ocurre en el hilo del cliente (el que se suscribió). Si pulsas onNext() directamente dentro de tu llamada de retorno a create(), no hay multihilo ni concurrencia de ningún tipo.

Al encontrarnos con un Observable tan inusual, podemos seleccionar declarativamente el llamado Scheduler que se utilizará para emitir valores. En el caso de CompletableFuture, no tenemos control sobre los hilos subyacentes, la API tomó la decisión y, en el peor de los casos, es imposible anularla. RxJava rara vez toma esas decisiones por sí solo y opta por el valor predeterminado seguro: hilo de cliente y sin multihilo de por medio. A efectos de este capítulo, utilizaremos una "biblioteca" de registro realmente sencilla4 que imprimirá un mensaje junto con el hilo actual y el número de milisegundos transcurridos desde el inicio del programa utilizando System.currentTimeMillis():

void log(Object label) {
    System.out.println(
        System.currentTimeMillis() - start + "\t| " +
        Thread.currentThread().getName()   + "\t| " +
        label);
}

¿Qué es un Programador?

RxJava es agnóstico respecto a la concurrencia y, de hecho, no la introduce por sí mismo. Sin embargo, algunas abstracciones para tratar con hilos están expuestas al usuario final. Además, ciertos operadores no pueden funcionar correctamente sin concurrencia; consulta "Otros usos de los programadores" para conocer algunos de ellos. Por suerte, la clase Scheduler, la única a la que debes prestar atención, es bastante sencilla. En principio, funciona de forma similar a ScheduledExecutorService de java.util.concurrent-ejecuta bloques arbitrarios de código, posiblemente en el futuro. Sin embargo, para cumplir el contrato Rx, ofrece algunas abstracciones más finas, de las que puedes ver más en la sección avanzada "Visión general de los detalles de implementación del programador".

Los programadores se utilizan junto con los operadores subscribeOn() y observeOn(), así como al crear determinados tipos de Observables. Un programador sólo crea instancias de Workers que se encargan de programar y ejecutar código. Cuando RxJava necesita programar algún código, primero pide a Scheduler que le proporcione un Worker y utiliza este último para programar las tareas posteriores. Más adelante encontrarás ejemplos de esta API, pero primero familiarízate con los programadores integrados disponibles:

Schedulers.newThread()

Este programador simplemente inicia un nuevo hilo cada vez que se solicita a través de subscribeOn() o observeOn(). newThread() casi nunca es una buena opción, no sólo por la latencia que implica iniciar un hilo, sino también porque este hilo no se reutiliza. El espacio de la pila debe asignarse por adelantado (normalmente en torno a un megabyte, como controla el parámetro -Xss de la JVM) y el sistema operativo debe iniciar un nuevo hilo nativo. Cuando termina la Worker, el hilo simplemente termina. Este planificador sólo puede ser útil cuando las tareas son de grano grueso: tardan mucho tiempo en completarse, pero son muy pocas, por lo que es poco probable que los hilos se reutilicen en absoluto. Véase también "Hilo por conexión". En la práctica, seguir Schedulers.io() es casi siempre una opción mejor.

Schedulers.io()

Este planificador es similar a newThread(), pero los subprocesos ya iniciados se reciclan y posiblemente puedan gestionar futuras peticiones. Esta implementación funciona de forma similar a ThreadPoolExecutor de java.util.concurrent con un conjunto ilimitado de hilos. Cada vez que se solicita un nuevo Worker, o bien se inicia un nuevo hilo (y después se mantiene inactivo durante algún tiempo) o se reutiliza el que está inactivo.

El nombre io() no es una coincidencia. Considera la posibilidad de utilizar este programador para tareas ligadas a E/S que requieren muy pocos recursos de CPU. Sin embargo, suelen tardar bastante tiempo, esperando a la red o al disco. Por lo tanto, es una buena idea tener un conjunto relativamente grande de hilos. Aun así, ten cuidado con los recursos ilimitados de cualquier tipo: en caso de dependencias externas lentas o que no respondan, como los servicios web, el programador io() podría iniciar un número enorme de hilos, provocando que tu propia aplicación tampoco responda. Consulta " Gestión de fallos con Hystrix" para más detalles sobre cómo abordar este problema.

Schedulers.computation()

Debes utilizar un programador de cálculo cuando las tareas estén totalmente ligadas a la CPU; es decir, que requieran potencia de cálculo y no tengan código de bloqueo (lectura del disco, red, suspensión, espera de bloqueo, etc.) Dado que se supone que cada tarea ejecutada en este programador debe utilizar completamente un núcleo de la CPU, ejecutar más tareas de este tipo en paralelo que núcleos disponibles no aportaría mucho valor. Por lo tanto, el planificador computation() limita por defecto el número de hilos que se ejecutan en paralelo al valor de availableProcessors(), que se encuentra en la clase de utilidad Runtime.getRuntime().

Si por alguna razón necesitas un número de hilos distinto del predeterminado, siempre puedes utilizar la propiedad del sistema rx.scheduler.max-computation-threads. Tomando menos hilos te aseguras de que siempre haya uno o más núcleos de CPU ociosos, e incluso bajo una carga pesada, el conjunto de hilos de computation() no satura tu servidor. No es posible tener más hilos de cálculo que núcleos.

computation() El programador utiliza una cola ilimitada delante de cada hilo, de modo que si la tarea está programada pero todos los núcleos están ocupados, se ponen en cola. En caso de pico de carga, este programador mantendrá limitado el número de hilos. Sin embargo, la cola delante de cada hilo seguirá creciendo.

Por suerte, los operadores incorporados, especialmente observeOn() que vamos a descubrir en "Concurrencia declarativa con observeOn()" garantizan que este Scheduler no se sobrecargue.

Schedulers.from(Executor executor)

Schedulers son internamente más complejas que Executors de java.util.concurrent, por lo que se necesitaba una abstracción aparte. Pero como conceptualmente son bastante similares, no es de extrañar que exista una envoltura que pueda convertir Executor en Scheduler utilizando el método de fábrica from():

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

//...

ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .build();
Executor executor = new ThreadPoolExecutor(
    10,  //corePoolSize
    10,  //maximumPoolSize
    0L, TimeUnit.MILLISECONDS, //keepAliveTime, unit
    new LinkedBlockingQueue<>(1000),  //workQueue
    threadFactory
);
Scheduler scheduler = Schedulers.from(executor);

Estoy utilizando intencionadamente esta sintaxis verbosa para crear ExecutorService en lugar de la versión más sencilla:

import java.util.concurrent.Executors;

//...

ExecutorService executor = Executors.newFixedThreadPool(10);

Aunque tentadora, la clase de fábrica Executors codifica de forma rígida varios valores predeterminados que son poco prácticos o incluso peligrosos en aplicaciones empresariales. Por ejemplo, utiliza LinkedBlockingQueue sin límites que pueden crecer infinitamente, lo que resulta en OutOfMemoryError para casos en los que hay un gran número de tareas pendientes. Además, por defecto ThreadFactory utiliza nombres de hilos sin sentido como pool-5-thread-3. Nombrar los hilos correctamente es una herramienta inestimable a la hora de crear perfiles o analizar los volcados de hilos. Implementar ThreadFactory desde cero es un poco engorroso, así que utilizamos ThreadFactoryBuilder de Guava. Si te interesa afinar aún más y utilizar adecuadamente los thread pools, consulta "Thread Pool de Conexiones" y "Gestión de fallos con Hystrix". Crear planificadores a partir de Executor que configuramos conscientemente es aconsejable para proyectos con mucha carga. Sin embargo, como RxJava no tiene control sobre los hilos creados independientemente en un Executor, no puede fijar hilos (es decir, intentar mantener el trabajo de la misma tarea en el mismo hilo para mejorar la localidad de la caché). Este Scheduler apenas se asegura de que un único Scheduler.Worker (ver "Visión general de los detalles de implementación del programador") procese los eventos secuencialmente.

Schedulers.immediate()

Schedulers.immediate() es un planificador especial que invoca una tarea dentro del subproceso del cliente de forma bloqueante, en lugar de asíncrona. Utilizarlo no tiene sentido a menos que alguna parte de tu API requiera proporcionar un planificador, mientras que estás absolutamente bien con el comportamiento por defecto de Observable, que no implica ningún tipo de roscado. De hecho, suscribirse a un Observable (más sobre esto en un segundo) a través de immediate() Scheduler suele tener el mismo efecto que no suscribirse con ningún programador en particular. En general, evita este programador, bloquea el subproceso de llamada y es de uso limitado.

Schedulers.trampoline()

El programador trampoline() es muy similar a immediate() porque también programa tareas en el mismo hilo, bloqueando de forma efectiva. Sin embargo, a diferencia de immediate(), la tarea siguiente se ejecuta cuando finalizan todas las tareas programadas anteriormente.immediate() invoca una tarea determinada inmediatamente, mientras que trampoline() espera a que finalice la tarea actual. Trampolín es un patrón de la programación funcional que permite implementar la recursividad sin hacer crecer infinitamente la pila de llamadas. Esto se explica mejor con un ejemplo, primero con immediate(). Por cierto, fíjate en que no interactuamos directamente con una instancia de Scheduler, sino que primero creamos un Worker. Esto tiene sentido, como verás rápidamente en "Visión general de los detalles de implementación del programador".

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Inner start");
        sleepOneSecond();
        log("  Inner end");
    });
    log(" Outer end");
});
log("Main end");
worker.unsubscribe();

El resultado es el esperado; en realidad, podrías sustituir schedule() por una simple invocación a un método:

1044    | main  | Main start
1094    | main  |  Outer start
2097    | main  |   Inner start
3097    | main  |   Inner end
3100    | main  |  Outer end
3100    | main  | Main end

Dentro del bloque Outer tenemos el bloque schedule() Inner que se invoca inmediatamente, interrumpiendo la tarea Outer. Cuando Inner termina, el control vuelve a Outer. De nuevo, esto es simplemente una forma enrevesada de invocar una tarea de forma bloqueante indirectamente a través de immediate() Scheduler . Pero, ¿qué ocurre si sustituimos Schedulers.immediate() por Schedulers.trampoline()? El resultado es muy diferente:

1030    | main  | Main start
1096    | main  |  Outer start
2101    | main  |  Outer end
2101    | main  |   Inner start
3101    | main  |   Inner end
3101    | main  | Main end

¿Ves cómo Outer consigue terminar antes incluso de que empiece Inner? Esto se debe a que la tarea Inner estaba en cola dentro de la tarea trampoline() Scheduler , que ya estaba ocupada por la tarea Outer. Cuando Outer terminó, comenzó la primera tarea de la cola (Inner). Podemos ir aún más lejos para asegurarnos de que entiendes la diferencia:

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Middle start");
        sleepOneSecond();
        worker.schedule(() -> {
            log("   Inner start");
            sleepOneSecond();
            log("   Inner end");
        });
        log("  Middle end");
    });
    log(" Outer end");
});
log("Main end");

El Worker de immediate() Scheduler produce lo siguiente:

1029    | main  | Main start
1091    | main  |  Outer start
2093    | main  |   Middle start
3095    | main  |    Inner start
4096    | main  |    Inner end
4099    | main  |   Middle end
4099    | main  |  Outer end
4099    | main  | Main end

Frente al trabajador trampoline():

1041    | main  | Main start
1095    | main  |  Outer start
2099    | main  |  Outer end
2099    | main  |   Middle start
3101    | main  |   Middle end
3101    | main  |    Inner start
4102    | main  |    Inner end
4102    | main  | Main end
Schedulers.test()

Este Scheduler sólo se utiliza para pruebas, y nunca lo verás en código de producción. Su principal ventaja es la capacidad de avanzar arbitrariamente el reloj, simulando el paso del tiempo. TestScheduler se describe en gran medida en "Programadores en pruebas unitarias". Schedulers por sí solos no son muy interesantes. Si quieres descubrir cómo funcionan internamente y cómo implementar el tuyo propio, consulta la siguiente sección.

Resumen de los detalles de implementación del programador

Nota

Esta sección es totalmente opcional, siéntete libre de saltar directamente a "Suscripción declarativa con subscribeOn()" si no te interesan los detalles de la implementación.

Scheduler no sólo desacopla las tareas y su ejecución (normalmente ejecutándolas en otro hilo), sino que también abstrae el reloj, como aprenderemos en "Tiempo virtual". La API de Scheduler es un poco más sencilla en comparación con, por ejemplo, ScheduledExecutorService:

abstract class Scheduler {
    abstract Worker createWorker();

    long now();

    abstract static class Worker implements Subscription {

        abstract Subscription schedule(Action0 action);

        abstract Subscription schedule(Action0 action,
                             long delayTime, TimeUnit unit);

        long now();
    }
}

Cuando RxJava quiere programar una tarea (presumiblemente, pero no necesariamente en segundo plano), primero debe solicitar una instancia de Worker. Es el Worker el que permite programar la tarea sin ningún retraso o en algún momento en el tiempo. Tanto Scheduler como Worker tienen una fuente de tiempo anulable (métodonow() ) que utiliza para determinar cuándo debe ejecutarse una tarea determinada. Ingenuamente, puedes pensar en un Scheduler como en un pool de hilos y en un Worker como en un hilo dentro de ese pool.

La separación entre Scheduler y Worker es necesaria para aplicar fácilmente algunas de las directrices que impone el contrato Rx, como invocar el método de Subscriberde forma secuencial, no concurrente. WorkerEl contrato de Rx establece precisamente eso: dos tareas programadas en el mismo Worker nunca se ejecutarán simultáneamente. Sin embargo, Workers independientes del mismo Scheduler pueden ejecutar tareas simultáneamente sin problemas.

En lugar de repasar la API, vamos a analizar el código fuente de un Scheduler existente, concretamente HandlerScheduler, que se encuentra en el proyecto RxAndroid. Este Scheduler simplemente ejecuta todas las tareas programadas en un hilo de la interfaz de usuario de Android. La actualización de la interfaz de usuario sólo se permite desde ese hilo (para más detalles, consulta "Desarrollo Android con RxJava" ). Esto es similar al Hilo de Despacho de Eventos (EDT) que se encuentra en Swing, donde la mayoría de las actualizaciones de ventanas y componentes deben ejecutarse dentro de un hilo dedicado (EDT). Como era de esperar, también existe el proyecto RxSwing5 para ello.

El fragmento de código que sigue es una clase despojada e incompleta de RxAndroid sólo con fines educativos:

package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.TimeUnit;

public final class SimplifiedHandlerScheduler extends Scheduler {

    @Override
    public Worker createWorker() {
        return new HandlerWorker();
    }

    static class HandlerWorker extends Worker {

        private final Handler handler = new Handler(Looper.getMainLooper());

        @Override
        public void unsubscribe() {
            //Implementation coming soon...
        }

        @Override
        public boolean isUnsubscribed() {
            //Implementation coming soon...
            return false;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        @Override
        public Subscription schedule(
        Action0 action, long delayTime, TimeUnit unit) {
            ScheduledAction scheduledAction = new ScheduledAction(action);
            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

            scheduledAction.add(Subscriptions.create(() ->
                    handler.removeCallbacks(scheduledAction)));

            return scheduledAction;
        }
    }
}

Los detalles de la API de Android no son importantes por el momento. Lo que ocurre aquí es que cada vez que programamos algo en un HandlerWorker, el bloque de código se pasa a un método especial postDelayed() que lo ejecuta en un hilo dedicado de Android. Sólo hay uno de estos hilos, por lo que los eventos se serializan no sólo dentro de Workers, sino también entre ellos.

Antes de pasar action para que se ejecute, lo envolvemos con ScheduledAction, que implementa tanto Runnable como Subscription. RxJava es perezoso siempre que puede; esto también se aplica a la programación de tareas. Si por alguna razón decides que un determinado action no debe ejecutarse después de todo (esto tiene sentido cuando la acción se programó en el futuro, no inmediatamente), simplemente ejecuta unsubscribe() en el Subscription devuelto por schedule(). Es responsabilidad del Worker manejar adecuadamente la desuscripción (al menos, el mejor esfuerzo).

El código cliente también puede decidir unsubscribe() desde Worker en su totalidad. Esto debería anular la suscripción de todas las tareas en cola, así como liberar el Worker para que el hilo subyacente pueda reutilizarse potencialmente más adelante. El siguiente fragmento de código mejora el SimplifiedHandlerScheduler añadiendo el flujo de desuscripción Worker (sólo se incluyen los métodos modificados):

private CompositeSubscription compositeSubscription =
    new CompositeSubscription();

@Override
public void unsubscribe() {
    compositeSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
    return compositeSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);

    handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

    scheduledAction.add(Subscriptions.create(() ->
            handler.removeCallbacks(scheduledAction)));

    return scheduledAction;
}

En "Control de los receptores mediante la suscripción y el suscriptor<T>", exploramos la interfaz Subscription, pero no llegamos a examinar los detalles de su implementación. CompositeSubscription es una de las muchas implementaciones disponibles, que en sí misma no es más que un contenedor para hijos Subscription(un patrón de diseño compuesto ). Darse de baja de CompositeSubscription significa darse de baja de todos los hijos. También puedes añadir y eliminar los hijos gestionados por CompositeSubscription.

En nuestro Scheduler personalizado, CompositeSubscription se utiliza para rastrear todos los Subscriptions de las invocaciones anteriores a schedule() (ver compositeSubscription.add(scheduledAction)). Por otra parte, el ScheduledAction hijo necesita conocer a su padre (ver: addParent()) para poder eliminarse a sí mismo cuando la acción finalice o se cancele. De lo contrario, Worker acumularía hijos Subscriptions obsoletos para siempre. Cuando el código cliente decide que ya no necesita una instancia de HandlerWorker, se da de baja de ella. La baja se propaga a todos los Subscriptions hijos pendientes (si los hay).

Ésta ha sido una breve introducción a Schedulers en RxJava. Los detalles de su funcionamiento interno no son tan útiles en el trabajo diario; de hecho, están diseñados para que el uso de RxJava sea más intuitivo y predecible. Dicho esto, veamos rápidamente cómo Schedulers resuelve muchos problemas de concurrencia en Rx.

Suscripción declarativa con subscribeOn()

En "Dominar Observable.create()" vimos que subscribe() utiliza por defecto el hilo del cliente. Para recapitular, aquí tienes la suscripción más sencilla que se te pueda ocurrir en la que no haya ningún tipo de subproceso implicado:

Observable<String> simple() {
    return Observable.create(subscriber -> {
        log("Subscribed");
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    });
}

//...

log("Starting");
final Observable<String> obs = simple();
log("Created");
final Observable<String> obs2 = obs
        .map(x -> x)
        .filter(x -> true);
log("Transformed");
obs2.subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
);
log("Exiting");

Fíjate en dónde se colocan las sentencias de registro y estudia detenidamente la salida, sobre todo en lo que respecta a qué hilo invocó la sentencia print:

33  | main  | Starting
120 | main  | Created
128 | main  | Transformed
133 | main  | Subscribed
133 | main  | Got A
133 | main  | Got B
133 | main  | Completed
134 | main  | Exiting

Presta atención: el orden de las declaraciones es absolutamente predecible. En primer lugar, cada línea de código del fragmento de código anterior se ejecuta en el hilo main, no hay grupos de hilos ni emisión asíncrona de eventos implicados. En segundo lugar, el orden de ejecución puede no estar del todo claro a primera vista.

Cuando se inicia el programa, imprime Starting, lo cual es comprensible. Después de crear una instancia de Observable<String>, vemos el mensaje Created. Observa que Subscribed aparece más tarde, cuando realmente nos suscribimos. Sin la invocación a subscribe(), el bloque de código dentro de Observable.create() nunca se ejecuta. Además, incluso los operadores map() y filter() no tienen efectos secundarios visibles, observa cómo el mensaje Transformed se imprime incluso antes que Subscribed.

Después, recibimos todos los eventos emitidos y la notificación de finalización. Por último, se imprime la declaración Exiting y el programa puede volver. Ésta es una observación interesante: se supone quesubscribe() registra una llamada de retorno cuando aparecen eventos de forma asíncrona. Esta es la suposición que deberías hacer por defecto. Sin embargo, en este caso no hay hilos implicados y subscribe() en realidad está bloqueando. ¿Cómo es posible?

Existe una conexión inherente pero oculta entre subscribe() y create(). Cada vez que llamas a subscribe() en un Observable, se invoca su método de devolución de llamada OnSubscribe (envolviendo la expresión lambda que pasaste a create()). Recibe tu Subscriber como argumento. Por defecto, esto ocurre en el mismo hilo y es bloqueante, así que cualquier cosa que hagas dentro de create() bloqueará subscribe(). Si tu método create() duerme unos segundos, subscribe() se bloqueará. Además, si hay operadores entre Observable.create() y tu Subscriber (lambda que actúa como devolución de llamada), todos estos operadores se invocan en nombre de la hebra que invocó subscribe(). RxJava no inyecta por defecto ninguna facilidad de concurrencia entre Observable y Subscriber. La razón es que Observablesuele estar respaldada por otros mecanismos de concurrencia, como bucles de eventos o hebras personalizadas, por lo que Rx te permite tomar el control total en lugar de imponer ninguna convención.

Esta observación prepara el terreno para el operador subscribeOn(). Al insertar subscribeOn() en cualquier lugar entre un Observable original y subscribe(), seleccionas declarativamente Scheduler donde se invocará el método de devolución de llamada OnSubscribe. No importa lo que hagas dentro de create(), este trabajo se descarga a un Scheduler independiente y tu invocación a subscribe() ya no se bloquea:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");
35  | main  | Starting
112 | main  | Created
123 | main  | Exiting
123 | Sched-A-0 | Subscribed
124 | Sched-A-0 | Got A
124 | Sched-A-0 | Got B
124 | Sched-A-0 | Completed

¿Ves cómo el hilo main sale antes incluso de que Observable empiece a emitir ningún valor? Técnicamente, el orden de los mensajes de registro ya no es tan predecible porque hay dos hilos ejecutándose simultáneamente: main Sched-A-0 , que se suscribió y quiere salir, y , que emite eventos en cuanto alguien se suscribe. Tanto el hilo schedulerA como el Sched-A-0 proceden de los siguientes programadores de ejemplo que construimos con fines ilustrativos:

import static java.util.concurrent.Executors.newFixedThreadPool;


ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);

ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);

ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}

Estos programadores se utilizarán en todos los ejemplos, pero son bastante fáciles de recordar. Tres programadores independientes, cada uno de los cuales gestiona 10 hilos de un ExecutorService. Para que la salida sea más agradable, cada grupo de hilos tiene un patrón de nombres distinto.

Antes de empezar, debes comprender que en las aplicaciones maduras, en términos de adopción de Rx, subscribeOn() se utiliza muy raramente. Normalmente, Observables proceden de fuentes que son naturalmente asíncronas (como RxNetty, véase "Servidor HTTP no bloqueante con Netty y RxNetty") o aplican la programación por sí mismas (como Hystrix, véase "Gestión de fallos con Hystrix"). Debes tratar subscribeOn() sólo en casos especiales, cuando se sepa que el Observable subyacente es síncrono (siendocreate() bloqueante). Sin embargo, subscribeOn() sigue siendo una solución mucho mejor que la creación manual de hilos dentro de create():

//Don't do this
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    Runnable code = () -> {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    };
    new Thread(code, "Async").start();
});

El código anterior mezcla dos conceptos: la producción de eventos y la elección de la estrategia de concurrencia. Observable sólo debe ser responsable de la lógica de producción, mientras que sólo el código del cliente puede tomar una decisión sensata sobre la concurrencia. Recuerda que Observable es perezoso pero también inmutable, en el sentido de que subscribeOn() sólo afecta a los suscriptores posteriores, si alguien se suscribe exactamente al mismo Observable sin subscribeOn() por medio, no habrá concurrencia por defecto.

Ten en cuenta que en este capítulo nos centramos en las aplicaciones existentes y en introducir RxJava gradualmente. El operador subscribeOn() es bastante útil en tales circunstancias; sin embargo, después de comprender las extensiones reactivas y empezar a utilizarlas a gran escala, el valor de subscribeOn() disminuye. En pilas de software totalmente reactivas, como las que se encuentran por ejemplo en Netflix , subscribeOn() casi nunca se utiliza, y sin embargo todos los Observables son asíncronos. La mayoría de las veces, Observables proceden de fuentes asíncronas y se tratan como asíncronas por defecto. Por tanto, el uso de subscribeOn() es muy limitado, sobre todo cuando se adaptan API o bibliotecas existentes. En el Capítulo 5, escribimos aplicaciones verdaderamente asíncronas sin subscribeOn() y Schedulers explícitas por completo.

subscribeOn() Concurrencia y comportamiento

Hay varios matices sobre cómo funciona subscribeOn(). En primer lugar, el lector curioso se preguntará qué ocurre si aparecen dos invocaciones del subscribeOn() entre Observable y subscribe(). La respuesta es sencilla: gana el subscribeOn() más cercano al Observable original. Esto tiene importantes implicaciones prácticas. Si estás diseñando una API y utilizas subscribeOn() internamente, el código del cliente no tiene forma de anular el Scheduler elegido. Esto puede ser una decisión de diseño consciente; al fin y al cabo, el diseñador de la API puede saber mejor que nadie qué Scheduler es el adecuado. Por otro lado, proporcionar una versión sobrecargada de dicha API que permita anular el Scheduler elegido es siempre una buena idea.

Estudiemos cómo se comporta subscribeOn():

log("Starting");
Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        //many other operators
        .subscribeOn(schedulerB)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

La salida sólo muestra los hilos de schedulerA:

17  | main  | Starting
73  | main  | Created
83  | main  | Exiting
84  | Sched-A-0 | Subscribed
84  | Sched-A-0 | Got A
84  | Sched-A-0 | Got B
84  | Sched-A-0 | Completed

Curiosamente, la suscripción en schedulerB no se ignora por completo en favor de schedulerA.schedulerB se sigue utilizando durante un breve periodo de tiempo, pero apenas programa nuevas acciones en schedulerA, que realiza todo el trabajo. Así pues, los múltiples subscribeOn() no sólo se ignoran, sino que introducen una pequeña sobrecarga.

Hablando de operadores, hemos dicho que el método create() utilizado cuando hay un nuevo Subscriber se ejecuta dentro del programador proporcionado (si lo hay). Pero, ¿qué hilo ejecuta todas estas transformaciones que se producen entre create() y subscribe()? Ya sabemos que cuando todos los operadores se ejecutan por defecto en el mismo hilo (planificador), no hay concurrencia por defecto:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(this::log)
        .map(x -> x + '1')
        .doOnNext(this::log)
        .map(x -> x + '2')
        .subscribeOn(schedulerA)
        .doOnNext(this::log)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

Salpicamos ocasionalmente la cadena de operadores con doOnNext() para ver qué hilo tiene el control en ese momento. Recuerda que la posición de subscribeOn() no es relevante, puede estar justo después de Observable o justo antes de subscribe(). El resultado no es sorprendente:

20  | main  | Starting
104 | main  | Created
123 | main  | Exiting
124 | Sched-A-0 | Subscribed
124 | Sched-A-0 | A
124 | Sched-A-0 | A1
124 | Sched-A-0 | A12
124 | Sched-A-0 | Got A12
124 | Sched-A-0 | B
124 | Sched-A-0 | B1
124 | Sched-A-0 | B12
125 | Sched-A-0 | Got B12

Observa cómo se invoca a create() y produce los eventos A y B. Estos eventos viajan secuencialmente a través del hilo del programador para llegar finalmente a Subscriber. Muchos recién llegados a RxJava creen que el uso de Scheduler con un gran número de hilos bifurcará automáticamente el procesamiento de los eventos de forma concurrente y unirá de algún modo todos los resultados al final. Esto no es así. RxJava crea una única instancia de Worker (ver: "Visión general de los detalles de implementación del programador") para toda la cadena, sobre todo para garantizar el procesamiento secuencial de los eventos.

Esto significa que si uno de tus operadores es especialmente lento -por ejemplo, map() leyendo datos del disco para transformar los eventos que pasan- esta costosa operación se invocará dentro del mismo hilo. Un solo operador averiado puede ralentizar todo el canal, desde la producción hasta el consumo. Esto es un antipatrón en RxJava, los operadores deben ser no bloqueantes, rápidos y tan puros como sea posible.

De nuevo, flatMap() viene al rescate. En lugar de bloquearnos dentro de map(), podemos invocar a flatMap() y recoger asíncronamente todos los resultados. Por tanto, flatMap() y merge() son los operadores cuando queremos conseguir un verdadero paralelismo. Pero incluso con flatMap() no es obvio. Imagina una tienda de comestibles (llamémosla "RxGroceries") que proporciona una API para comprar productos:

class RxGroceries {

    Observable<BigDecimal> purchase(String productName, int quantity) {
        return Observable.fromCallable(() ->
            doPurchase(productName, quantity));
    }

    BigDecimal doPurchase(String productName, int quantity) {
        log("Purchasing " + quantity + " " + productName);
        //real logic here
        log("Done " + quantity + " " + productName);
        return priceForProduct;
    }

}

Obviamente, la implementación de doPurchase() es irrelevante aquí, sólo imagina que tarda algo de tiempo y recursos en completarse. Simulamos la lógica de negocio añadiendo una suspensión artificial de un segundo, ligeramente superior si quantity es mayor. El bloqueo de Observables como el devuelto por purchase() es inusual en una aplicación real, pero vamos a mantenerlo así con fines educativos. Al comprar varios bienes nos gustaría paralelizar todo lo posible y calcular el precio total de todos los bienes al final. El primer intento es infructuoso:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)  //BROKEN!!!
        .map(prod -> rxGroceries.doPurchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

El resultado es correcto, es un Observable con un único valor: el precio total, calculado con reduce(). Para cada producto, invocamos doPurchase() con quantity uno. Sin embargo, a pesar de utilizar schedulerA respaldado por un pool de hilos de 10, el código es totalmente secuencial:

144  | Sched-A-0 | Purchasing 1 bread
1144 | Sched-A-0 | Done 1 bread
1146 | Sched-A-0 | Purchasing 1 butter
2146 | Sched-A-0 | Done 1 butter
2146 | Sched-A-0 | Purchasing 1 milk
3147 | Sched-A-0 | Done 1 milk
3147 | Sched-A-0 | Purchasing 1 tomato
4147 | Sched-A-0 | Done 1 tomato
4147 | Sched-A-0 | Purchasing 1 cheese
5148 | Sched-A-0 | Done 1 cheese

Observa cómo cada producto bloquea el procesamiento de los siguientes. Cuando se realiza la compra de pan, la de mantequilla comienza inmediatamente, pero no antes. Curiosamente, ni siquiera sustituir map() por flatMap() ayuda, y el resultado es exactamente el mismo:

Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)
        .flatMap(prod -> rxGroceries.purchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

El código no funciona de forma concurrente porque sólo hay un único flujo de eventos, que por diseño debe ejecutarse secuencialmente. De lo contrario, tu Subscriber tendría que ser consciente de las notificaciones concurrentes (onNext(), onComplete(), etc.), por lo que es un compromiso justo. Por suerte, la solución idiomática es muy parecida. Los principales productos emisores de Observable no pueden paralelizarse. Sin embargo, para cada producto, creamos un nuevo Observable independiente, devuelto por purchase(). Como son independientes, podemos programar con seguridad cada uno de ellos de forma concurrente:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .flatMap(prod ->
                rxGroceries
                        .purchase(prod, 1)
                        .subscribeOn(schedulerA))
        .reduce(BigDecimal::add)
        .single();

¿Puedes ver dónde está subscribeOn()? El Observable principal no está haciendo realmente nada, por lo que no es necesario un grupo de hilos especial. Sin embargo, cada subflujo creado dentro de flatMap() recibe un schedulerA. Cada vez que se utiliza subscribeOn() al Scheduler tiene la oportunidad de devolver un nuevo Worker, y por tanto un hilo separado (simplificando un poco):

113  | Sched-A-1 | Purchasing 1 butter
114  | Sched-A-0 | Purchasing 1 bread
125  | Sched-A-2 | Purchasing 1 milk
125  | Sched-A-3 | Purchasing 1 tomato
126  | Sched-A-4 | Purchasing 1 cheese
1126 | Sched-A-2 | Done 1 milk
1126 | Sched-A-0 | Done 1 bread
1126 | Sched-A-1 | Done 1 butter
1128 | Sched-A-3 | Done 1 tomato
1128 | Sched-A-4 | Done 1 cheese

Por último, hemos conseguido una verdadera concurrencia. Cada operación de compra comienza ahora al mismo tiempo y todas terminan finalmente. El operador flatMap() está cuidadosamente diseñado e implementado para que recoja todos los eventos de todos los flujos independientes y los empuje hacia abajo secuencialmente. Sin embargo, como ya aprendimos en "Orden de los eventos tras flatMap()", ya no podemos confiar en el orden de los eventos aguas abajo: ni empiezan ni terminan en el mismo orden en que fueron emitidos (la secuencia original empezaba en pan). Cuando los eventos llegan al operador reduce(), ya son secuenciales y se comportan bien.

A estas alturas, deberías alejarte poco a poco del modelo clásico Thread y comprender cómo funciona Schedulers. Pero si te resulta difícil, he aquí una sencilla analogía:

  • Observable sin ningún Scheduler funciona como un programa monohilo con llamadas a métodos bloqueantes que se pasan datos entre sí.

  • Observable con un único subscribeOn() es como iniciar una gran tarea en segundo plano Thread. El programa dentro de ese Thread sigue siendo secuencial, pero al menos se ejecuta en segundo plano.

  • Observable usando flatMap() donde cada Observable interno tiene subscribeOn() funciona como ForkJoinPool de java.util.concurrent, donde cada subcorriente es una bifurcación de ejecución y flatMap() es una etapa de unión segura.

Por supuesto, los consejos anteriores sólo se aplican a Observables bloqueantes, que rara vez se ven en aplicaciones reales. Si tus Observables subyacentes ya son asíncronos, conseguir concurrencia es cuestión de entender cómo se combinan y cuándo se produce la suscripción. Por ejemplo, merge() en dos flujos se suscribirá a ambos de forma concurrente, mientras que el operador concat() espera a que termine el primer flujo antes de suscribirse al segundo.

Agrupar peticiones utilizando groupBy()

¿Te has dado cuenta de que RxGroceries.purchase() toma productName y quantity aunque la cantidad fuera siempre una? ¿Y si nuestra lista de la compra tuviera algunos productos varias veces, indicando una mayor demanda? La primera implementación ingenua simplemente envía la misma solicitud -por ejemplo, de huevos- varias veces, cada vez pidiendo uno. Afortunadamente, podemos enviar por lotes estas peticiones de forma declarativa utilizando groupBy()-y esto sigue funcionando con la concurrencia declarativa:

import org.apache.commons.lang3.tuple.Pair;

Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "egg", "milk", "tomato",
      "cheese", "tomato", "egg", "egg")
    .groupBy(prod -> prod)
    .flatMap(grouped -> grouped
        .count()
        .map(quantity -> {
            String productName = grouped.getKey();
            return Pair.of(productName, quantity);
        }))
    .flatMap(order -> store
        .purchase(order.getKey(), order.getValue())
        .subscribeOn(schedulerA))
    .reduce(BigDecimal::add)
    .single();

Este código es bastante complejo, así que antes de revelar el resultado, vamos a repasarlo rápidamente. En primer lugar, agrupamos los productos simplemente por su nombre, así la función de identidad prod -> prod. A cambio obtenemos un incómodo Observable<GroupedObservable<String, String>>. No hay nada malo en ello. A continuación, flatMap() recibe cada GroupedObservable<String, String>, que representa todos los productos del mismo nombre. Así, por ejemplo, habrá un ["egg", "egg", "egg"] Observable allí con una clave "egg", también. Si groupBy() utilizara una función de clave diferente, como prod.length(), la misma secuencia tendría una clave 3.

En este punto, dentro de flatMap() necesitamos construir un Observable de tipo Pair<String, Integer> que represente cada producto único y su cantidad. Tanto count() como map() devuelven un Observable, así que todo encaja perfectamente. En segundo lugar flatMap() recibe order de tipo Pair<String, Integer> y realiza una compra, esta vez la cantidad puede ser mayor. El resultado parece perfecto; fíjate en que los pedidos más grandes son ligeramente más lentos, pero aun así es mucho más rápido que tener varias peticiones repetidas:

164  | Sched-A-0 | Purchasing 1 bread
165  | Sched-A-1 | Purchasing 1 butter
166  | Sched-A-2 | Purchasing 3 egg
166  | Sched-A-3 | Purchasing 1 milk
166  | Sched-A-4 | Purchasing 2 tomato
166  | Sched-A-5 | Purchasing 1 cheese
1151 | Sched-A-0 | Done 1 bread
1178 | Sched-A-1 | Done 1 butter
1180 | Sched-A-5 | Done 1 cheese
1183 | Sched-A-3 | Done 1 milk
1253 | Sched-A-4 | Done 2 tomato
1354 | Sched-A-2 | Done 3 egg

Si crees que tu sistema puede beneficiarse de la agrupación por lotes de una forma u otra, consulta "Agrupar y colapsar comandos".

Concurrencia declarativa con observeOn()

Lo creas o no, la concurrencia en RxJava puede describirse mediante dos operadores: los ya mencionados subscribeOn() y observeOn(). Parecen muy similares y resultan confusos para los recién llegados, pero su semántica es en realidad bastante clara y razonable.

subscribeOn() permite elegir qué Scheduler se utilizará para invocar OnSubscribe (expresión lambda dentro de create()). Por lo tanto, cualquier código dentro de create() se empuja a un hilo diferente -por ejemplo, para evitar bloquear el hilo principal. A la inversa, observeOn() controla qué Scheduler se utiliza para invocar Subscribers posteriores que se producen después de observeOn(). Por ejemplo, la llamada a create() se produce en io() Scheduler (a través de subscribeOn(io())) para evitar bloquear la interfaz de usuario. Sin embargo, la actualización de los widgets de la interfaz de usuario debe ocurrir en el hilo de UI (tanto Swing como Android tienen esta restricción), así que usamos observeOn() por ejemplo con AndroidSchedulers.mainThread() antes de que los operadores o suscriptores cambien la UI. De esta forma podemos usar un Scheduler para manejar create() y todos los operadores hasta el primer observeOn(), pero otro(s) para aplicar transformaciones. Esto se explica mejor con un ejemplo:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerA)
        .doOnNext(x -> log("Found 2: " + x))
        .subscribe(
                x -> log("Got 1: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

observeOn() se produce en algún punto de la cadena de tuberías, y esta vez, a diferencia de subscribeOn(), la posición de observeOn() es bastante importante. No importa qué Scheduler estuviera ejecutando operadores por encima de observeOn() (si los hubiera), todo lo que hay por debajo utiliza el suministrado Scheduler. En este ejemplo, no hay subscribeOn(), por lo que se aplica el valor por defecto (sin concurrencia):

23  | main  | Starting
136 | main  | Created
163 | main  | Subscribed
163 | main  | Found 1: A
163 | main  | Found 1: B
163 | main  | Exiting
163 | Sched-A-0 | Found 2: A
164 | Sched-A-0 | Got 1: A
164 | Sched-A-0 | Found 2: B
164 | Sched-A-0 | Got 1: B
164 | Sched-A-0 | Completed

Todos los operadores por encima de observeOn se ejecutan dentro del hilo cliente, que resulta ser el predeterminado en RxJava. Pero por debajo de observeOn(), los operadores se ejecutan dentro del Scheduler suministrado. Esto se hará aún más evidente cuando tanto subscribeOn() como varios observeOn() se produzcan dentro de la cadena:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerB)
        .doOnNext(x -> log("Found 2: " + x))
        .observeOn(schedulerC)
        .doOnNext(x -> log("Found 3: " + x))
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got 1: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

¿Puedes predecir el resultado? Recuerda que todo lo que esté por debajo de observeOn() se ejecuta dentro del Scheduler suministrado , por supuesto hasta que se encuentre otro observeOn(). Además subscribeOn() puede ocurrir en cualquier lugar entre Observable y subscribe(), pero esta vez sólo afecta a los operadores hasta el primer observeOn():

21  | main  | Starting
98  | main  | Created
108 | main  | Exiting
129 | Sched-A-0 | Subscribed
129 | Sched-A-0 | Found 1: A
129 | Sched-A-0 | Found 1: B
130 | Sched-B-0 | Found 2: A
130 | Sched-B-0 | Found 2: B
130 | Sched-C-0 | Found 3: A
130 | Sched-C-0 | Got: A
130 | Sched-C-0 | Found 3: B
130 | Sched-C-0 | Got: B
130 | Sched-C-0 | Completed

La suscripción se produce en schedulerA porque es lo que especificamos en subscribeOn(). También se ejecutó el operador "Found 1" dentro de ese Scheduler porque está antes del primer observeOn(). Más adelante, la situación se vuelve más interesante. observeOn() cambia el actual Scheduler por schedulerB, y "Found 2" utiliza éste, en su lugar. El último observeOn(schedulerC) afecta tanto al operador "Found 3" como a Subscriber. Recuerda que Subscriber funciona en el contexto del último Scheduler encontrado.

subscribeOn() y observeOn() funcionan realmente bien juntos cuando quieres desacoplar físicamente al productor (Observable.create()) y al consumidor (Subscriber). Por defecto, no existe tal desacoplamiento, y RxJava simplemente utiliza el mismo hilo. subscribeOn() no es suficiente, simplemente elegimos un hilo diferente.observeOn() es mejor, pero entonces bloqueamos el hilo del cliente en caso de Observables síncronos. Como la mayoría de los operadores son no bloqueantes y las expresiones lambda utilizadas en su interior suelen ser cortas y baratas, normalmente sólo hay un subscribeOn() y un observeOn() en la cadena de operadores. subscribeOn() puede colocarse cerca del Observable original para mejorar la legibilidad, mientras que observeOn() está cerca de subscribe() para que sólo Subscriber utilice ese Scheduler especial, los demás operadores dependen del Scheduler de subscribeOn().

Aquí tienes un programa más avanzado que aprovecha estos dos operadores:

log("Starting");
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onNext("D");
    subscriber.onCompleted();
});
log("Created");
obs
    .subscribeOn(schedulerA)
    .flatMap(record -> store(record).subscribeOn(schedulerB))
    .observeOn(schedulerC)
    .subscribe(
            x -> log("Got: " + x),
            Throwable::printStackTrace,
            () -> log("Completed")
    );
log("Exiting");

Donde store() es una simple operación anidada:

Observable<UUID> store(String s) {
    return Observable.create(subscriber -> {
        log("Storing " + s);
        //hard work
        subscriber.onNext(UUID.randomUUID());
        subscriber.onCompleted();
    });
}

La producción de eventos ocurre en schedulerA, pero cada evento se procesa de forma independiente utilizando schedulerB para mejorar la concurrencia, una técnica que aprendimos en "Concurrencia y comportamiento de subscribeOn()". La suscripción al final ocurre en otro schedulerC. Estamos bastante seguros de que ya entiendes qué Scheduler/hilo ejecutará qué acción, pero por si acaso (se han añadido líneas vacías para mayor claridad):

26   | main  | Starting
93   | main  | Created
121  | main  | Exiting

122  | Sched-A-0 | Subscribed
124  | Sched-B-0 | Storing A
124  | Sched-B-1 | Storing B
124  | Sched-B-2 | Storing C
124  | Sched-B-3 | Storing D

1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce
1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8
1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a
1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587
1137 | Sched-C-1 | Completed

observeOn() es especialmente importante para las aplicaciones con una UI para la que no queremos bloquear el hilo de dispersión de eventos de la UI. En Android (ver "Desarrollo Android con RxJava") o Swing, algunas acciones como la actualización de la UI deben ejecutarse dentro de un hilo específico. Pero hacer demasiado en ese hilo hace que tu UI no responda. En estos casos, colocas observeOn() cerca de subscribe() para que el código dentro de la suscripción se invoque en el contexto de un Scheduler concreto (como el hilo de la UI). Sin embargo, otras transformaciones, incluso las más baratas, deben ejecutarse fuera del hilo de la UI. En el servidor, observeOn() se utiliza raramente porque la verdadera fuente de concurrencia está incorporada en la mayoría de los Observables. Esto nos lleva a una conclusión interesante: RxJava controla la concurrencia con sólo dos operadores (subscribeOn() y observeOn()), pero cuanto más utilices las extensiones reactivas, menos frecuentemente las verás en el código de producción.

Otros usos de los programadores

Hay numerosos operadores que utilizan por defecto algún Scheduler. Normalmente, se utiliza Schedulers.computation() si no se proporciona ninguno-JavaDoc siempre lo deja claro. Por ejemplo, el operador delay() toma eventos aguas arriba y los empuja aguas abajo después de un tiempo determinado. Obviamente, no puede mantener el hilo original durante ese periodo, por lo que debe utilizar un Scheduler diferente:

Observable
        .just('A', 'B')
        .delay(1, SECONDS, schedulerA)
        .subscribe(this::log);

Sin suministrar un schedulerA personalizado, todos los operadores por debajo de delay() utilizarían el computation() Scheduler . No hay nada intrínsecamente malo en ello; sin embargo, si tu Subscriber se bloquea en E/S, consumiría un Worker del programador computation() compartido globalmente, lo que posiblemente afectaría a todo el sistema. Otros operadores importantes que admiten Scheduler personalizados son: interval(), range(), timer(), repeat(), skip(), take(), timeout(), y varios otros que aún no se han introducido. Si no proporcionas un planificador a dichos operadores, se utiliza computation() Scheduler , que es un valor predeterminado seguro en la mayoría de los casos.

Dominar los programadores es esencial para escribir código escalable y seguro utilizando RxJava. La diferencia entre subscribeOn() y observeOn() es especialmente importante con cargas elevadas, en las que cada tarea debe ejecutarse exactamente cuando esperamos. En las aplicaciones verdaderamente reactivas, en las que todas las operaciones de larga duración son asíncronas, se necesitan muy pocos hilos y, por tanto, Schedulers. Pero siempre hay alguna API o dependencia que requiere código de bloqueo.

Por último, pero no por ello menos importante, debemos estar seguros de que Schedulers utilizados aguas abajo pueden seguir el ritmo de la carga generada por Schedulers aguas arriba. Pero este peligro se explicará con todo detalle en el Capítulo 6.

Resumen

En este capítulo se han descrito varios patrones de las aplicaciones tradicionales que pueden sustituirse con RxJava. Espero que a estas alturas comprendas que el comercio de alta frecuencia o el streaming de publicaciones de las redes sociales no son los únicos casos de uso de RxJava. De hecho, casi cualquier API puede sustituirse sin problemas por Observable. Incluso si no quieres o no necesitas la potencia de las extensiones reactivas en este momento, te permitirá evolucionar la implementación sin introducir cambios incompatibles con el pasado. Además, es el cliente el que acaba cosechando todas las posibilidades que ofrece RxJava, como la pereza, la concurrencia declarativa o el encadenamiento asíncrono. Aún mejor, gracias a la conversión sin fisuras deObservable a BlockingObservable, los clientes tradicionales pueden consumir tu API como quieran, y tú siempre puedes proporcionar una sencilla capa puente.

Deberías tener bastante confianza con RxJava y comprender las ventajas de aplicarlo incluso en sistemas heredados. Sin duda, trabajar conObservables reactivos es más desafiante y tiene una curva de aprendizaje algo empinada, pero las ventajas y posibilidades de crecimiento simplemente no pueden exagerarse. ¿Imaginas que pudiéramos escribir aplicaciones enteras utilizando extensiones reactivas, de arriba abajo? Como un proyecto totalmente nuevo para el que tuviéramos control sobre cada API, interfaz y sistema externo. El Capítulo 5 tratará sobre cómo puedes escribir una aplicación así y cuáles son sus implicaciones.

1 De hecho, RxJava intenta permanecer en el mismo hilo mediante la afinidad de hilos en el modelo de bucle de eventos para aprovechar también esto.

2 Ver también "Patrón de mamparo y Fail-Fast"

3 Compáralo con la evaluación perezosa de expresiones en Haskell.

4 Obviamente, para cualquier proyecto real, utilizarás un sistema de registro de nivel de producción como Logback o Log4J 2.

5 https://github.com/ReactiveX/RxSwing

Get Programación Reactiva con RxJava now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.