Capítulo 4. Principios de diseño de los sistemas reactivos

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

En el Capítulo 3, vimos los retos que plantean los sistemas distribuidos. Ahora es el momento de ver lo que Reactive puede ofrecer. Reactive puede verse como un conjunto de principios para construir sistemas distribuidos, una especie de lista de comprobación para verificar que no se ha pasado por alto ninguna preocupación importante conocida al arquitecturar y construir un sistema. Estos principios se centran en lo siguiente:

Capacidad de respuesta

La capacidad de gestionar solicitudes cuando se enfrentan a fallos o picos de carga

Eficacia

La capacidad de hacer más con menos recursos

En este capítulo, tratamos los principios que promueven los sistemas reactivos.

Sistemas reactivos 101

En 2013, un grupo de expertos en sistemas distribuidos se reunieron y escribieron la primera versión de "El Manifiesto Reactivo". Reunieron en este libro blanco su experiencia construyendo sistemas distribuidos y aplicaciones en la nube. Aunque en 2013 la nube no era precisamente lo que es hoy, la creación dinámica de recursos efímeros ya era un mecanismo bien conocido.

"El Manifiesto Reactivo" define los sistemas reactivos como sistemas distribuidos que tienen cuatro características:

Respuesta

Capaz de gestionar las solicitudes en el momento oportuno

Resistente

Capaz de gestionar los fallos con elegancia

Elástico

Capaz de escalar hacia arriba y hacia abajo según la carga y los recursos

Impulsado por mensajes

Utilizar la comunicación asíncrona basada en mensajes entre los componentes que forman el sistema

Estas cuatro características se representan en la Figura 4-1.

Reactive systems characteristics
Figura 4-1. Características de los sistemas reactivos

Si ves esta imagen por primera vez, puede que te confundan todas las flechas. Puede parecer una campaña de marketing bien diseñada. No lo es, y vamos a explicarte por qué estos pilares tienen mucho sentido a la hora de crear aplicaciones nativas de la nube y de Kubernetes. Empecemos por la parte inferior de la figura.

En lugar de intentar hacer que los sistemas distribuidos sean más simples de lo que son, los sistemas reactivos adoptan su naturaleza asíncrona.Utilizan el paso asíncrono de mensajes para establecer el tejido conectivo entre los componentes. El paso asíncrono de mensajes garantiza un acoplamiento laxo, aislamiento y transparencia de localización. En un sistema reactivo, las interacciones se basan en mensajes enviados a destinos abstractos. Estos mensajes lo transportan todo: datos y fallos. Estos mensajes lo transportan todo, tanto los datos como los fallos. El paso asíncrono de mensajes también mejora la utilización de los recursos. Emplear la comunicación no bloqueante (esta parte la veremos más adelante en este capítulo) permite que los componentes ociosos no consuman casi CPU ni memoria. El paso asíncrono de mensajes permite la elasticidad y la resiliencia, como muestran las dos flechas inferiores de la Figura 4-1.

Elasticidad significa que el sistema puede adaptarse a sí mismo, o a partes de sí mismo, para gestionar la carga fluctuante. Observando los mensajes que fluyen entre los componentes, un sistema puede determinar qué partes alcanzan sus límites y crear más instancias o dirigir los mensajes a otro lugar. La infraestructura de la nube permite crear estas instancias rápidamente en tiempo de ejecución. Pero la elasticidad no sólo consiste en escalar hacia arriba; también consiste en escalar hacia abajo. El sistema puede decidir reducir las partes infrautilizadas para ahorrar recursos. En tiempo de ejecución, el sistema se ajusta a sí mismo, satisfaciendo siempre la demanda actual, evitando cuellos de botella, desbordamientos y recursos comprometidos en exceso. Como puedes imaginar, la elasticidad requiere características de observabilidad, replicación y enrutamiento. La observabilidad se trata en el Capítulo 13. En general, las dos últimas las proporciona la infraestructura, como Kubernetes o los proveedores de la nube.

La resiliencia significa manejar los fallos con elegancia. Como se explicó en el Capítulo 3, los fallos son inevitables en los sistemas distribuidos. En lugar de ocultarlos, los sistemas reactivos consideran a los fallos ciudadanos de primera clase. El sistema debe ser capaz de manejarlos y reaccionar ante ellos. Los fallos se contienen dentro de cada componente, aislando los componentes entre sí. Este aislamiento garantiza que partes del sistema puedan fallar y recuperarse sin poner en peligro todo el sistema. Por ejemplo, al replicar los componentes (elasticidad), el sistema puede seguir gestionando los mensajes entrantes aunque fallen algunos elementos. La implementación de la resiliencia se comparte entre la aplicación (que debe ser consciente de los fallos, contenerlos y, si es posible, gestionarlos con elegancia) y la infraestructura (que monitoriza los sistemas y reinicia los componentes caídos).

La última característica es el objetivo de los sistemas reactivos: ser receptivos. Tu sistema debe seguir siendo receptivo -responder a tiempo- incluso con cargas fluctuantes (elasticidad) y ante fallos (resiliencia). Basarse en el paso de mensajes permite estas características y muchas más, como el control del flujo mediante el monitoreo de los mensajes en el sistema y la aplicación de contrapresión cuando sea necesario.

