Chapter 4. Design Principles of Reactive Systems

In Chapter 3, we looked at the challenges behind distributed systems. It’s now time to see what Reactive has to offer. Reactive can be seen as a set of principles for building distributed systems, a kind of checklist to verify that no major known concern was overlooked while architecting and building a system. These principles focus on the following:

Responsiveness

The ability to handle requests when facing failures or peaks of load

Efficiency

The ability to do more with fewer resources

In this chapter, we cover the principles promoted by reactive systems.

Reactive Systems 101

In 2013, a group of distributed systems experts gathered and wrote the first version of “The Reactive Manifesto.” In this whitepaper, they assembled their experience building distributed systems and cloud applications. While in 2013 the cloud was not precisely what it is today, the dynamic creation of ephemeral resources was already a well-known mechanism.

“The Reactive Manifesto” defines reactive systems as distributed systems having four characteristics:

Responsive

Able to handle requests in a timely fashion

Resilient

Able to manage failures gracefully

Elastic

Able to scale up and down according to the load and resources

Message driven

Using asynchronous message-based communication among the components forming the system

These four characteristics are represented in Figure 4-1.

Figure 4-1. Reactive systems characteristics

If you’re seeing this picture for the first time, you may be confused by all the arrows; it can look like a well-tailored marketing campaign. It’s not, and here’s why these pillars make a lot of sense when building cloud native and Kubernetes-native applications. Let’s start at the bottom of the figure.

Instead of trying to make distributed systems simpler than they are, reactive systems embrace their asynchronous nature. They use asynchronous message passing to establish the connective tissue among the components. Asynchronous message passing ensures loose coupling, isolation, and location transparency. In a reactive system, interactions rely on messages sent to abstract destinations. These messages carry everything—data as well as failures. Asynchronous message passing also improves resource utilization. Employing nonblocking communication (we cover that part later in this chapter) allows idle components to consume almost no CPU and memory. Asynchronous message passing enables elasticity and resilience, as depicted by the two bottom arrows in Figure 4-1.

Elasticity means that the system can adapt itself, or parts of itself, to handle the fluctuating load. By looking at the messages flowing among the components, a system can determine which parts reach their limits and create more instances or route the messages elsewhere. Cloud infrastructure enables creating these instances quickly at runtime. But elasticity is not only about scaling up; it’s also about scaling down. The system can decide to scale down underused parts to save resources. At runtime, the system adjusts itself, always meeting the current demand, avoiding bottlenecks, overflows, and overcommitted resources. As you can imagine, elasticity requires observability, replication, and routing features. Observability is covered in Chapter 13. In general, the last two are provided by the infrastructure such as Kubernetes or cloud providers.

Resilience means handling failure gracefully. As explained in Chapter 3, failures are inevitable in distributed systems. Instead of hiding them, reactive systems consider failures first-class citizens. The system should be able to handle them and react to them. Failures are contained within each component, isolating components from one another. This isolation ensures that parts of the system can fail and recover without jeopardizing the whole system. For instance, by replicating components (elasticity), the system can continue to handle incoming messages even if some elements are failing. The implementation of resilience is shared between the application (which needs to be aware of failures, contain them, and, if possible, handle them gracefully) and the infrastructure (which monitors the systems and restarts fallen components).

The last characteristic is the whole purpose of reactive systems: being responsive. Your system needs to stay responsive—to respond in a timely fashion—even under fluctuating load (elasticity) and when facing failure (resilience). Relying on message passing enables these characteristics and much more, such as flow control by monitoring the messages in the system and applying backpressure when necessary.

In a nutshell, reactive systems are exactly what we want to build: distributed systems able to handle uncertainty, failures, and load efficiently. Their characteristics meet the requirements of cloud native and Kubernetes-native applications perfectly. But don’t be mistaken; building a reactive system still means building a distributed system. It’s challenging. However, by following these principles, the resulting system will be more responsive, more robust, and more efficient. The rest of this book details how we can easily implement such systems with Quarkus and messaging technologies.

Commands and Events

Now that we’ve covered many of the foundational principles, you might be confused. In Chapter 1, we said that being reactive is related to being event driven, but in the previous section, we explicitly mentioned asynchronous message passing. Does that mean the same thing? Not completely.

But first, we need to discuss the differences between commands and events. As complicated as a distributed system design can be, the concepts of commands and events are fundamental. Nearly all interactions between individual components involve one or the other.

Commands

Every system issues commands. Commands are actions that a user wishes to perform. Most HTTP-based APIs pass commands: the client asks for an action to happen. It’s important to understand that the action has not yet happened. It may happen in the future, or not; it may complete successfully or fail. In general, commands are sent to a specific recipient, and a result is sent back to the client.

Take the HTTP application we used in Chapter 3. You emitted a simple HTTP request. As we’ve said, that was a command. The application receives that command, handles it, and produces a result.
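
To make this concrete, here is a minimal sketch of a command received over HTTP using JAX-RS. This is not the application from Chapter 3; the resource path and the way the payload is handled are illustrative assumptions.

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

// A client POSTing to /orders asks for an action to happen: a command.
@Path("/orders")
public class OrderResource {

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Response placeOrder(String payload) {
        // The action has not happened yet; it may complete successfully or fail.
        // The command is handled here, and a result is sent back to the client.
        return Response.accepted().build();
    }
}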

Events

Events are actions that have successfully completed. An event represents a fact, something that happened: a keystroke, a failure, an order, anything important to the organization or system at hand. An event can be the result of work done by a command.

Let’s go back to the preceding HTTP request example. Once the response has been written, it becomes an event: we have seen an HTTP request and produced its response. That event can be written to a log or broadcast to interested parties so they can be aware of what happened.

Events are immutable. You cannot delete an event; after all, you can’t change the past. If you want to refute a previously published fact, you need to fire another event invalidating it. A fact is made irrelevant only by another fact establishing the current state of knowledge.

Messages

But how do you publish these events? There are many ways. These days, solutions like Apache Kafka or Apache ActiveMQ (we cover both in Chapter 11) are popular. They act as brokers between the producers and consumers. Essentially, our events are written into topics or queues. To write these events, the application sends a message to the broker, targeting a specific destination (the queue or the topic).

A message is a self-contained data structure describing the event and any relevant details about the event, such as who emitted it, at what time it was emitted, and potentially its unique ID. It’s generally better to keep the event itself business-centric and use additional metadata for the technical aspects.
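
To illustrate, here is one possible shape for such a message, sketched with Java records. The OrderPlaced event and the field names are illustrative assumptions, not a prescribed format.

import java.time.Instant;
import java.util.List;
import java.util.UUID;

// A hypothetical business event: a fact that happened.
record OrderPlaced(String orderId, String userName, List<String> items) { }

// The message wrapping the event, keeping technical metadata separate
// from the business-centric payload.
record Message<T>(
        String id,          // unique message identifier
        String source,      // who emitted the event
        Instant timestamp,  // when it was emitted
        T payload) {        // the event itself

    static <T> Message<T> of(String source, T payload) {
        return new Message<>(UUID.randomUUID().toString(), source, Instant.now(), payload);
    }
}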

On the other side, to consume events, you subscribe to the queue or topic containing the events you are interested in and receive the messages. You unwrap the event and can also get the associated metadata (for example, when the event happened, where it happened, and so forth). The processing of an event can lead to the publication of other events (again, packaged in messages and sent to a known destination) or to the execution of commands.

Brokers and messages can also convey commands. In this case, the message contains the description of the action to execute, and another message (potentially multiple messages) would carry the outcome if needed.

Commands Versus Events: An Example

Let’s take a look at an example to highlight the differences between commands and events. Imagine an ecommerce shop, like the one depicted in Figure 4-2. The user picks a set of products and finalizes the order (proceed to payment, get the delivery date, etc.).

Figure 4-2. Simplified architecture of an ecommerce shop

The user sends a command (using an HTTP request, for example) to the shop service with the items the user wishes to receive. In a traditional application, once ShopService receives the command, it would call OrderService and invoke an order method with the username, the list of items (basket), and so on. Calling the order method is a command. That makes ShopService dependent on OrderService and reduces the component autonomy: ShopService cannot operate without OrderService. We are creating a distributed monolith, a distributed application that would collapse as soon as one of its parts fails.1

Let’s see the difference if, instead of using a command between ShopService and OrderService, we publish an event. Once the user finalizes the order, the application still sends a command to ShopService. However, this time, ShopService transforms that command into an event: a new order has been placed. The event contains the user, the basket, and so on. The event is a fact written in a log, or wrapped into a message and sent to a broker.
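
As an illustration, the transformation could look like the following sketch, which uses a MicroProfile Reactive Messaging Emitter to send the event to an orders channel. The channel name is an assumption, the OrderPlaced type reuses the hypothetical event sketched earlier, and imports are omitted as in the other listings.

@ApplicationScoped
public class ShopService {

    @Inject
    @Channel("orders")                 // destination carrying the order events
    Emitter<OrderPlaced> orders;

    // Invoked when the "finalize order" command is received (over HTTP, for example).
    public void finalizeOrder(String userName, List<String> basket) {
        // Transform the command into a fact: a new order has been placed.
        OrderPlaced event = new OrderPlaced(UUID.randomUUID().toString(), userName, basket);
        orders.send(event);            // wrapped into a message and sent to the broker
    }
}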

On the other side, OrderService observes the “a new order has been placed” event by reading from wherever these events are stored. When ShopService emits the event, OrderService receives it and can process it.

With this architecture, ShopService does not depend on OrderService. In addition, OrderService does not depend on ShopService, and it would process any observed event, regardless of the emitter. For example, a mobile application can emit the same event when the user validates an order from a mobile phone.

Multiple components can consume events (Figure 4-3). For example, in addition to OrderService, StatisticsService keeps track of the most ordered items. It consumes the same events, without requiring any modification to ShopService.

A component observing events can derive new ones from them. For instance, StatisticsService could analyze the orders and compute recommendations. These recommendations could be seen as another fact, and so communicated as an event. ShopService could observe these events and process them to influence item selection. However, StatisticsService and ShopService remain independent of each other. Knowledge is cumulative: it grows as components receive new events and derive new facts from them, as StatisticsService does.

As depicted in Figure 4-3, we can use message queues to transport our events. These events are wrapped into messages, sent to known destinations (orders and recommendations). OrderService and StatisticsService consume and process the messages independently.

Figure 4-3. Architecture of the ecommerce shop with events and message queues

It’s important for these destinations to persist the events as an ordered sequence. By keeping that sequence, the system can go back in time and reprocess the events. Such a replay mechanism, popular in the Kafka world, has multiple benefits. You can restart with a clean state after a disaster by reprocessing all the stored events. And if we change the recommendation algorithm of the statistics service, for example, it can re-accumulate all the knowledge and derive new recommendations.
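
As an illustration, a Kafka consumer can rewind to the beginning of a topic and reprocess every stored event. The following sketch uses the Apache Kafka consumer API and assumes an orders topic with a single partition and string-encoded events; imports are omitted as in the other listings.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "statistics-service");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition orders = new TopicPartition("orders", 0);
    consumer.assign(List.of(orders));
    consumer.seekToBeginning(List.of(orders));  // replay: start from the first stored event

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(record ->
                System.out.println("Reprocessing event: " + record.value()));
    }
}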

While the event emission sounds explicit in this example, that’s not always the case. For instance, events can be created from database writes.2

Commands and events are the basis of most interactions. While we mostly use commands, events come with significant benefits. Events are facts. Events tell a story, the story of your system, a narrative that describes your system’s evolution. In reactive systems, events are wrapped into messages, and these messages are sent to destinations, transported by message brokers such as AMQP or Kafka (Figure 4-4). Such an approach solves two important architectural issues arising in distributed systems. First, it naturally handles the asynchronicity of the real world. Second, it binds services together without relying on strong coupling. At the edge of the system, this approach uses commands most of the time, often relying on HTTP.

Figure 4-4. Overview of a reactive system

This asynchronous message-passing aspect of reactive systems forms the connective tissue. It not only grants the applications forming the system more autonomy and independence, but also enables resilience and elasticity. You may wonder how; you will find the beginning of an answer in the next section.

Destinations and Space Decoupling

The reactive applications forming a reactive system communicate using messages. They subscribe to destinations and receive the messages sent by other components to these destinations. These messages can carry commands or events, though as described in the previous section, events provide interesting benefits. These destinations are not bound to specific components or instances. They are virtual. Components only need to know the name of the destination (generally business related, such as orders), not who produces or consumes the messages. This enables location transparency.

If you are using Kubernetes, you may consider location transparency as already managed for you. Indeed, you can use Kubernetes services to implement location transparency. You have a single endpoint delegating to a group of selected pods. But this location transparency is somewhat limited and often tied to HTTP or request/reply protocols. Other environments can use service discovery infrastructure such as HashiCorp Consul or Netflix Eureka.

Using messages sent to a destination allows you, as the sender, to ignore who precisely is going to receive the message. You don’t know whether someone is currently available or whether multiple components or instances are waiting for your message. The number of consumers can evolve at runtime; more instances can be created, moved, or destroyed, and new components deployed. But as a sender, you don’t need to know; you just use a specified destination. Let’s illustrate the advantages of this addressability by using the example from the previous section. ShopService emits order placed events carried inside messages sent to the orders destination (Figure 4-3). It’s quite possible that, during a quiet period, only a single instance of OrderService runs. If there are not many orders, why bother having more? We could even imagine having no instance at all, and instantiating one when we receive an order. Serverless platforms offer this scale-from-zero ability. However, over time, your shop gets more customers, and a single instance may not be enough. Thanks to location transparency, we can start other instances of OrderService to share the load (Figure 4-5). ShopService is not modified and is unaware of this new topology.
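
On the consuming side, OrderService only needs the destination name, not the identity of the producer. Here is a minimal sketch with MicroProfile Reactive Messaging, matching the hypothetical orders channel and OrderPlaced event used earlier; imports are omitted as in the other listings.

@ApplicationScoped
public class OrderService {

    // Subscribe to the destination, not to ShopService: whoever emits to
    // "orders", and however many instances of this service run, this
    // handler receives the events.
    @Incoming("orders")
    public void process(OrderPlaced event) {
        System.out.println("Processing order " + event.orderId());
    }
}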

Figure 4-5. Elasticity provided by the use of message passing

The way the load is shared among the consumers is also irrelevant to the sender. It can be round-robin, a load-based selection, or something more clever. When the load returns to normal, the system can reduce the number of instances and save resources. Note that this kind of elasticity works perfectly for stateless services. For stateful services, it may be harder, as the instances may have to share state. However, solutions exist (though not without caveats), like Kubernetes StatefulSets or an in-memory data grid, to coordinate state among instances of the same service. Message passing also enables replication. Following the same principle, we can shadow the active OrderService instance and have the shadow take over if the primary instance fails (Figure 4-6). This approach avoids service disruption. That kind of failover may also require state sharing.

Figure 4-6. Resilience provided by the use of message passing

By using message passing, our system becomes not only asynchronous, but also elastic and resilient. When you architect your system, you list the destinations that implement the communication pattern you want. In general, you would use one destination per type of event, but that’s not necessarily the case. However, avoid at all costs having a destination per component instance. It introduces coupling between the sender and the receiver, discarding the benefits. It also reduces the extensibility. Finally, it’s important to keep the set of destinations stable. Changing a destination would break the components using it or would force you to handle redirections.

Time Decoupling

Location transparency is not the only benefit. Asynchronous message passing also enables time decoupling.

Modern message backbones, such as AMQP 1.0, Apache Kafka, and even Java Message Service (JMS), enable time decoupling. With these event brokers, events are not lost if there are no consumers. The events are stored and delivered later. Each broker has its own way of achieving this. For instance, AMQP 1.0 uses persistent messages and durable subscribers to ensure message delivery. Kafka stores records in a durable, fault-tolerant, ordered log. The records can be retrieved as long as they remain stored within the topic.

If our ShopService emits the finalized orders as events, it does not need to know whether OrderService is available. It knows the events will eventually be processed. If, for example, no instance of OrderService is available when ShopService emits the event, the event is not lost. When an instance is ready, it receives the pending orders and processes them. The user is then notified asynchronously with an email.

Of course, the message broker must be available and reachable. Most message brokers have replication abilities preventing unavailability issues and message loss.

Note

It is becoming common to store events in an event log. Such an ordered, append-only structure represents the full history of your system. Every time the state changes, the system appends the new state to the log.

Time decoupling increases the independence of our components. Time decoupling, combined with other features enabled by asynchronous message passing, achieves a high level of independence among our components and keeps coupling to a minimum.

The Role of Nonblocking Input/Output

At this point, you may wonder what the difference is between an application using Kafka or AMQP and a reactive system. Message passing is the essence of reactive systems, and most of them rely on some sort of message broker. Message passing enables resilience and elasticity, which lead to responsiveness. It promotes space and time decoupling, making our system much more robust.

But reactive systems are not only exchanging messages. Sending and receiving messages must be done efficiently. To achieve this, Reactive promotes the use of nonblocking I/Os.

Blocking Network I/O, Threads, and Concurrency

To understand the benefits of nonblocking I/O, we need to know how blocking I/Os work. Let’s use a client/server interaction to illustrate. When a client sends a request to a server, the server processes it and sends back a response. HTTP, for instance, follows this principle. For this to happen, both the client and the server need to establish a connection before the interaction starts. We will not go into the depths of the seven-layer model and the protocol stack involved in this interaction; you can find plenty of articles online about that topic.

Note

Examples from this section can be run directly from your IDE. Use chapter-4/non-blocking-io/src/main/java/org/acme/client/EchoClient.java to invoke the started server. Be sure to avoid running multiple servers concurrently as they all use the same port (9999).

To establish that connection between the client and the server, we use sockets, as shown in Example 4-1.

Example 4-1. A single-threaded echo server using blocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingEchoServer.java)
int port = 9999;

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        PrintWriter response = new PrintWriter(client.getOutputStream(), true);
        BufferedReader request = new BufferedReader(
                new InputStreamReader(client.getInputStream()));

        String line;
        while ((line = request.readLine()) != null) {
            System.out.println("Server received message from client: " + line);
            // Echo the request
            response.println(line);

            // Add a way to stop the application.
            if ("done".equalsIgnoreCase(line)) {
                break;
            }
        }
        client.close();
    }
}

The client and server have to bind themselves to a socket forming the connection. The server listens to its socket for the client to connect. Once established, the client and server can both write and read data from the socket bound to that connection.

Traditionally, because it’s simpler, applications are developed using a synchronous development model. Such a development model executes instructions sequentially, one after the other. So when such applications interact across the network, they expect to continue using a synchronous development model even for I/O. This model uses synchronous communication and blocks the execution until the operation completes. In Example 4-1, we wait for a connection and handle it synchronously. We read and write using synchronous APIs. It’s simpler, but it leads to the use of blocking I/O.

With blocking I/O, when the client sends a request to the server, the socket processing that connection, and the corresponding thread reading from it, are blocked until data arrives to be read. The bytes accumulate in the network buffer until everything is read and ready for processing. Until the operation completes, the server can do nothing but wait.

The consequence of this model is that we cannot serve more than one connection within a single thread. When the server receives a connection, it uses that thread to read the request, process it, and write the response. That thread is blocked until the last byte of the response is written on the wire. A single client connection blocks the server! Not very efficient, right?

With this approach, the only way to execute concurrent requests is to have multiple threads. We need to allocate a new thread for each client connection. To handle more clients, you need to use more threads and process each request on a different worker thread; see Example 4-2.

Example 4-2. Principles behind multithreaded server using blocking I/O
while (listening) {
    accept a connection;
    create a worker thread to process the client request;
}

To implement this principle, we need a thread pool (worker pool). When the client connects, we accept the connection and offload the processing to a separate thread. Thus, the server thread can still accept other connections, as shown in Example 4-3.

Example 4-3. A multithreaded echo server using blocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingWithWorkerEchoServer.java)
int port = 9999;
ExecutorService executors = Executors.newFixedThreadPool(10); 1

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        executors.submit(() -> {                                    2
            try {
                PrintWriter response =
                        new PrintWriter(client.getOutputStream(), true);
                BufferedReader request = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));

                String line;
                while ((line = request.readLine()) != null) {
                    System.out.println(Thread.currentThread().getName() +
                            " - Server received message from client: " + line);
                    // Echo the request
                    response.println(line);

                    // Add a way to stop the application.
                    if ("done".equalsIgnoreCase(line)) {
                        break;
                    }
                }
                client.close();
            } catch (Exception e) {
                System.err.println("Couldn't serve I/O: " + e.toString());

            }
        });
    }
}
1

Create a worker thread pool to handle the request.

2

Offload the processing of the request to a thread from the thread pool. The rest of the code is unchanged.

That’s the model used, by default, in traditional Java frameworks such as Jakarta EE or Spring.3 Even if these frameworks use nonblocking I/O under the hood, they use worker threads to handle the requests. But this approach has many drawbacks, including:

  • Each thread requires a stack of memory allocated to it. With the increasing number of connections, spawning multiple threads and switching between them will consume not only memory but also CPU cycles.

  • At any given point in time, multiple threads could be waiting for the client requests. That’s a massive waste of resources.

  • Your concurrency (the number of requests you can handle at a given time—10 in the previous example) is limited by the number of threads you can create.

On public clouds, the blocking I/O approach inflates your monthly bill; on private clouds, it reduces the deployment density. Therefore, this approach is not ideal if you have to handle many connections or implement applications dealing with a lot of I/O. In the realm of distributed systems, that’s often the case. Luckily, there’s an alternative.

How Does Nonblocking I/O Work?

The alternative is nonblocking I/O. The difference is evident from its name. Instead of waiting for the completion of the transmission, the caller is not blocked and can continue its processing. The magic happens in the operating system. With nonblocking I/O, the operating system queues the requests and processes the actual I/O later. When the I/O completes and the response is ready, a continuation, often implemented as a callback, is invoked and the caller receives the result.

To better understand the benefits and see how these continuations work, we need to look under the hood: how is nonblocking I/O implemented? We already mentioned a queue. The system enqueues I/O operations and returns immediately, so the caller is not blocked while waiting for the I/O operations to complete. When a response comes back, the system stores the result in a structure. When the caller needs the result, it interrogates the system to see whether the operation completed (Example 4-4).

Example 4-4. An echo server using nonblocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/nio/NonBlockingServer.java)
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
Selector selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);

channel.socket().bind(address);
// Server socket supports only ACCEPT
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    int available = selector.select(); // wait for events
    if (available == 0) {
        continue;  // Nothing ready yet.
    }

    // We have the request ready to be processed.
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        if (key.isAcceptable()) {
            // --  New connection --
            SocketChannel client = channel.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println("Client connection accepted: "
                + client.getLocalAddress());
        } else if (key.isReadable()) {
            // --  A client sent data ready to be read and we can write --
            SocketChannel client = (SocketChannel) key.channel();
            // Read the data, assuming a 256-byte buffer is large enough.
            ByteBuffer payload = ByteBuffer.allocate(256);
            int size = client.read(payload);
            if (size == -1) { // Handle disconnection
                System.out.println("Disconnection from "
                    + client.getRemoteAddress());
                client.close();
                key.cancel();
            } else {
                String result = new String(payload.array(), 0, size,
                    StandardCharsets.UTF_8).trim();
                System.out.println("Received message: " + result);
                payload.flip(); // Prepare the buffer to echo the received bytes
                client.write(payload);
                if (result.equals("done")) {
                    client.close();
                }
            }
        }
        // Be sure not to handle it twice.
        iterator.remove();
    }
}

Nonblocking I/O introduces a few new concepts:

  • We don’t use InputStream or OutputStream (which are blocking by nature), but Buffer, a temporary storage area.

  • Channel can be viewed as an endpoint for an open connection.

  • Selector is the cornerstone of nonblocking I/O in Java.

Selector manages multiple channels, either server or client channels. When you use nonblocking I/O, you create a Selector. Each time you deal with a new channel, you register it on the selector along with the events you are interested in (accept, ready to read, ready to write).

Then your code polls the Selector, using only one thread, to see which channels are ready. When a channel is ready to read or write, you can start reading and writing. We don’t need a thread for every channel; a single thread can handle many channels.

The selector is an abstraction of the nonblocking I/O implementation provided by the underlying operating system. Various approaches, depending on the operating systems, are available.

First, select was implemented in the 1980s. It supports the registration of 1,024 sockets. That was certainly enough in the ’80s, but not anymore.

poll is a replacement for select introduced in 1997. The most significant difference is that poll no longer limits the number of sockets. However, as with select, the system tells you only how many channels are ready, not which ones. You need to iterate over the set of channels to check which ones are ready. When there are few channels, this is not a big problem. Once the number of channels reaches hundreds of thousands, the iteration time becomes considerable.

Then, epoll appeared in 2002 in the Linux Kernel 2.5.44. Kqueue appeared in FreeBSD in 2000 and /dev/poll in Solaris around the same time. These mechanisms return the set of channels that are ready to be processed—no more iteration over every channel! Finally, Windows systems provide IOCP, an optimized implementation of select.

What’s important to remember is that regardless of how the operating systems implement it, with nonblocking I/O, you need only a single thread to handle multiple requests. This model is much more efficient than blocking I/O, as you don’t need to create threads to handle concurrent requests. Eliminating these extra threads makes your application much more efficient in terms of memory consumption (about 1 MB per thread) and avoids wasting CPU cycles because of context switches (1–2 microseconds per switch).4

Reactive systems recommend the use of nonblocking I/O to receive and send messages. Thus, your application can handle more messages with fewer resources. Another advantage is that an idle application consumes almost no memory or CPU. You don’t have to reserve resources up front.

Reactor Pattern and Event Loop

Nonblocking I/O gives us the possibility to handle multiple concurrent requests or messages with a single thread. How do we handle these concurrent requests? How do we structure our code when using nonblocking I/O? The examples given in the previous section do not scale well; we can quickly see that implementing a REST API with such a model would be a nightmare. Besides, we would like to avoid using worker threads, as that would discard the advantages of nonblocking I/O. We need something different: the reactor pattern.

The reactor pattern, illustrated in Figure 4-7, allows associating I/O events with event handlers. The reactor, the cornerstone of this mechanism, invokes the event handlers when the expected event is received.

The purpose of the reactor pattern is to avoid creating a thread for each message, request, and connection. This pattern receives events from multiple channels and sequentially distributes them to the corresponding event handlers.

Figure 4-7. The reactor pattern

Implementation of the reactor pattern uses an event loop (Figure 4-7). It’s a thread iterating over the set of channels, and when data is ready to be consumed, the event loop invokes the associated event handler sequentially, in a single-threaded manner.

When you combine nonblocking I/O and the reactor pattern, you organize your code as a set of event handlers. That approach works wonderfully with reactive code as it exposes the notion of events, the essence of Reactive.
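
To make the mechanics concrete, here is a deliberately simplified, framework-agnostic sketch of an event loop: a single thread takes events from a queue and dispatches them sequentially to the registered handlers. Real implementations are driven by a Selector rather than an in-memory queue.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class ToyEventLoop {

    record Event(String type, Object payload) { }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<Object>> handlers = new ConcurrentHashMap<>();

    // Associate an event type with an event handler.
    void on(String type, Consumer<Object> handler) {
        handlers.put(type, handler);
    }

    // Typically invoked by the I/O layer when an event is ready.
    void emit(Event event) {
        queue.add(event);
    }

    // The event loop itself: one thread invoking the handlers sequentially.
    void run() throws InterruptedException {
        while (true) {
            Event event = queue.take();
            Consumer<Object> handler = handlers.get(event.type());
            if (handler != null) {
                handler.accept(event.payload());  // handlers must not block
            }
        }
    }
}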

The reactor pattern has two variants:

  • The multireactor pattern uses multiple event loops (generally one or two per CPU core), which increases the concurrency of the application. Multireactor pattern implementations, such as Eclipse Vert.x, call the event handlers in a single-threaded manner to avoid deadlocks or state visibility issues.

  • The proactor pattern can be seen as an asynchronous version of the reactor pattern. Long-running event handlers invoke a continuation when they complete. Such mechanisms allow mixing nonblocking and blocking I/O (Figure 4-8).

Figure 4-8. The proactor pattern

You can integrate nonblocking event handlers, as well as blocking ones, by offloading their execution to separate threads when it’s inevitable. When their execution completes, the proactor pattern invokes the continuation. As you will see in Chapter 6, this is the pattern used by Quarkus.
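
The following sketch shows the idea with plain executors: a blocking handler is offloaded to a worker pool, and its continuation is executed back on the single-threaded event loop when the work completes. The class and method names are illustrative, not the Quarkus implementation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ProactorSketch {

    // A single-threaded "event loop" and a worker pool for blocking handlers.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final ExecutorService workerPool = Executors.newFixedThreadPool(4);

    // Called on the event loop when an event arrives.
    void onEvent(String event) {
        workerPool.submit(() -> {
            // Blocking or long-running work runs on a worker thread,
            // keeping the event loop free to process other events.
            String result = event.toUpperCase();  // stand-in for blocking work

            // Continuation: resume on the event loop once the work completes.
            eventLoop.execute(() -> System.out.println("Handled: " + result));
        });
    }
}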

Anatomy of Reactive Applications

In the last few years, many frameworks have popped up, offering reactive application support. Their goal is to simplify the implementation of reactive applications. They achieve this by providing higher-level primitives and APIs to handle events and abstract nonblocking I/O.

Indeed, as you may have recognized already, using nonblocking I/O is not that simple. Combining it with the reactor pattern (or a variant) can be convoluted. Fortunately, alongside these frameworks, libraries and toolkits do the heavy lifting. Netty is an asynchronous event-driven network application framework leveraging nonblocking I/O to build highly concurrent applications. It’s the most widely used library for handling nonblocking I/O in the Java world. But Netty can be challenging. Example 4-5 implements the echo TCP server using Netty.

Example 4-5. An echo server using Netty (chapter-4/non-blocking-io/src/main/java/org/acme/netty/NettyEchoServer.java)
public static void main(String[] args) throws Exception {
    new NettyServer(9999).run();
}

private final int port;

public NettyServer(int port) {
    this.port = port;
}

public void run() throws Exception {
    // NioEventLoopGroup is a multithreaded event loop that handles I/O operations.
    // The first one, often called 'boss', accepts an incoming connection.
    // The second one, often called 'worker', handles the traffic of the accepted
    // connection once the boss accepts the connection and registers the
    // accepted connection to the worker.
    EventLoopGroup bossGroup = new NioEventLoopGroup();

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // ServerBootstrap is a helper class that sets up a server.
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // the NioServerSocketChannel class is used to instantiate a
                // new Channel to accept incoming connections.
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // This handler is called for each accepted channel and
                    // allows customizing the processing. In this case, we
                    // just append the echo handler.
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                });

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(port).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

private static class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Write the received object, and flush
        ctx.writeAndFlush(msg);
    }
}

The Vert.x toolkit, built on top of Netty, provides higher-level features for building reactive applications, such as HTTP clients and servers, messaging clients, and so on. Typically, the same echo TCP server using Vert.x looks like Example 4-6.

Example 4-6. An echo server using Vert.x (chapter-4/non-blocking-io/src/main/java/org/acme/vertx/VertxEchoServer.java)
Vertx vertx = Vertx.vertx();
// Create a TCP server
vertx.createNetServer()
        // Invoke the given function for each connection
        .connectHandler(socket -> {
            // Just write the content back
            socket.handler(buffer -> socket.write(buffer));
        })
        .listen(9999);

Most Java frameworks offering Reactive capabilities are based on Netty or Vert.x. As shown in Figure 4-9, they all follow the same type of blueprint.

Figure 4-9. The common architecture of reactive frameworks

At the bottom, you have the nonblocking I/O. Generally, frameworks use Netty or Vert.x. This layer handles client connections, outbound requests, and response writing. In other words, it manages the I/O part. Most of the time, this layer implements the reactor pattern (or a variant), and so provides an event-loop-based model.

Then, in the second layer, you have the reactive framework per se. The role of this layer is to provide high-level APIs that are easy to use. You use these APIs to write your application code. Instead of having to handle nonblocking I/O channels, this layer provides high-level objects such as HTTP requests, responses, Kafka messages, and so on. Much easier!

Finally, in the top layer, you have your application. Your code does not need to touch nonblocking I/O concepts, thanks to the reactive framework. It can focus on incoming events and handle them. Your code is just a collection of event handlers. It can use the features provided by the reactive framework to interact with other services or middleware.
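
For instance, with Vert.x the channels and selectors disappear behind an HTTP server object, and your code is reduced to a request handler. A minimal sketch (imports omitted, as in the other listings):

Vertx vertx = Vertx.vertx();
vertx.createHttpServer()
        // The request handler is an event handler invoked on an event loop
        // for each incoming HTTP request.
        .requestHandler(request -> request.response()
                .putHeader("content-type", "text/plain")
                .end("Hello from the event loop"))
        .listen(8080);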

But there is a catch. The event handlers from your code are invoked on an event loop thread (an I/O thread). If your code blocks this thread, no other concurrent events can be processed, which would be a disaster in terms of responsiveness and concurrency. The consequence of such an architecture is clear: your code must be nonblocking. It must never block the I/O threads, as there are only a few of them and they are used to handle many concurrent requests. To achieve this, you could offload the processing of some events to a worker thread (using the proactor pattern). While this discards some of the benefits of nonblocking I/O, it is sometimes the most rational choice (Figure 4-10). Nevertheless, we should not abuse it, as that would discard the reactive benefits and make the application slow. The multiple context switches required to handle an event on a worker thread penalize the response time.

Figure 4-10. Running some event handlers on worker threads

Typically, our applications from Chapter 2 and Chapter 3 rely on such a mechanism.

Another possibility is to rely only on nonblocking code, using the asynchronous APIs provided by the reactive framework. These APIs are nonblocking, and if the business logic involves I/O, it uses nonblocking I/O. Every time an event handler executes an asynchronous operation, another handler (the continuation) is registered, and when the expected event arrives, the event loop invokes it. Thus, the processing is divided into smaller handlers running asynchronously. That model is the most efficient and fully embraces the concepts behind Reactive.
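
As a sketch of this continuation style, here is a nonblocking call chain using the Vert.x web client (the io.vertx:vertx-web-client module); the host, port, and path are assumptions, and imports are omitted as in the other listings. Each stage registers the next handler instead of blocking for the result.

Vertx vertx = Vertx.vertx();
WebClient client = WebClient.create(vertx);

client.get(8080, "localhost", "/orders")
        .send()                                     // nonblocking; returns immediately
        .map(HttpResponse::bodyAsString)            // continuation: transform the result
        .onSuccess(body -> System.out.println("Orders: " + body))
        .onFailure(failure -> System.out.println("Request failed: " + failure));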

Summary

Reactive systems are about building better distributed systems. They don’t aim to hide the nature of distributed systems but, on the contrary, embrace it.

In this chapter, you learned the following:

  • The four pillars of reactive systems (asynchronous message passing, elasticity, resilience, and responsiveness)

  • How asynchronous message passing enables elasticity and resilience, and increases the autonomy of each individual component

  • The role of commands and events in a distributed system

  • How nonblocking I/O improves resource utilization in reactive applications

But this last point has a significant drawback, as we need to write nonblocking code. What a coincidence! The next chapter is precisely about that!

1 “Don’t Build a Distributed Monolith” by Ben Christensen is an interesting talk about distributed monoliths and why you should avoid them.

2 This pattern is called Change Data Capture. Frameworks such as Debezium are a key element of reactive systems when using databases, as the events are emitted without any impact on the application code.

3 We are referring to the traditional Spring Framework. Reactive Spring is based on nonblocking I/O.

4 “Measuring Context Switching and Memory Overheads for Linux Threads” by Eli Bendersky provides interesting data about the cost of threads on Linux.
