
Beating the C10k Problem

The C10k problem was an area of research and optimization that tried to achieve 10,000 concurrent connections on a single commodity server. Even these days, solving this engineering task with the traditional Java toolkit is a challenge. There are many reactive approaches that easily achieve C10k, and RxJava makes them very approachable. In this chapter, we explore several implementation techniques that improve scalability by several orders of magnitude. All of them circle around the concept of reactive programming. If you are lucky enough to work on a greenfield project, you might consider implementing your application in a reactive manner top to bottom. Such an application should never synchronously wait for any computation or action. The architecture must be entirely event-driven and asynchronous in order to avoid blocking. We will go through several examples of a simple HTTP server and observe how it behaves with respect to the design choices we made. Admittedly, performance and scalability do have a complexity price tag, but with RxJava the additional complexity is reduced significantly.

The classic thread-per-connection model struggles to solve the C10k problem. With 10,000 threads, we do the following:

  • Consume several gigabytes of RAM for stack space

  • Put great pressure on the garbage collection mechanism, even though stack space itself is not garbage collected (threads contribute lots of GC roots and live objects)

  • Waste a significant amount of CPU time simply suspending and resuming threads (context switching)

The classic thread-per-Socket model served us really well, and as a matter of fact it works quite well in many applications to this day. However, after you reach a certain level of concurrency, the number of threads becomes dangerous. A thousand concurrent connections handled by a single commodity server is not something unusual, especially with long-lived TCP/IP connections like HTTP with a Keep-Alive header, server-sent events, or WebSockets. However, each thread occupies a little bit of memory (stack space), regardless of whether it is computing something or just idling, waiting for data.
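To put that stack cost in numbers, here is a back-of-envelope sketch. It assumes the common 1 MiB default thread stack (`-Xss1m` on 64-bit HotSpot); the actual default varies by platform and JVM, so treat the figure as an order of magnitude, not a measurement:

```java
// Rough stack-space cost of the thread-per-connection model.
class StackCost {

    // Total stack reservation in whole GiB, given a thread count
    // and a per-thread stack size in bytes.
    static long stackGib(long threads, long bytesPerThread) {
        return (threads * bytesPerThread) >> 30; // divide by 1 GiB
    }

    public static void main(String[] args) {
        long bytesPerThread = 1L << 20; // assumed 1 MiB default -Xss
        System.out.println(stackGib(10_000, bytesPerThread)
                + " GiB reserved for stacks alone");
    }
}
```

Ten thousand threads at 1 MiB each reserve on the order of 10 GB of address space for stacks, before the application allocates a single business object.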

There are two independent approaches to scalability: horizontal and vertical. To handle more concurrent connections, we can simply spin up more servers, each managing a subset of the load. This requires a frontend load-balancer and does not solve the original C10k problem, which expects just one server. On the other hand, vertical scalability means purchasing bigger and more capable servers. However, with blocking I/O we need a disproportionate amount of memory compared to a heavily underutilized CPU. Even if a big enterprise server could handle hundreds of thousands of concurrent connections (at a very high price), it is far from solving the C10M problem—ten million concurrent connections. This number is not a coincidence; a couple of years ago, a properly designed Java application reached that enormous level on a typical server.

This chapter takes you on a journey through different ways of implementing an HTTP server: from single-threaded servers, through thread pools, to entirely event-driven architectures. The idea behind this exercise is to compare implementation complexity against performance and throughput. In the end, you will notice that the version using RxJava combines both relative simplicity and outstanding performance.

Traditional Thread-Based HTTP Servers

The purpose of this section is to compare how blocking servers, even when written properly, behave under high load. This is the exercise that we probably all went through during our education: writing a server on top of raw sockets. We will be implementing an extremely simple HTTP server that responds with 200 OK for every request. As a matter of fact, for the sake of simplicity, we will ignore the request altogether.

Single-threaded server

The simplest implementation just opens a ServerSocket and handles client connections as they come. When a single client is served, all other requests are queued up. The following code snippet is actually very simple:

import org.apache.commons.io.IOUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

class SingleThread {

    public static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
                    "Content-length: 2\r\n" +
                    "\r\n" +
                    "OK").getBytes();

    public static void main(String[] args) throws IOException {
        final ServerSocket serverSocket = new ServerSocket(8080, 100);
        while (!Thread.currentThread().isInterrupted()) {
            final Socket client = serverSocket.accept();
            handle(client);
        }
    }

    private static void handle(Socket client) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                readFullRequest(client);
                client.getOutputStream().write(RESPONSE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            IOUtils.closeQuietly(client);
        }
    }

    private static void readFullRequest(Socket client) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(client.getInputStream()));
        String line = reader.readLine();
        while (line != null && !line.isEmpty()) {
            line = reader.readLine();
        }
    }

}

You will not see similar low-level implementations outside of a university, but it works. For each request, we ignore whatever was sent to us and return a 200 OK response. Opening localhost:8080 in the browser succeeds with an OK text reply. The class is named SingleThread for a reason. ServerSocket.accept() blocks until any client establishes a connection with us. Then, it returns a client Socket. While we interact with that Socket (read from and write to it), we still listen for incoming connections, but no one picks them up because our thread is busy handling the first client. It is like a doctor’s office: one patient goes in and everyone else must wait in a queue. Did you notice the extra 100 parameter after 8080 (the listening port)? This value (the default is 50) caps the maximum number of pending connections that can wait in the queue. Above that number, they are rejected. To make matters worse, we pretend to implement HTTP/1.1, which uses persistent connections by default. Until the client disconnects, we keep the TCP/IP connection open just in case, blocking new clients.

Now, coming back to our client connection: we first must read the entire request and then write the response. Both of these operations are potentially blocking and subject to network slowness and congestion. If one client establishes a connection but then waits a few seconds before sending a request, all other clients must wait. Having just a single thread for handling all incoming connections is clearly not very scalable; we barely solved the C1 (one concurrent connection) problem.

Appendix A contains the source code and a discussion of other blocking servers. Rather than spending more time analyzing nonscalable blocking architectures, we will briefly summarize them so that we can proceed to benchmarks and side-by-side comparisons quicker:

In “fork() Procedure in C Language,” you will find the source code of a simple server written in C using fork(). Despite its superficial simplicity, forking a new process for each client connection, especially a short-lived one, puts significant load on the operating system. Each process needs quite a bit of memory, and initial startup takes some time. Also, thousands of processes starting and stopping all the time unnecessarily occupy system resources.

ThreadPerConnection (see “Thread per Connection”) shows how to implement a blocking server that creates a new thread for each client connection. This presumably scales quite well, but such an implementation suffers from the same problems as fork() in C: starting a new thread takes time and resources, which is especially wasteful for short-lived connections. Moreover, there is no limit on the maximum number of client threads running at the same time. And when you do not put a limit on something in a computer system, the limit will be applied for you in the worst and least expected place. For example, our program becomes unstable and eventually crashes with OutOfMemoryError in the case of thousands of concurrent connections.

ThreadPool (see “Thread Pool of Connections”) also uses a thread per connection, but threads are recycled when a client disconnects so that we do not pay the price of thread warm-up for every client. This is pretty much how all popular servlet containers like Tomcat and Jetty work, managing 100 to 200 threads in a pool by default. Tomcat has the so-called NIO connector that handles some of the operations on sockets asynchronously, but the real work in servlets and frameworks built on top of them is still blocking. This means that traditional applications are inherently limited to a couple thousand connections, even when built on top of modern servlet containers.
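A minimal sketch of the thread-pool variant can look as follows. This is not the appendix’s exact code: for the sake of a self-contained example it binds an ephemeral port, serves a single response per connection (no keep-alive), and stops when the ServerSocket is closed:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;

// Thread-pool server sketch: worker threads are reused across connections
// instead of being created (and destroyed) once per client.
class ThreadPoolSketch {

    static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK").getBytes();

    // Starts accepting in the background and returns the socket,
    // so the caller can discover the port and shut the server down.
    static ServerSocket start(ExecutorService executor) throws IOException {
        ServerSocket serverSocket = new ServerSocket(0, 100);
        executor.submit(() -> {
            try {
                while (true) {
                    Socket client = serverSocket.accept();
                    executor.submit(() -> handle(client));
                }
            } catch (IOException closed) {
                // closing the ServerSocket ends the accept loop
            }
        });
        return serverSocket;
    }

    static void handle(Socket client) {
        try (Socket c = client) {
            // Read and discard headers until the blank line ending the request
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(c.getInputStream()));
            String line = reader.readLine();
            while (line != null && !line.isEmpty()) {
                line = reader.readLine();
            }
            c.getOutputStream().write(RESPONSE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```

A fixed pool caps concurrency: excess connections wait in the executor’s queue rather than each spawning a fresh thread, which is exactly the bound that ThreadPerConnection lacks.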

Nonblocking HTTP Server with Netty and RxNetty

We will now focus on event-driven approaches to writing an HTTP server, which are far more promising in terms of scalability. A blocking processing model involving thread-per-request clearly does not scale. We need a way of managing several client connections with just a handful of threads. This has a lot of benefits:

  • Reduced memory consumption

  • Better CPU and CPU cache utilization

  • Greatly improved scalability on a single node

One caveat is the lost simplicity and clarity. Threads are not allowed to block on any operation; we can no longer pretend that receiving or sending data over the wire is the same as a local method invocation. The latency is unpredictable and response times are higher by orders of magnitude. By the time you read this, there will probably still be quite a few spinning hard drives out there, which are even slower than local area networks. In this section, we will develop a tiny event-driven application with the Netty framework and later refactor it to use RxNetty. Finally, we conclude with a benchmark of all solutions.

Netty is entirely event-driven; we never block waiting for data to be sent or received. Instead, raw bytes in the form of ByteBuf instances are pushed to our processing pipeline. TCP/IP gives us the impression of a connection with data flowing byte after byte between two computers. But in reality, TCP/IP is built on top of IP, which can only transfer chunks of data known as packets. It is the operating system’s role to assemble them in the correct order and give the illusion of a stream. Netty drops this abstraction and works at a byte-sequence layer, not a stream. Whenever a few bytes arrive at our application, Netty notifies our handler. Whenever we send a few bytes, we get a ChannelFuture without blocking (more on futures in a second).

Our example of a nonblocking HTTP server has three components. The first simply starts the server and sets up the environment:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

class HttpTcpNettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            new ServerBootstrap()
                    .option(ChannelOption.SO_BACKLOG, 50_000)
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HttpInitializer())
                    .bind(8080)
                    .sync()
                    .channel()
                    .closeFuture()
                    .sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

This is the most basic HTTP server in Netty. The crucial parts are the bossGroup pool, responsible for accepting incoming connections, and workerGroup, which processes events. These pools are not very big: one thread for bossGroup and close to the number of CPU cores for workerGroup, but this is more than enough for a well-written Netty server. We have not yet specified what the server should do, apart from listening on port 8080. This is configurable via a ChannelInitializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

class HttpInitializer extends ChannelInitializer<SocketChannel> {

    private final HttpHandler httpHandler = new HttpHandler();

    @Override
    public void initChannel(SocketChannel ch) {
        ch
                .pipeline()
                .addLast(new HttpServerCodec())
                .addLast(httpHandler);
    }
}

Rather than providing a single function that handles the connection, we build a pipeline that processes incoming ByteBuf instances as they arrive. The first step of the pipeline decodes raw incoming bytes into higher-level HTTP request objects. This handler is built-in. It is also used for encoding the HTTP response back to raw bytes. In more robust applications you will often see more handlers focused on smaller functionality; for example, frame decoding, protocol decoding, security, and so on. Every piece of data and notification flows through this pipeline.

You’re probably beginning to see the analogy with RxJava here. The second step of our pipeline is the business logic component that actually handles the request rather than just intercepting or enriching it. Although HttpServerCodec is inherently stateful (it translates incoming packets to high-level HttpRequest instances), our custom HttpHandler can be a stateless singleton:

import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static java.nio.charset.StandardCharsets.UTF_8;

@Sharable
class HttpHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(HttpHandler.class);

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            sendResponse(ctx);
        }
    }

    private void sendResponse(ChannelHandlerContext ctx) {
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("OK".getBytes(UTF_8)));
        response.headers().add("Content-length", 2);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Error", cause);
        ctx.close();
    }
}

After constructing the response object, we write() back a DefaultFullHttpResponse. However, write() does not block like in ordinary sockets. Instead, it returns a ChannelFuture that we can subscribe via addListener() and asynchronously close the channel:

ctx
    .writeAndFlush(response)
    .addListener(ChannelFutureListener.CLOSE);

Channel is an abstraction over a communication link—for example, an HTTP connection—so closing a channel closes the connection. Here we do not want to do that, because we want to implement persistent connections.

Netty uses just a handful of threads to process possibly thousands of connections. We do not keep any heavyweight data structures or threads per connection. This is much closer to what actually happens close to the metal: the computer receives an IP packet and wakes up the process listening on the destination port. TCP/IP connections are just an abstraction, often implemented using threads. However, when the application is much more demanding in terms of load and the number of connections, operating directly at the packet level is much more robust. We still have channels (a lightweight representation of a connection) and pipelines with possibly stateful handlers.

Observable server with RxNetty

Netty is an important backbone behind plenty of successful products and frameworks such as Akka, Elasticsearch, HornetQ, Play framework, Ratpack, and Vert.x, to name a few. There is also a thin wrapper around Netty that bridges between its API and RxJava. Let’s rewrite the nonblocking Netty server in RxNetty, but we will begin with a simple currency server to become familiar with the API:

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import rx.Observable;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

class EurUsdCurrencyTcpServer {

    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(final String[] args) {
        TcpServer
            .newServer(8080)
            .<String, String>pipelineConfigurator(pipeline -> {
                pipeline.addLast(new LineBasedFrameDecoder(1024));
                pipeline.addLast(new StringDecoder(UTF_8));
            })
            .start(connection -> {
                Observable<String> output = connection
                    .getInput()
                    .map(BigDecimal::new)
                    .flatMap(eur -> eurToUsd(eur));
                return connection.writeAndFlushOnEach(output);
            })
            .awaitShutdown();
    }

    static Observable<String> eurToUsd(BigDecimal eur) {
        return Observable
            .just(eur.multiply(RATE))
            .map(amount -> eur + " EUR is " + amount + " USD\n")
            .delay(1, TimeUnit.SECONDS);
    }
}

This is a self-sufficient, standalone TCP/IP server written on top of RxNetty. You should have a rough understanding of its major parts. First, we create a new TCP/IP server listening on port 8080. Netty provides a rather low-level abstraction of ByteBuf messages flowing through a pipeline, so we must configure such a pipeline as well. The first handler rearranges (splits and joins when needed) ByteBuf sequences into sequences of lines using the built-in LineBasedFrameDecoder. Second, StringDecoder transforms each ByteBuf containing a full line into an actual String instance. From this point on, we are working exclusively with Strings.

Every time a new connection arrives, the callback is executed. The connection object allows us to asynchronously send and receive data. We begin with connection.getInput(), an object of type Observable<String> that emits a value every time a new line of the client’s request appears on the server. The getInput() Observable notifies us asynchronously about new input. First, we parse the String into a BigDecimal. Then, using the helper method eurToUsd(), we fake calling some currency exchange service. To make the example more realistic, we artificially applied delay() so that we must wait a little bit for the response. Obviously, delay() is asynchronous and does not involve any sleeping; in the meantime, we keep receiving and transforming requests along the way.

After all these transformations the output Observable is fed directly into writeAndFlushOnEach(). I believe this is quite understandable—we receive a sequence of inputs, transform them, and use the transformed sequence as a sequence of outputs. Now, let’s interact with this server using telnet. Notice how some responses appear after several requests were consumed due to faked currency server latency:

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
2.5
2.5 EUR is 2.661200 USD
0.99
0.99 EUR is 1.0538352 USD
0.94
0.94 EUR is 1.0006112 USD
20
30
40
20 EUR is 21.28960 USD
30 EUR is 31.93440 USD
40 EUR is 42.57920 USD

We treat our server like a function of request data into response data. Because the TCP/IP connection is not just a simple function but a stream of sometimes interdependent chunks of data, RxJava works amazingly well in this scenario. A rich set of operators makes it easy to transform input to output in nontrivial ways. Of course, the output stream does not have to be based on input; for example, if you are implementing server-sent events, the server simply publishes data irrespective of incoming data.
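Incidentally, the varying number of fraction digits in the telnet session (2.661200 rather than 2.6612) is a BigDecimal artifact: multiply() returns a result whose scale is the sum of the operands’ scales. A standalone sketch of the conversion arithmetic, outside RxNetty:

```java
import java.math.BigDecimal;

// The currency math from eurToUsd(), without the Rx wrapper or delay().
class ScaleDemo {

    static final BigDecimal RATE = new BigDecimal("1.06448"); // scale 5

    static String convert(String eurLine) {
        BigDecimal eur = new BigDecimal(eurLine);
        // multiply() adds the scales: "2.5" (scale 1) * RATE (scale 5)
        // yields scale 6, hence the trailing zeros in the output.
        return eur + " EUR is " + eur.multiply(RATE) + " USD";
    }

    public static void main(String[] args) {
        System.out.println(convert("2.5")); // 2.5 EUR is 2.661200 USD
        System.out.println(convert("20"));  // 20 EUR is 21.28960 USD
    }
}
```

A production server would normally round to a fixed scale with setScale(2, RoundingMode.HALF_EVEN) or similar; the raw scale is kept here to match the transcript.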

The EurUsdCurrencyTcpServer is reactive because it only acts when data comes in. We do not have a dedicated thread for each client. This implementation can easily withstand thousands of concurrent connections, and vertical scalability is limited only by the amount of traffic it must handle, not the number of more-or-less idle connections.

Knowing how RxNetty works in principle, we can go back to the original HTTP server that returns OK responses. RxNetty has built-in support for HTTP clients and servers, but we will begin from a plain implementation based on TCP/IP:

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import rx.Observable;

import static java.nio.charset.StandardCharsets.UTF_8;

class HttpTcpRxNettyServer {

    public static final Observable<String> RESPONSE = Observable.just(
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK");

    public static void main(final String[] args) {
        TcpServer
            .newServer(8080)
            .<String, String>pipelineConfigurator(pipeline -> {
                pipeline.addLast(new LineBasedFrameDecoder(128));
                pipeline.addLast(new StringDecoder(UTF_8));
            })
            .start(connection -> {
                Observable<String> output = connection
                    .getInput()
                    .flatMap(line -> {
                        if (line.isEmpty()) {
                            return RESPONSE;
                        } else {
                            return Observable.empty();
                        }
                    });
                return connection.writeAndFlushOnEach(output);
            })
            .awaitShutdown();
    }
}

Having EurUsdCurrencyTcpServer in mind, understanding HttpTcpRxNettyServer should be fairly simple. Because for educational purposes we always return a static 200 OK response, there is no point in parsing the request. However, a well-behaved server should not send a response before it reads the request. Therefore, we begin by looking for an empty line in getInput(), marking the end of the HTTP request. Only then do we produce the 200 OK line. The output Observable built this way is passed to connection.writeAndFlushOnEach(). In other words, the response is sent to the client as soon as the request contains the first empty line.
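The request-boundary logic inside that flatMap can be restated as a plain function over header lines, which makes its behavior easy to check in isolation. This is a sketch only (the real code maps lines to Observables, not lists):

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the flatMap in HttpTcpRxNettyServer: every non-empty header
// line contributes nothing; the blank line that terminates an HTTP
// request contributes exactly one response.
class RequestBoundary {

    static final String RESPONSE =
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK";

    static List<String> responses(List<String> headerLines) {
        List<String> out = new ArrayList<>();
        for (String line : headerLines) {
            if (line.isEmpty()) {
                out.add(RESPONSE); // end of request: emit one response
            }
        }
        return out;
    }
}
```

Note that an incomplete request (headers with no terminating blank line) yields no response at all, which is exactly the well-behaved waiting described above.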

Implementing an HTTP server using TCP/IP is an entertaining exercise that helps you to understand the intricacies of HTTP. Luckily, we are not forced to implement HTTP and RESTful web services using TCP/IP abstraction all the time. Similar to Netty, RxNetty also has a bunch of built-in components to serve HTTP:

import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;

class RxNettyHttpServer {

    private static final Observable<String> RESPONSE_OK =
        Observable.just("OK");

    public static void main(String[] args) {
        HttpServer
            .newServer(8086)
            .start((req, resp) ->
                resp
                    .setHeader(CONTENT_LENGTH, 2)
                    .writeStringAndFlushOnEach(RESPONSE_OK)
            ).awaitShutdown();
    }

}

If you are bored with just returning a static 200 OK, we can build a nonblocking RESTful web service with relative ease—again, for currency exchange:

import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

import java.math.BigDecimal;

class RestCurrencyServer {

    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(final String[] args) {
        HttpServer
                .newServer(8080)
                .start((req, resp) -> {
                    String amountStr = req.getDecodedPath().substring(1);
                    BigDecimal amount = new BigDecimal(amountStr);
                    Observable<String> response = Observable
                            .just(amount)
                            .map(eur -> eur.multiply(RATE))
                            .map(usd ->
                                    "{\"EUR\": " + amount + ", " +
                                     "\"USD\": " + usd + "}");
                    return resp.writeString(response);
                })
                .awaitShutdown();
    }
}

We can interact with this server using a web browser or curl. The substring(1) call is required to strip the leading slash from the request path:

$ curl -v localhost:8080/10.99

> GET /10.99 HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:8080
> Accept: */*
>

< HTTP/1.1 200 OK
< transfer-encoding: chunked
<

{"EUR": 10.99, "USD": 11.6986352}
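The handler’s path-to-JSON mapping reduces to a pure function; a sketch (the helper name is hypothetical, not part of the server) that reproduces the curl response above:

```java
import java.math.BigDecimal;

// The mapping performed by RestCurrencyServer's handler: strip the
// leading slash, convert EUR to USD, and format a tiny JSON object.
class PathToJson {

    static final BigDecimal RATE = new BigDecimal("1.06448");

    static String toJson(String decodedPath) {
        String amountStr = decodedPath.substring(1); // drop the leading "/"
        BigDecimal amount = new BigDecimal(amountStr);
        return "{\"EUR\": " + amount + ", " +
                "\"USD\": " + amount.multiply(RATE) + "}";
    }
}
```

Note that a malformed path such as "/abc" makes the BigDecimal constructor throw NumberFormatException; the example server does not handle that, so a real service would need to map it to a 400 response.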

Having a handful of implementations of this simple HTTP server, we can compare them in terms of performance, scalability, and throughput. This is the reason why we abandoned the familiar thread-based model and began using RxJava and asynchronous APIs in the first place.

Article image: Ball bearings (source: Hans).