En pocas palabras, los sistemas reactivos son exactamente lo que queremos construir: sistemas distribuidos capaces de manejar la incertidumbre, los fallos y la carga de forma eficiente. Sus características cumplen a la perfección los requisitos de las aplicaciones nativas de la nube y de Kubernetes. Pero no te confundas; construir un sistema reactivo sigue siendo hacer un sistema distribuido. Es todo un reto. Sin embargo, siguiendo estos principios, el sistema resultante será más reactivo, más robusto y más eficiente. El resto de este libro detalla cómo podemos implementar fácilmente este tipo de sistemas con Quarkus y las tecnologías de mensajería.

Comandos y Eventos

Ahora que hemos cubierto muchos de los principios fundamentales, puede que estés confundido. En el Capítulo 1, dijimos que ser reactivo está relacionado con ser dirigido por eventos, pero en la sección anterior, mencionamos explícitamente el paso asíncrono de mensajes. ¿Significa lo mismo? No del todo.

Pero antes, debemos discutir las diferencias entre órdenes y eventos. Por complicado que pueda ser el diseño de un sistema distribuido, los conceptos de órdenes y eventos son fundamentales. Casi todas las interacciones entre componentes individuales implican uno u otro.

Comandos

Todos los sistemas emiten comandos.Los comandos son acciones que un usuario desea realizar. La mayoría de las API basadas en HTTP pasan comandos: el cliente pide que ocurra una acción. Es importante entender que la acción aún no ha ocurrido. Puede ocurrir en el futuro, o no; puede completarse con éxito o fallar. En general, los comandos se envían a un destinatario concreto, y se devuelve un resultado al cliente.

Tomemos como ejemplo la sencilla aplicación HTTP que utilizamos en el Capítulo 3. Emitiste una simple solicitud HTTP. Como hemos dicho, era una orden. La aplicación recibe esa orden, la gestiona y produce un resultado.

Eventos

Los eventos son acciones que se han completado con éxito. Un evento representa un hecho, algo que ha ocurrido: una pulsación de tecla, un fallo, una orden, cualquier cosa importante para la organización o el sistema en cuestión. Un evento puede ser el resultado del trabajo realizado por un comando.

Volvamos al ejemplo anterior de solicitud HTTP. Una vez escrita la respuesta, se convierte en un evento. Hemos visto una solicitud HTTP y su respuesta. Ese evento puede escribirse en un registro o difundirse a las partes interesadas para que sepan lo que ha ocurrido.

Los hechos son inmutables. No puedes borrar un hecho. Es cierto que no puedes cambiar el pasado. Si quieres refutar un hecho enviado previamente, necesitas disparar otro hecho que invalide el hecho. Los hechos llevados sólo se vuelven irrelevantes por otro hecho que establezca el conocimiento actual.

Mensajes

Pero, ¿cómo publicar estos eventos? Hay muchas maneras. Hoy en día, son populares soluciones como Apache Kafka o Apache ActiveMQ (cubrimos ambas en el Capítulo 11), que actúan como intermediarios entre los productores y los consumidores. Esencialmente, nuestros eventos se escriben en temas o colas. Para escribir estos eventos, la aplicación envía un mensaje al intermediario, apuntando a un destino específico (la cola o el tema).

Un mensaje es una estructura de datos autocontenida que describe el acontecimiento y cualquier detalle relevante sobre el mismo, como quién lo emitió, a qué hora se emitió y, potencialmente, su ID único. Generalmente es mejor mantener el acontecimiento en sí centrado en el negocio y utilizar metadatos adicionales para los aspectos técnicos.

Por otro lado, para consumir eventos, te suscribes a la cola o tema que contiene los eventos que te interesan y recibes los mensajes. Desenvuelves el evento y también puedes obtener los metadatos asociados (por ejemplo, cuándo ocurrió el evento, dónde ocurrió, etc.) El procesamiento de un evento puede dar lugar a la publicación de otros eventos (de nuevo, empaquetados en mensajes y enviados a un destino conocido) o a la ejecución de comandos.

Los intermediarios y los mensajes también pueden transmitir órdenes. En este caso, el mensaje contiene la descripción de la acción a ejecutar, y otro mensaje (potencialmente varios mensajes) llevaría el resultado si fuera necesario.

Comandos frente a eventos: Un ejemplo

Veamos un ejemplo para resaltar las diferencias entre comandos y eventos. Imagina una tienda de comercio electrónico, como la que se muestra en la Figura 4-2. El usuario elige un conjunto de productos y finaliza el pedido (procesa el pago, obtiene la fecha de entrega, etc.).

Simplified architecture of an ecommerce shop
Figura 4-2. Arquitectura simplificada de una tienda de comercio electrónico

El usuario envía una orden (mediante una solicitud HTTP, por ejemplo) al servicio tienda con los artículos que desea recibir. En una aplicación tradicional, una vez que ShopService recibe la orden, llamaría a OrderService e invocaría a un método order con el nombre de usuario, la lista de artículos (cesta), etc. Llamar al método order es una orden. Eso hace que ShopService dependa de OrderService y reduce la autonomía de los componentes: ShopService no puede funcionar sin OrderService. Estamos creando un monolito distribuido, una aplicación distribuida que se colapsaría en cuanto fallara una de sus partes.1

Veamos la diferencia si, en lugar de utilizar un comando entre ShopService y OrderService, publicamos un evento. Una vez que el usuario finaliza el pedido, la aplicación sigue enviando un comando a ShopService. Sin embargo, esta vez, ShopService transforma ese comando en un evento: se ha realizado un nuevo pedido. El evento contiene el usuario, la cesta, etc. El evento es un hecho escrito en un registro, o envuelto en un mensaje y enviado a un intermediario.

