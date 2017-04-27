The C10k problem was an area of research and optimization that tried to achieve 10,000 concurrent connections on a single commodity server. Even these days, solving this engineering task with the traditional Java toolkit is a challenge. There are many reactive approaches that easily achieve C10k, and RxJava makes them very approachable. In this chapter, we explore several implementation techniques that will improve scalability by several orders of magnitude. All of them will circle around the concept of reactive programming. If you are lucky enough to work on a greenfield project, you might consider implementing your application in a reactive manner top to bottom. Such an application should never synchronously wait for any computation or action. The architecture must be entirely event-driven and asynchronous in order to avoid blocking. We will go through several examples of a simple HTTP server and observe how it behaves with respect to design choices we made. Admittedly, performance and scalability does have a complexity price tag. But with RxJava the additional complexity will be reduced significantly.

The classic thread-per-connection model struggles to solve the C10k problem. With 10,000 threads we do the following:

The classic thread-per-Socket model served us really well, and as a matter of fact it works quite well in many applications to this day. However, after you reach a certain level of concurrency, the number of threads becomes dangerous. A thousand concurrent connections handled by a single commodity server is not unusual, especially with long-lived TCP/IP connections like HTTP with a Keep-Alive header, server-sent events, or WebSockets. However, each thread occupies a little bit of memory (stack space), regardless of whether it is computing something or just waiting idle for data.
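To put a rough number on the stack-space cost, the following sketch multiplies a thread count by a per-thread stack size. The 1,024 KiB figure is an assumption (a common default on 64-bit JVMs, adjustable with the -Xss flag), and the class and method names are made up for illustration. The result is address space reserved for stacks, which the operating system typically commits only as it is touched, so treat it as an upper bound rather than a measurement.

```java
class ThreadStackEstimate {

    /** Stack memory reserved (in MiB) for the given number of threads. */
    static long reservedStackMiB(int threads, int stackSizeKiB) {
        return (long) threads * stackSizeKiB / 1024;
    }

    public static void main(String[] args) {
        // Assumption: 1,024 KiB per thread; the real default is platform dependent.
        int stackSizeKiB = 1024;
        for (int threads : new int[] {200, 1_000, 10_000}) {
            System.out.printf("%,d threads -> about %,d MiB reserved for stacks%n",
                    threads, reservedStackMiB(threads, stackSizeKiB));
        }
    }
}
```

Under that assumption, 10,000 mostly idle threads reserve roughly 10 GiB just for stacks, before the application allocates a single object on the heap.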

There are two independent approaches to scalability: horizontal and vertical. To handle more concurrent connections we can simply spin up more servers, each managing a subset of the load. This requires a frontend load balancer and does not solve the original C10k problem, which expects just one server. On the other hand, vertical scalability means purchasing bigger and more capable servers. However, with blocking I/O we need a disproportionate amount of memory compared to a heavily underutilized CPU. Even if a big enterprise server can handle hundreds of thousands of concurrent connections (at a very high price), it is far from solving the C10M problem, that is, ten million concurrent connections. This number is not a coincidence; a couple of years ago, a properly designed Java application reached that enormous level on a typical server.

This chapter takes you on a journey through different ways of implementing an HTTP server: from single-threaded servers, through thread pools, to entirely event-driven architectures. The idea behind this exercise is to compare implementation complexity against performance and throughput. In the end, you will notice that the version using RxJava combines relative simplicity with outstanding performance.

ThreadPool (see “Thread Pool of Connections”) also uses a thread per connection, but threads are recycled when a client disconnects so that we do not pay the price of thread warm-up for every client. This is pretty much how all popular servlet containers like Tomcat and Jetty work, managing 100 to 200 threads in a pool by default. Tomcat has the so-called NIO connector that handles some of the operations on sockets asynchronously, but the real work in servlets and frameworks built on top of them is still blocking. This means that traditional applications are inherently limited to a couple of thousand connections, even when built on top of modern servlet containers.
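A minimal sketch of the thread pool idea, using only the JDK, might look like the following. The pool size of 100, the queue capacity of 1,000, and the helper names (newBoundedPool, handle, reject) are assumptions made for illustration and are not the appendix listing. For brevity the handler answers one request and closes the connection instead of keeping it alive.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPool {

    static final byte[] OK = "HTTP/1.1 200 OK\r\nContent-length: 2\r\nConnection: close\r\n\r\nOK"
            .getBytes(StandardCharsets.UTF_8);
    static final byte[] BUSY = "HTTP/1.1 503 Service Unavailable\r\nContent-length: 0\r\nConnection: close\r\n\r\n"
            .getBytes(StandardCharsets.UTF_8);

    /** Fixed number of recycled threads plus a bounded queue; overflow is rejected. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueSize) {
        // The default rejection policy throws RejectedExecutionException when the queue is full.
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }

    public static void main(String[] args) throws IOException {
        ThreadPoolExecutor executor = newBoundedPool(100, 1_000); // assumed sizes
        try (ServerSocket serverSocket = new ServerSocket(8080, 100)) {
            while (!Thread.currentThread().isInterrupted()) {
                Socket client = serverSocket.accept();
                try {
                    executor.execute(() -> handle(client)); // reuses an idle pool thread
                } catch (RejectedExecutionException e) {
                    reject(client);                         // pool and queue are both full
                }
            }
        } finally {
            executor.shutdown();
        }
    }

    /** Blocking handler: skips the request head, then writes a fixed response. */
    static void handle(Socket client) {
        try (Socket socket = client) {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String line;
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                // request contents are ignored on purpose
            }
            socket.getOutputStream().write(OK);
            socket.getOutputStream().flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Tells the client we are overloaded instead of letting work pile up. */
    static void reject(Socket client) {
        try (Socket socket = client) {
            socket.getOutputStream().write(BUSY);
            socket.getOutputStream().flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```

The bounded queue is the important design choice here: when all threads are busy and the queue is full, new clients get an immediate 503 rather than an ever-growing backlog.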

ThreadPerConnection (see “Thread per Connection”) shows how to implement a blocking server that creates a new thread for each client connection. This presumably scales quite well, but such an implementation suffers from the same problems as fork() in C: starting a new thread takes some time and resources, which is especially wasteful for short-lived connections. Moreover, there is no limit to the maximum number of client threads running at the same time. And when you do not put a limit on something in a computer system, the limit will be applied for you in the worst and least expected place. For example, our program will become unstable and eventually crash with OutOfMemoryError when faced with thousands of concurrent connections.
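The thread-per-connection approach can be sketched with plain JDK classes as follows. This is an illustration rather than the appendix listing: the helper names serve, handle, and respond are hypothetical, and the handler closes the connection after one response to keep the example short.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ThreadPerConnection {

    static final byte[] RESPONSE = "HTTP/1.1 200 OK\r\nContent-length: 2\r\nConnection: close\r\n\r\nOK"
            .getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(8080, 100)) {
            serve(serverSocket);
        }
    }

    static void serve(ServerSocket serverSocket) throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            Socket client = serverSocket.accept();
            // A brand-new thread for every client: nothing caps how many run at once.
            new Thread(() -> handle(client)).start();
        }
    }

    static void handle(Socket client) {
        try (Socket socket = client) {
            respond(socket.getInputStream(), socket.getOutputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Skips the request head (everything up to the blank line) and writes a fixed reply. */
    static void respond(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.US_ASCII));
        String line;
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
            // request contents are ignored on purpose
        }
        out.write(RESPONSE);
        out.flush();
    }
}
```

The accept loop returns to accept() immediately, so slow clients no longer block each other, but every connection now costs a thread start and a full thread stack.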

In “fork() Procedure in C Language,” you will find the source code of a simple server written in C using fork(). Despite its superficial simplicity, forking a new process for each client connection, especially for short-lived ones, puts significant load on the operating system. Each process needs quite a bit of memory, and initial startup takes some time. Also, thousands of processes starting and stopping all the time unnecessarily occupy system resources.

Appendix A contains the source code and a discussion of other blocking servers. Rather than spending more time analyzing nonscalable blocking architectures, we will briefly summarize them so that we can proceed to benchmarks and side-by-side comparisons quicker:

Now, coming back to our client connection, we first must read the entire request and then write the response. Both of these operations are potentially blocking and subject to network slowness and congestion. If one client establishes a connection but then waits a few seconds before sending a request, all other clients must wait. Having just a single thread for handling all incoming connections is clearly not very scalable; we have barely solved the C1 (one concurrent connection) problem.

You will rarely see such low-level implementations outside of a university course, but it works. For each request we ignore whatever was sent to us and return a 200 OK response. Opening localhost:8080 in the browser succeeds with an OK text reply. The class is named SingleThread for a reason. ServerSocket.accept() blocks until any client establishes a connection with us. Then, it returns a client Socket. While we interact with that Socket (read from and write to it), we still listen for incoming connections, but no one picks them up because our thread is busy handling the first client. It is like at the doctor’s office: one patient goes in and everyone else must wait in a queue. Did you notice the extra 100 parameter after 8080 (the listening port)? This value (the default is 50) caps the maximum number of pending connections that can wait in a queue. Above that number, they are rejected. To make matters worse, we pretend to implement HTTP/1.1, which uses persistent connections by default. Until the client disconnects we keep the TCP/IP connection open just in case, blocking new clients.

The simplest implementation just opens a ServerSocket and handles client connections as they come. When a single client is served, all other requests are queued up. The following code snippet is actually very simple:
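A minimal sketch of such a single-threaded server might look as follows. It assumes the class name SingleThread and the ServerSocket arguments (port 8080, backlog 100) mentioned in the text; the helper names handle and skipRequest are hypothetical, and the details may differ from the appendix listing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class SingleThread {

    static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK").getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) throws IOException {
        // Port 8080; at most 100 pending connections may wait in the accept queue.
        try (ServerSocket serverSocket = new ServerSocket(8080, 100)) {
            while (!Thread.currentThread().isInterrupted()) {
                Socket client = serverSocket.accept(); // blocks until a client connects
                handle(client);                         // blocks until that client disconnects
            }
        }
    }

    /** Serves one client on the calling thread, keeping the connection open. */
    static void handle(Socket client) {
        try (Socket socket = client) {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            OutputStream out = socket.getOutputStream();
            while (skipRequest(reader)) {
                out.write(RESPONSE);
                out.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Consumes one request head (all lines up to the blank line) without looking at it.
     * Returns false once the client has closed the connection.
     */
    static boolean skipRequest(BufferedReader reader) throws IOException {
        String line = reader.readLine();
        while (line != null && !line.isEmpty()) {
            line = reader.readLine();
        }
        return line != null;
    }
}
```

Because handle() runs on the same thread as accept(), a second client is not even accepted until the first one disconnects.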

The purpose of this section is to compare how blocking servers, even when written properly, behave under high load. This is the exercise that we probably all went through during our education: writing a server on top of raw sockets. We will be implementing an extremely simple HTTP server that responds with 200 OKs for every request. As a matter of fact, for the sake of simplicity we will ignore the request altogether.

Nonblocking HTTP Server with Netty and RxNetty

We will now focus on event-driven approaches to writing an HTTP server, which are far more promising in terms of scalability. A blocking processing model involving thread-per-request clearly does not scale. We need a way of managing several client connections with just a handful of threads. This has a lot of benefits:

Reduced memory consumption

Better CPU and CPU cache utilization

Greatly improved scalability on a single node

One caveat is the lost simplicity and clarity. Threads are not allowed to block on any operation, so we can no longer pretend that receiving or sending data over the wire is the same as a local method invocation. Latency is unpredictable, and response times are higher by orders of magnitude. By the time you read this, there will probably still be quite a few spinning hard drives out there, which are even slower than a local area network. In this section, we will develop a tiny event-driven application with the Netty framework and later refactor it to use RxNetty. Finally, we conclude with a benchmark of all solutions.

Netty is entirely event-driven; we never block waiting for data to be sent or received. Instead, raw bytes in the form of ByteBuf instances are pushed to our processing pipeline. TCP/IP gives us the impression of a connection with data flowing byte after byte between two computers. But in reality TCP/IP is built on top of IP, which can merely transfer chunks of data known as packets. It is the operating system’s role to assemble them in the correct order and give the illusion of a stream. Netty drops this abstraction and works at a byte-sequence layer, not a stream. Whenever a few bytes arrive at our application, Netty will notify our handler. Whenever we send a few bytes, we get a ChannelFuture without blocking (more on futures in a second).
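To make the event-driven idea concrete without any framework, here is a minimal sketch built directly on the JDK's java.nio selector API, the same mechanism Netty's NIO transport builds on. One thread waits for readiness events on all connections and reacts to whichever has bytes available. The class and helper names (SelectorLoop, accept, read, completeRequests) are hypothetical, and the sketch takes shortcuts a real server must not: it assumes ASCII request heads without bodies and that each small response fits into the socket's send buffer in a single write.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectorLoop {

    static final byte[] RESPONSE = "HTTP/1.1 200 OK\r\nContent-length: 2\r\n\r\nOK"
            .getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(8080), 100);
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (!Thread.currentThread().isInterrupted()) {
                selector.select(); // the only blocking call: wait for any event on any channel
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(selector, server);
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        }
    }

    static void accept(Selector selector, ServerSocketChannel server) throws IOException {
        SocketChannel client = server.accept();
        if (client != null) {
            client.configureBlocking(false);
            // Per-connection state is a small buffer of unparsed text, not a thread.
            client.register(selector, SelectionKey.OP_READ, new StringBuilder());
        }
    }

    static void read(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        StringBuilder pending = (StringBuilder) key.attachment();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            if (client.read(buffer) < 0) {
                client.close(); // client disconnected; closing also cancels the key
                return;
            }
            buffer.flip();
            pending.append(StandardCharsets.US_ASCII.decode(buffer));
            int requests = completeRequests(pending);
            for (int i = 0; i < requests; i++) {
                // Shortcut: assumes the whole response is written in one call.
                client.write(ByteBuffer.wrap(RESPONSE));
            }
        } catch (IOException e) {
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    /** Removes every complete request head (ending in a blank line) and returns how many there were. */
    static int completeRequests(StringBuilder pending) {
        int count = 0;
        int end;
        while ((end = pending.indexOf("\r\n\r\n")) >= 0) {
            pending.delete(0, end + 4);
            count++;
        }
        return count;
    }
}
```

Notice how much bookkeeping even this toy needs: partial requests must be buffered by hand because bytes arrive in arbitrary chunks. Frameworks such as Netty exist largely to take this bookkeeping off our hands.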

Our example of a nonblocking HTTP server has three components. The first simply starts the server and sets up the environment:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    class HttpTcpNettyServer {

        public static void main(String[] args) throws Exception {
            // One thread accepts connections; the worker group processes their events.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                new ServerBootstrap()
                        .option(ChannelOption.SO_BACKLOG, 50_000)
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new HttpInitializer())
                        .bind(8080)
                        .sync()          // wait until the port is bound
                        .channel()
                        .closeFuture()
                        .sync();         // wait until the server channel is closed
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

This is the most basic HTTP server in Netty. The crucial parts are the bossGroup pool, responsible for accepting incoming connections, and the workerGroup pool that processes events. These pools are not very big: one thread for bossGroup and a number proportional to the CPU core count for workerGroup (by default, twice the number of available cores), but this is more than enough for a well-written Netty server. We have not yet specified what the server should do, apart from listening on port 8080. This is configurable via ChannelInitializer:

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpServerCodec;

    class HttpInitializer extends ChannelInitializer<SocketChannel> {

        private final HttpHandler httpHandler = new HttpHandler();

        @Override
        public void initChannel(SocketChannel ch) {
            ch
                    .pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(httpHandler);
        }
    }

Rather than providing a single function that handles the connection, we build a pipeline that processes incoming ByteBuf instances as they arrive. The first step of the pipeline decodes raw incoming bytes into higher-level HTTP request objects. This handler is built-in. It is also used for encoding the HTTP response back to raw bytes. In more robust applications you will often see more handlers focused on smaller functionality; for example, frame decoding, protocol decoding, security, and so on. Every piece of data and notification flows through this pipeline.

You’re probably beginning to see the analogy with RxJava here. The second step of our pipeline is the business logic component that actually handles the request rather than just intercepting or enriching it. Although HttpServerCodec is inherently stateful (it translates incoming packets to high-level HttpRequest instances), our custom HttpHandler can be a stateless singleton:

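    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.handler.codec.http.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    import static java.nio.charset.StandardCharsets.UTF_8;

    @Sharable
    class HttpHandler extends ChannelInboundHandlerAdapter {

        // Assumption: the undeclared `log` in the listing is an SLF4J logger.
        private static final Logger log = LoggerFactory.getLogger(HttpHandler.class);

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Respond once per request head; the request itself is ignored.
            if (msg instanceof HttpRequest) {
                sendResponse(ctx);
            }
        }

        private void sendResponse(ChannelHandlerContext ctx) {
            final DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                    HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.wrappedBuffer("OK".getBytes(UTF_8)));
            response.headers().add("Content-length", 2);
            ctx.writeAndFlush(response);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.error("Error", cause);
            ctx.close();
        }
    }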

After constructing the response object, we write back a DefaultFullHttpResponse. However, writing does not block as it does with ordinary sockets. Instead, writeAndFlush() returns a ChannelFuture to which we can attach a listener via addListener(), for example to asynchronously close the channel once the write completes:

    ctx
            .writeAndFlush(response)
            .addListener(ChannelFutureListener.CLOSE);

Channel is an abstraction over a communication link (for example, an HTTP connection); therefore, closing a channel closes the connection. Again, we do not want to do this here, because we want to support persistent connections.

Netty uses just a handful of threads to process possibly thousands of connections. We do not keep any heavyweight data structures or threads for each connection. This is much closer to what actually happens close to the metal. The computer receives an IP packet and wakes up the process listening on the destination port. TCP/IP connections are just an abstraction often implemented using threads. However, when the application is much more demanding in terms of load and the number of connections, operating directly at the packet level is much more robust. We still have channels (a lightweight representation of threads) and pipelines with possibly stateful handlers.