Por otro lado, OrderService observa el evento se ha realizado un nuevo pedido, leyendo dónde se almacenan estos eventos. Cuando ShopService emite el evento, lo recibe y puede procesarlo.

Con esta arquitectura, ShopService no depende de OrderService. Además, OrderService no depende de ShopService, y procesaría cualquier evento observado, independientemente del emisor. Por ejemplo, una aplicación móvil puede emitir el mismo evento cuando el usuario valida un pedido desde un teléfono móvil.

Varios componentes pueden consumir eventos(Figura 4-3). Por ejemplo, además de OrderService, StatisticsService lleva la cuenta de los elementos más pedidos. Consume el mismo evento, sin tener que modificar ShopService para recibirlos.

Un componente que observe sucesos puede derivar otros nuevos a partir de ellos. Por ejemplo,StatisticsService podría analizar el pedido y calcular recomendaciones. Estas recomendaciones podrían verse como otro hecho, y así comunicarse como un evento.ShopService podría observar estos eventos y procesarlos para influir en la selección de artículos. Sin embargo, StatisticsService y ShopService son independientes entre sí. El conocimiento es acumulativo y se produce al recibir nuevos eventos y derivar, como hace StatisticsService, nuevos hechos a partir de los eventos recibidos.

Como se muestra en la Figura 4-3, podemos utilizar colas de mensajes para transportar nuestros eventos. Estos eventos se envuelven en mensajes, que se envían a destinos conocidos (orders yrecommendations).OrderService y StatisticsService consumen y procesan los mensajes de forma independiente.

Architecture of the ecommerce shop using events and message brokers
Figura 4-3. Arquitectura de la tienda de comercio electrónico con eventos y colas de mensajes

Es importante que estos destinos persistan los eventos como una secuencia ordenada. Al mantener esa secuencia, el sistema puede volver atrás en el tiempo y reprocesar los eventos.Este mecanismo de repetición, popular en el mundo Kafka, tiene múltiples ventajas. Puede reiniciarse con un estado limpio tras un desastre reprocesando todos los eventos almacenados. Luego, si cambiamos el algoritmo de recomendación de los servicios estadísticos, por ejemplo, podría volver a acumular todo el conocimiento y derivar nuevas recomendaciones.

Aunque la emisión de eventos suena explícita en este ejemplo, no siempre es así. Por ejemplo, los eventos pueden crearse a partir de escrituras en la base de datos.2

Los comandos y los eventos son la base de la mayoría de las interacciones. Aunque utilizamos principalmente comandos, los eventos tienen importantes ventajas. Los eventos son hechos. Los eventos cuentan una historia, la historia de tu sistema, una narración que describe la evolución de tu sistema. En los sistemas reactivos, los eventos se envuelven en mensajes, y estos mensajes se envían a destinos, transportados por corredores de mensajes como AMQP o Kafka(Figura 4-4). Este enfoque resuelve dos importantes problemas arquitectónicos derivados de los sistemas distribuidos. En primer lugar, maneja de forma natural la asincronía del mundo real. En segundo lugar, une servicios sin depender de un fuerte acoplamiento. En el perímetro del sistema, este enfoque utiliza comandos la mayor parte del tiempo, a menudo confiando en HTTP.

Overview of a reactive system
Figura 4-4. Visión general de un sistema reactivo

Este aspecto asíncrono del paso de mensajes de los sistemas reactivos forma el tejido conjuntivo. No sólo otorga más autonomía e independencia a las aplicaciones que forman el sistema, sino que también permite resiliencia y elasticidad. Puede que te preguntes cómo, y obtendrás el principio de nuestra respuesta en la siguiente sección.

Destinos y desacoplamiento espacial

Las aplicaciones reactivas, que forman un sistema reactivo, se comunican mediante mensajes. Se suscriben a destinos y reciben los mensajes enviados por otros componentes a estos destinos. Estos mensajes pueden transportar órdenes o eventos, aunque, como se ha descrito en el apartado anterior, los eventos aportan ventajas interesantes. Estos destinos no están vinculados a componentes o instancias concretas. Son virtuales. Los componentes sólo deben conocer el nombre (generalmente relacionado con el negocio, como orders) del destino, no quién lo produce o consume. Permite la transparencia de la ubicación.

Si utilizas Kubernetes, puedes considerar que la transparencia de ubicación ya está gestionada por ti. De hecho, puedes utilizar los servicios de Kubernetes para aplicar la transparencia de ubicación. Tienes un único punto final que delega en un grupo de pods seleccionados. Pero esta transparencia de ubicación es algo limitada y suele estar vinculada a protocolos HTTP o de solicitud/respuesta. Otros entornos pueden utilizar infraestructuras de descubrimiento de servicios, como HashiCorp Consul o Netflix Eureka.

Utilizar mensajes enviados a un destino te permite, como remitente, ignorar quién va a recibir exactamente el mensaje. No sabes si hay alguien disponible en ese momento o si hay varios componentes o instancias esperando tu mensaje. Este número de consumidores puede evolucionar en tiempo de ejecución; se pueden crear, mover o destruir más instancias, y se pueden implementar nuevos componentes. Pero tú, como remitente, no necesitas saberlo, sólo utilizas un destino especificado. Ilustremos las ventajas de esta direccionabilidad utilizando el ejemplo de la sección anterior.ShopService emite eventos order placed transportados dentro de mensajes enviados al destino orders (Figura 4-3). Es probable que durante un periodo tranquilo, sólo se ejecute una única instancia de OrderService. Si no hay muchos pedidos, ¿para qué molestarse en tener más? Incluso podríamos imaginarnos no tener ninguna instancia, e instanciar una cuando recibamos un pedido. Las plataformas sin servidor ofrecen esta capacidad de escalado desde cero. Sin embargo, con el tiempo, tu tienda consigue más clientes, y una sola instancia puede no ser suficiente. Gracias a la transparencia deubicación, podemos iniciar otras instancias de OrderService para compartir la carga(Figura 4-5).ShopService no se modifica e ignora esta nueva topología.

Elasticity provided by the use of message passing
Figura 4-5. Elasticidad proporcionada por el uso del paso de mensajes

La forma en que se reparte la carga entre los consumidores también es irrelevante para el emisor. Puede ser un round-robin, una selección basada en la carga o algo más inteligente. Cuando la carga vuelve a la normalidad, el sistema puede reducir el número de instancias y ahorrar recursos. Ten en cuenta que este tipo de elasticidad funciona perfectamente para los servicios sin estado. Para los servicios con estado, puede ser más difícil, ya que las instancias pueden tener que compartir el estado. Sin embargo, existen soluciones (aunque no sin advertencias), como Kubernetes StatefulSet o una red de datos en memoria, para coordinar el estado entre las instancias del mismo servicio. El paso de mensajes también permite la replicación. Siguiendo el mismo principio, podemos hacer sombra a la instancia activa de OrderService y tomar el relevo si falla la instancia primaria(Figura 4-6). Este enfoque evita la interrupción del servicio. Ese tipo de conmutación por error también puede requerir compartir el estado.

Resilience provided by the use of message passing
Figura 4-6. Resistencia proporcionada por el uso del paso de mensajes

Al utilizar el paso de mensajes, nuestro sistema no sólo se vuelve asíncrono, sino también elástico y resistente. Cuando arquitectures tu sistema, enumera los destinos que implementan el patrón de comunicación que deseas. En general, utilizarías un destino por tipo de evento, pero no es necesariamente así. Sin embargo, evita a toda costa tener un destino por instancia de componente. Introduce acoplamiento entre el emisor y el receptor, desechando las ventajas. También reduce la extensibilidad. Por último, es importante mantener estable el conjunto de destinos. Cambiar un destino rompería los componentes que lo utilizan o te obligaría a gestionar redireccionamientos.

Desacoplamiento temporal

La transparencia de localización no es la única ventaja. El paso asíncrono de mensajes también permite desacoplar el tiempo.

Los corredores de mensajes modernos, como AMQP 1 .0, Apache Kafka e incluso Java Message Service (JMS), permiten el desacoplamiento temporal. Con estos corredores de eventos, los eventos no se pierden si no hay consumidores. Los eventos se almacenan y se entregan más tarde. Cada corredor tiene su propia manera. Por ejemplo, AMQP 1.0 utiliza mensajes persistentes y suscriptores duraderos para garantizar la entrega de los mensajes. Kafka almacena los registros en un registro ordenado duradero y tolerante a fallos. Los registros se pueden recuperar siempre que permanezcan almacenados dentro del tema.

Si nuestro ShopService emite los pedidos finalizados como eventos, no necesita saber si OrderService está disponible. Sabe que se procesarán en algún momento. Si, por ejemplo, no hay ninguna instancia de OrderService disponible cuando ShopService emite el evento, no se pierde. Cuando una instancia está lista, recibe los pedidos pendientes y los procesa. Entonces se notifica al usuario de forma asíncrona con un correo electrónico.

Por supuesto, el corredor de mensajes debe estar disponible y accesible. La mayoría de los corredores de mensajes tienen capacidades de replicación que evitan los problemas de indisponibilidad y la pérdida de mensajes.

Nota

Cada vez es más habitual almacenar los sucesos en un registro de sucesos. Esta estructura ordenada y anexada representa el historial completo de tu sistema. Cada vez que cambia el estado, el sistema anexa el nuevo estado al registro.

El desacoplamiento temporal aumenta la independencia de nuestros componentes. El desacoplamiento temporal, combinado con otras funciones que permite el paso asíncrono de mensajes, consigue un alto nivel de independencia entre nuestros componentes y mantiene el acoplamiento almínimo.

El papel de la entrada/salida no bloqueante

Llegados a este punto, puede que te preguntes cuál es la diferencia entre una aplicación que utiliza Kafka o AMQP y un sistema reactivo. El paso de mensajes es la esencia de los sistemas reactivos, y la mayoría de ellos dependen de algún tipo de intermediario de mensajes. El paso de mensajes permite la resiliencia y la elasticidad, que conducen a la capacidad de respuesta. Fomenta el desacoplamiento espacial y temporal, lo que hace que nuestro sistema sea mucho más robusto.

Pero los sistemas reactivos no se limitan a intercambiar mensajes. El envío y la recepción de mensajes deben realizarse de forma eficiente. Para conseguirlo, los sistemas reactivos promueven el uso de E/S no bloqueantes.

Bloqueo de E/S de red, hilos y concurrencia

Para comprender las ventajas de la E/S no bloqueante, necesitamos saber cómo funcionan las E/S bloqueantes. Utilicemos una interacción cliente/servidor para ilustrarlo. Cuando un cliente envía una solicitud a un servidor, el servidor la procesa y devuelve una respuesta. HTTP, por ejemplo, sigue este principio. Para que esto ocurra, tanto el cliente como el servidor necesitan establecer una conexión antes de que comience la interacción. No entraremos en las profundidades del modelo de siete capas y la pila de protocolos que intervienen en esta interacción; puedes encontrar muchos artículos en Internet sobre ese tema.

Nota

Los ejemplos de esta sección pueden ejecutarse directamente desde tu IDE. Utiliza chapter-4/non-blocking-io/src/main/java/org/acme/client/EchoClient.java para invocar al servidor iniciado. Asegúrate de no ejecutar varios servidores simultáneamente, ya que todos utilizan el mismo puerto (9999).

Para establecer esa conexión entre el cliente y el servidor, utilizamos sockets, como se muestra en el Ejemplo 4-1.

Ejemplo 4-1. Un servidor de eco monohilo que utiliza E/S de bloqueo(chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingEchoServer.java)
int port = 9999;

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        PrintWriter response = new PrintWriter(client.getOutputStream(), true);
        BufferedReader request = new BufferedReader(
                new InputStreamReader(client.getInputStream()));

        String line;
        while ((line = request.readLine()) != null) {
            System.out.println("Server received message from client: " + line);
            // Echo the request
            response.println(line);

            // Add a way to stop the application.
            if ("done".equalsIgnoreCase(line)) {
                break;
            }
        }
        client.close();
    }
}

El cliente y el servidor tienen que enlazarse a un socket formando la conexión. El servidor escucha en su socket la conexión del cliente. Una vez establecida, el cliente y el servidor pueden tanto escribir como leer datos del socket enlazado a esa conexión.

Tradicionalmente, porque es más sencillo, las aplicaciones se desarrollan utilizando un modelo de desarrollo síncrono. Dicho modelo de desarrollo ejecuta las instrucciones secuencialmente, una tras otra. Por eso, cuando dichas aplicaciones interactúan a través de la red, esperan seguir utilizando un modelo de desarrollo síncrono incluso para la E/S. Este modelo utiliza la comunicación síncrona y bloquea la ejecución hasta que se completa la operación. En el Ejemplo 4-1, esperamos una conexión y la gestionamos de forma síncrona. Leemos y escribimos utilizando API síncronas. Es más sencillo, pero conlleva el uso de E/S de bloqueo.

Con la E/S bloqueante, cuando el cliente envía una solicitud al servidor, el socket que procesa esa conexión y el hilo correspondiente que lee de él se bloquean hasta que aparecen algunos datos leídos. Los bytes se acumulan en el búfer de la red hasta que todo está leído y listo para ser procesado. Hasta que la operación se completa, el servidor no puede hacer nada más que esperar.

La consecuencia de este modelo es que no podemos servir más de una conexión dentro de un mismo hilo. Cuando el servidor recibe una conexión, utiliza ese hilo para leer la solicitud, procesarla y escribir la respuesta. Ese hilo se bloquea hasta que el último byte de la respuesta se escribe en el cable. Una sola conexión de cliente bloquea el servidor! No es muy eficiente, ¿verdad?

Para ejecutar peticiones concurrentes con este enfoque, la única forma es tener varios hilos. Tenemos que asignar un nuevo hilo para cada conexión de cliente. Para gestionar más clientes, tienes que utilizar más hilos y procesar cada petición en un hilo de trabajador diferente; consulta el Ejemplo 4-2.

Ejemplo 4-2. Principios en los que se basa un servidor multihilo que utiliza E/S de bloqueo
while (listening) {
    accept a connection;
    create a worker thread to process the client request;
}

Para poner en práctica este principio, necesitamos un pool de hilos(pool de trabajadores). Cuando el cliente se conecta, aceptamos la conexión y descargamos el procesamiento a un hilo independiente. Así, el hilo del servidor puede seguir aceptando otras conexiones, como se muestra en el Ejemplo 4-3.

Ejemplo 4-3. Un servidor de eco multihilo que utiliza E/S de bloqueo(chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingWithWorkerEchoServer.java)
int port = 9999;
ExecutorService executors = Executors.newFixedThreadPool(10); 1

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        executors.submit(() -> {                                    2
            try {
                PrintWriter response =
                new PrintWriter(client.getOutputStream(), true);
                BufferedReader request = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));

                String line;
                while ((line = request.readLine()) != null) {
                    System.out.println(Thread.currentThread().getName() +
                            " - Server received message from client: " + line);
                    // Echo the request
                    response.println(line);

                    // Add a way to stop the application.
                    if ("done".equalsIgnoreCase(line)) {
                        break;
                    }
                }
                client.close();
            } catch (Exception e) {
                System.err.println("Couldn't serve I/O: " + e.toString());

            }
        });
    }
}
1

Crea un grupo de hilos de trabajo para gestionar la solicitud.

2

Descarga el procesamiento de la solicitud a un hilo del repositorio de hilos. El resto del código no se modifica.

Ese es el modelo utilizado, por defecto, en los marcos tradicionales de Java, como Jakarta EE o Spring.3 Aunque estos marcos utilicen E/S no bloqueante, utilizan hilos de trabajo para gestionar las peticiones. Pero este enfoque tiene muchos inconvenientes, entre ellos:

  • Cada hilo requiere que se le asigne una pila de memoria. Con el creciente número de conexiones, generar varios hilos y cambiar entre ellos consumirá no sólo memoria, sino también ciclos de CPU.

  • En un momento dado, varios hilos podrían estar esperando las peticiones del cliente, lo que supone un enorme desperdicio de recursos.

  • Tu concurrencia (el número de peticiones que puedes gestionar en un momento dado-10 en el ejemplo anterior) está limitada por el número de hilos que puedes crear.

En las nubes públicas, el enfoque de bloqueo de E/S infla tu factura mensual; en las nubes privadas, reduce la densidad de implementación. Por tanto, este enfoque no es ideal si tienes que gestionar muchas conexiones o implementar aplicaciones que traten con mucha E/S. En el ámbito de los sistemas distribuidos, suele ser así. Por suerte, hay una alternativa.

¿Cómo funciona la E/S no bloqueante?

La alternativa es la E/S no bloqueante. La diferencia es evidente por su nombre. En lugar de esperar a que se complete la transmisión, la persona que llama no está bloqueada y puede continuar su procesamiento. La magia ocurre en el sistema operativo. Con la E/S no bloqueante, el sistema operativo pone en cola las peticiones. El sistema procesa la E/S real en el futuro. Cuando la E/S finaliza, y la respuesta está lista, se produce una continuación, a menudo implementada como una llamada de retorno, y la persona que llama recibe el resultado.

Para comprender mejor las ventajas y ver cómo funcionan estas continuaciones, tenemosque mirar bajo el capó: ¿cómo se implementa la E/S sin bloqueo? Ya hemos mencionado una cola. El sistema pone en cola las operaciones de E/S y las devuelve inmediatamente, de modo que la persona que llama no se bloquea mientras espera a que se completen las operaciones de E/S. Cuando vuelve una respuesta, el sistema almacena el resultado en una estructura. Cuando la persona que llama necesita el resultado, interroga al sistema para ver si se ha completado la operación(Ejemplo 4-4).

Ejemplo 4-4. Un servidor de eco que utiliza E/Sno bloqueante (chapter-4/non-blocking-io/src/main/java/org/acme/nio/NonBlockingServer.java)
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
Selector selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);

channel.socket().bind(address);
// Server socket supports only ACCEPT
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    int available = selector.select(); // wait for events
    if (available == 0) {
        continue;  // Nothing ready yet.
    }

    // We have the request ready to be processed.
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        if (key.isAcceptable()) {
            // --  New connection --
            SocketChannel client = channel.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println("Client connection accepted: "
                + client.getLocalAddress());
        } else if (key.isReadable()) {
            // --  A client sent data ready to be read and we can write --
            SocketChannel client = (SocketChannel) key.channel();
            // Read the data assuming the size is sufficient for reading.
            ByteBuffer payload = ByteBuffer.allocate(256);
            int size = client.read(payload);
            if (size == -1 ) { // Handle disconnection
                System.out.println("Disconnection from "
                    + client.getRemoteAddress());
                channel.close();
                key.cancel();
            } else {
                String result = new String(payload.array(),
                    StandardCharsets.UTF_8).trim();
                System.out.println("Received message: " + result);
                if (result.equals("done")) {
                    client.close();
                }
                payload.rewind(); // Echo
                client.write(payload);
            }
        }
        // Be sure not to handle it twice.
        iterator.remove();
    }
}

La E/S no bloqueante introduce algunos conceptos nuevos:

  • No utilizamos InputStream ni OutputStream (que son bloqueantes por naturaleza), sino Buffer, que es un almacenamiento temporal.

  • Channel puede verse como un punto final de una conexión abierta.

  • Selector es la piedra angular de la E/S no bloqueante en Java.

Selector gestiona varios canales, ya sean canales de servidor o de cliente. Cuando utilizas E/S sin bloqueo, creas . Cada vez que tratas con un nuevo canal, registras este canal en el selector con los eventos que te interesan (aceptar, listo para leer, listo para escribir). Selector

Entonces tu código sondea Selector con un solo hilo para ver si el canal está listo. Cuando el canal está listo para leer o escribir, puedes empezar a leer y escribir. No necesitamos tener un hilo para cada canal en absoluto, y un solo hilo puede manejar muchos canales.

El selector es una abstracción de la implementación de E/S no bloqueante proporcionada por el sistema operativo subyacente. Existen varios enfoques, según los sistemas operativos.

En primer lugar, select se implementó en los años 80. Admite el registro de 1.024 sockets. Sin duda, eso era suficiente en los años 80, pero ya no.

poll es un sustituto de introducido en 1997. La diferencia más significativa es que ya no limita el número de tomas. Sin embargo, al igual que con , el sistema sólo te dice cuántos canales están listos, no cuáles. Tienes que iterar sobre el conjunto de canales para comprobar cuáles están listos. Cuando hay pocos canales, no es un gran problema. Una vez que el número de canales supera los cientos de miles, el tiempo de iteración es considerable. select poll select

Después, epoll apareció en 2002 en el Kernel Linux 2.5.44.Kqueue apareció en FreeBSD en 2000 y /dev/poll en Solaris por la misma época. Estos mecanismos devuelven el conjunto de canales que están listos para ser procesados: ¡se acabó la iteración sobre cada canal! Por último, los sistemas Windows proporcionan IOCP, una implementación optimizada de select.

Lo que es importante recordar es que, independientemente de cómo lo implementen los sistemas operativos, con la E/S no bloqueante, sólo necesitas un único hilo para gestionar múltiples peticiones. Este modelo es mucho más eficiente que la E/S bloqueante, ya que no necesitas crear hilos para gestionar peticiones concurrentes. La eliminación de estos hilos adicionales hace que tu aplicación sea mucho más eficiente en términos de consumo de memoria (aproximadamente 1 MB por hilo) y evita malgastar ciclos de CPU debido a los cambios de contexto (1-2 microsegundos por cambio).4

Los sistemas reactivos recomiendan el uso de E/S no bloqueantes para recibir y enviar mensajes. Así, tu aplicación puede manejar más mensajes con menos recursos. Otra ventaja es que una aplicación inactiva casi no consumiría memoria ni CPU. No tienes que reservar recursos por adelantado.

Patrón Reactor y Bucle de Sucesos

La E/S no bloqueante nos da la posibilidad de gestionar múltiples peticiones o mensajes concurrentes con un único hilo. ¿Cómo podríamos gestionar estas peticiones concurrentes? ¿Cómo estructuramos nuestro código cuando utilizamos la E/S no bloqueante? Los ejemplos dados en la sección anterior no escalan bien; podemos ver rápidamente que implementar una API REST con un modelo así será una pesadilla. Además, nos gustaría evitar el uso de hilos de trabajador, ya que descartaría las ventajas de la E/S no bloqueante. Necesitamos algo diferente: el patrón reactor.

El patrón reactor, ilustrado en la Figura 4-7, permite asociar eventos de E/S con manejadores de eventos. El reactor, piedra angular de este mecanismo, invoca a los manejadores de eventos cuando se recibe el evento esperado.

La finalidad del patrón reactor es evitar la creación de un hilo para cada mensaje, solicitud y conexión. Este patrón recibe eventos de varios canales y los distribuye secuencialmente a los manejadores de eventos correspondientes.

The reactor pattern
Figura 4-7. El patrón del reactor

La implementación del patrón reactor utiliza un bucle de eventos(Figura 4-7). Se trata de un hilo que itera sobre el conjunto de canales, y cuando los datos están listos para ser consumidos, el bucle de eventos invoca secuencialmente al manejador de eventos asociado, de forma monohilo.

Cuando combinas la E/S no bloqueante y el patrón reactor, organizas tu código como un conjunto de manejadores de eventos. Ese enfoque funciona de maravilla con el código reactivo, ya que expone la noción de eventos, la esencia de lo Reactivo.

El patrón del reactor tiene dos variantes:

  • El patrón multirreactor utiliza múltiples bucles de eventos (generalmente uno o dos por núcleo de CPU), que aumentan la concurrencia de la aplicación. Las implementaciones del patrón multirreactor, como Eclipse Vert.x, llaman a los manejadores de eventos de forma monohilo para evitar problemas de bloqueo o de visibilidad del estado.

  • El patrón proactor puede considerarse una versión asíncrona del patrón reactor. Los manejadores de eventos de larga duración invocan una continuación cuando finalizan. Estos mecanismos permiten mezclar E/S no bloqueantes y bloqueantes(Figura 4-8).

the proactor pattern
Figura 4-8. El patrón proactor

Puedes integrar manejadores de eventos no bloqueantes, así como bloqueantes, descargando su ejecución a hilos separados cuando sea inevitable. Cuando finaliza su ejecución, el patrón proactor invoca la continuación. Como verás en el Capítulo 6, éste es el patrón utilizado por Quarkus.

Anatomía de las aplicaciones reactivas

En los últimos años, han surgido muchos marcos de trabajo que ofrecen soporte para aplicaciones reactivas. Su objetivo es simplificar la implementación de aplicaciones reactivas. Lo consiguen proporcionando primitivas de alto nivel y API para gestionar eventos y E/S abstracta no bloqueante.

De hecho, y puede que ya lo hayas reconocido, utilizar E/S no bloqueante no es tan sencillo. Combinarlo con un patrón reactor (o una variante) puede ser enrevesado. Afortunadamente, junto a frameworks, bibliotecas y conjuntos de herramientas están haciendo el trabajo pesado.Netty es un framework de aplicaciones de red asíncrono basado en eventos que aprovecha la E/S no bloqueante para crear aplicaciones altamente concurrentes. Es la biblioteca más utilizada para manejar E/S no bloqueante en el mundo Java. Pero Netty puede ser un reto.El Ejemplo 4-5 implementa el servidor TCP eco utilizando Netty.

Ejemplo 4-5. Un servidor de eco utilizando Netty(chapter-4/non-blocking-io/src/main/java/org/acme/netty/NettyEchoServer.java)
public static void main(String[] args) throws Exception {
    new NettyServer(9999).run();
}

private final int port;

public NettyServer(int port) {
    this.port = port;
}

public void run() throws Exception {
    // NioEventLoopGroup is a multithreaded event loop that handles I/O operation.
    // The first one, often called 'boss', accepts an incoming connection.
    // The second one, often called 'worker', handles the traffic of the accepted
    // connection once the boss accepts the connection and registers the
    // accepted connection to the worker.
    EventLoopGroup bossGroup = new NioEventLoopGroup();

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // ServerBootstrap is a helper class that sets up a server.
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // the NioServerSocketChannel class is used to instantiate a
                // new Channel to accept incoming connections.
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // This handler is called for each accepted channel and
                    // allows customizing the processing. In this case, we
                    // just append the echo handler.
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                });

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(port).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

private static class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Write the received object, and flush
        ctx.writeAndFlush(msg);
    }
}

El conjunto de herramientas Vert.x, basado en Netty, proporciona funciones de nivel superior para crear aplicaciones reactivas, como clientes y servidores HTTP, clientes de mensajería, etc. Normalmente, el mismo servidor TCP eco que utiliza Vert.x tiene el aspecto del Ejemplo 4-6.

Ejemplo 4-6. Un servidor de eco utilizando Vert.x(chapter-4/non-blocking-io/src/main/java/org/acme/vertx/VertxEchoServer.java)
Vertx vertx = Vertx.vertx();
// Create a TCP server
vertx.createNetServer()
        // Invoke the given function for each connection
        .connectHandler(socket -> {
            // Just write the content back
            socket.handler(buffer -> socket.write(buffer));
        })
        .listen(9999);

La mayoría de los marcos Java que ofrecen capacidades Reactivas se basan en Netty o Vert.x. Como se muestra en la Figura 4-9, todos siguen el mismo tipo de esquema.

The common architecture of reactive frameworks
Figura 4-9. La arquitectura común de los marcos reactivos

En la parte inferior, tienes la E/S no bloqueante. Generalmente, los frameworks utilizan Netty o Vert.x. Esta capa gestiona las conexiones de los clientes, las peticiones salientes y la escritura de respuestas. En otras palabras, gestiona la parte de E/S. La mayoría de las veces, esta capa implementa el patrón reactor (o una variante), por lo que proporciona un modelo basado en bucles de eventos.

Luego, en la segunda capa, tienes el framework reactivo propiamente dicho. La función de esta capa es proporcionar API de alto nivel que sean fáciles de usar. Utilizas estas API para escribir el código de tu aplicación. En lugar de tener que manejar canales de E/S no bloqueantes, esta capa proporciona objetos de alto nivel como solicitudes HTTP, respuestas, mensajes Kafka, etc. ¡Mucho más fácil!

Por último, en la capa superior, tienes tu aplicación. Tu código no necesita tocar los conceptos de E/S no bloqueante, gracias al marco reactivo. Puede centrarse en los eventos entrantes y manejarlos. Tu código no es más que una colección de manejadores de eventos. Puede utilizar las funciones que proporciona el marco reactivo para interactuar con otros servicios o middleware.

Pero hay una trampa.El manejador de eventos de tu código se invoca utilizando el hilo del bucle de eventos (un hilo de E/S). Si tu código bloquea este hilo, no se podrán procesar otros eventos concurrentes. Sería un desastre en términos de capacidad de respuesta y concurrencia. La consecuencia de una arquitectura así es clara: tu código debe ser no bloqueante. Nunca debe bloquear los hilos de E/S, ya que son poco frecuentes y se utilizan para manejar múltiples peticiones concurrentes. Para conseguirlo, podrías descargar el procesamiento de algunos eventos a un hilo trabajador (utilizando el patrón proactor). Aunque puede descartar algunas de las ventajas de la E/S no bloqueante, a veces es la opción más racional(Figura 4-10). Sin embargo, no debemos abusar de ello, ya que descartaría las ventajas reactivas y haría que la aplicación fuera lenta. Los múltiples cambios de contexto necesarios para manejar un evento en un hilo trabajador penalizan el tiempo de respuesta.

Running some event handlers on worker threads
Figura 4-10. Ejecutando algunos manejadores de eventos en hilos trabajadores

Normalmente, nuestras aplicaciones del Capítulo 2 y del Capítulo 3 se basan en un mecanismo de este tipo.

Otra posibilidad es confiar sólo en código no bloqueante, apoyándose en las API asíncronas que proporciona el marco reactivo. Estas API serían no bloqueantes, y si la lógica de negocio implica E/S, utiliza E/S no bloqueante. Cada vez que un manejador de eventos ejecuta una operación asíncrona, se registra otro manejador (la continuación), y cuando llega el evento esperado, el bucle de eventos lo invoca. Así, el procesamiento se divide en manejadores más pequeños que se ejecutan asíncronamente. Ese modelo es el más eficiente y abarca por completo los conceptos que hay detrás de la Reactividad.

Resumen

Los sistemas reactivos tratan de construir mejores sistemas distribuidos. No pretenden ocultar la naturaleza de los sistemas distribuidos sino, al contrario, abrazarla.

En este capítulo has aprendido lo siguiente:

  • Los cuatro pilares de los sistemas reactivos (paso asíncrono de mensajes, elasticidad, resiliencia y capacidad de respuesta)

  • Cómo el paso asíncrono de mensajes permite elasticidad y resiliencia, y aumenta la autonomía de cada componente individual

  • El papel de los comandos y eventos en un sistema distribuido

  • Cómo la E/S no bloqueante mejora la utilización de recursos en aplicaciones reactivas

Pero este último punto tiene un inconveniente importante, ya que necesitamos escribir código no bloqueante ¡Qué casualidad! ¡El próximo capítulo trata precisamente de eso!

1 "No construyas un monolito distribuido", de Ben Christensen, es una interesante charla sobre los monolitos distribuidos y por qué debes evitarlos.

2 Este patrón se denomina Captura de Datos de Cambios. Frameworks como Debezium son un elemento clave de los sistemas reactivos cuando utilizan bases de datos, ya que los eventos se emiten sin ningún impacto en el código de la aplicación.

3 Nos referimos al Spring Framework tradicional. Spring reactivo se basa en la E/S no bloqueante.

4 "Measuring Context Switching and Memory Overheads for Linux Threads" de Eli Bendersky proporciona datos interesantes sobre el coste de los hilos en Linux.

Get Sistemas reactivos en Java now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.