Chapter 4. Applying Reactive Programming to Existing Applications

Introducing a new library, technology, or paradigm to an application, be it a greenfield project or a legacy codebase, must be a careful decision. RxJava is no exception. In this chapter, we review some patterns and architectures found in ordinary Java applications and see how Rx can help. This process is not straightforward and requires a significant mindset shift, so we will move carefully from an imperative to a functional and reactive style. Many libraries in Java projects these days simply add bloat without giving anything in return. However, you will see not only how RxJava simplifies traditional projects, but also what kinds of benefits it brings to legacy platforms.

I am pretty sure that you’re already very excited about RxJava. Built-in operators and simplicity make Rx an amazingly powerful tool for transforming streams of events. However, if you go back to your office tomorrow, you will realize that there are no streams and no real-time events from a stock exchange. You can hardly find any events in your applications; it’s just a mash-up of web requests, databases, and external APIs. You are eager to try this new RxJava thing somewhere beyond Hello world, yet it seems that there are simply no use cases in real life that justify using Rx. Even so, RxJava can be a significant step forward in terms of architectural consistency and robustness. You do not need to commit to a reactive style top-to-bottom; this is too risky and requires too much work in the beginning. Rx can be introduced at any layer, without breaking the application as a whole.

We take you through some common application patterns and ways in which you can enhance them with RxJava in a noninvasive way, focusing on database querying, caching, error handling, and periodic tasks. The more RxJava you add in various places of your stack, the more consistent your architecture will become.

From Collections to Observables

Unless your platform was built recently in JVM frameworks like Play, Akka actors, or maybe Vert.x, you are probably on a stack with a servlet container on one hand, and JDBC or web services on the other. Between them, there is a varying number of layers implementing business logic, which we will not refactor all at once; instead, let’s begin with a simple example. The following class represents a trivial repository abstracting us from a database:

class PersonDao {

    List<Person> listPeople() {
        return query("SELECT * FROM PEOPLE");
    }

    private List<Person> query(String sql) {
        //...
    }

}

Implementation details aside, how is this related to Rx? So far we have been talking about asynchronous events pushed from upstream systems or, at best, when someone subscribes. How is this mundane Dao relevant here? Observable is not only a pipe pushing events downstream. You can treat Observable<T> as a data structure, dual to Iterable<T>. They both hold items of type T, but provide radically different interfaces. So, it shouldn’t come as a surprise that you can simply replace one with the other:

Observable<Person> listPeople() {
    final List<Person> people = query("SELECT * FROM PEOPLE");
    return Observable.from(people);
}

At this point, we made a breaking change to the existing API. Depending on how big your system is, such incompatibility might be a major concern. Thus, it is important to bring RxJava into your API as soon as possible. Obviously, we are working with an existing application so that can’t be the case.

BlockingObservable: Exiting the Reactive World

If you are combining RxJava with existing, blocking, imperative code, you might need to translate Observable to a plain collection. This transformation is rather unpleasant because it requires blocking on an Observable and waiting for its completion. Until the Observable completes, we cannot create a collection. BlockingObservable is a special type that makes it easier to work with Observable in a nonreactive environment. BlockingObservable should be your last choice when working with RxJava, but it is inevitable when combining blocking and nonblocking code.

In the previous section, we refactored the listPeople() method so that it returns Observable<Person> rather than List<Person>. Observable is not an Iterable in any sense, so our code no longer compiles. We want to take baby steps rather than attempt a massive refactoring, so let’s keep the scope of changes as small as possible. The client code could look like this:

List<Person> people = personDao.listPeople();
String json = marshal(people);

We can imagine the marshal() method pulling data from the people collection and serializing it to JSON. That’s no longer the case; we can’t simply pull items from an Observable whenever we want. Observable is in charge of producing (pushing) items and notifying subscribers, if any. This radical change can be easily circumvented with BlockingObservable. This handy class is entirely independent of Observable and can be obtained via the Observable.toBlocking() method. The blocking variant of Observable has superficially similar methods like single() or subscribe(). However, BlockingObservable is much more convenient in blocking environments that are inherently unprepared for the asynchronous nature of Observable. Operators on BlockingObservable typically block (wait) until the underlying Observable is completed. This strongly contradicts the main concept of Observables, namely that everything is likely asynchronous, lazy, and processed on the fly. For example, Observable.forEach() will asynchronously receive events from Observable as they come in, whereas BlockingObservable.forEach() will block until all events are processed and the stream is completed. Also, exceptions are no longer propagated as values (events) but instead are rethrown in the calling thread.

In our case, we want to transform Observable<Person> back into List<Person> to limit the scope of refactoring:

Observable<Person> peopleStream = personDao.listPeople();
Observable<List<Person>> peopleList = peopleStream.toList();
BlockingObservable<List<Person>> peopleBlocking = peopleList.toBlocking();
List<Person> people = peopleBlocking.single();

I intentionally left all intermediate types explicit in order to explain what happens. After refactoring to Rx, our API returns Observable<Person> peopleStream. This stream can potentially be fully reactive, asynchronous, and event driven, which doesn’t match at all what we need: a static List. As the first step, we turn Observable<Person> into Observable<List<Person>>. This lazy operator will buffer all Person events and keep them in memory until the onCompleted() event is received. At this point, a single event of type List<Person> will be emitted, containing all seen events at once, as illustrated in the following marble diagram:

[Marble diagram: toList() buffers upstream events and emits them as a single List on completion]

The resulting stream completes immediately after emitting a single List item. Again, this operator is asynchronous; it doesn’t wait for all events to arrive but instead lazily buffers all values. The awkward looking Observable<List<Person>> peopleList is then converted to BlockingObservable<List<Person>> peopleBlocking. BlockingObservable is a good idea only when you must provide a blocking, static view of your otherwise asynchronous Observable. Whereas Observable.from(List<T>) converts normal pull-based collection into Observable, toBlocking() does something quite the opposite. You might ask yourself why we need two abstractions for blocking and nonblocking operators. The authors of RxJava figured out that being explicit about synchronous versus asynchronous nature of underlying operator is too crucial to be left for JavaDoc. Having two unrelated types ensures that you always work with the appropriate data structure. Moreover, BlockingObservable is your weapon of last resort; normally, you should compose and chain plain Observables as long as possible. However, for the purpose of this exercise, let’s escape from Observable right away. The last operator single() drops observables altogether and extracts one, and only one, item we expect to receive from BlockingObservable<T>. A similar operator, first(), will return a value of T and discard whatever it has left. single(), on the other hand, makes sure there are no more pending events in underlying Observable before terminating. This means single() will block waiting for onCompleted() callback. Here is the same code snippet as earlier, this time with all operators chained:

List<Person> people = personDao
    .listPeople()
    .toList()
    .toBlocking()
    .single();

You might think that we went through all this hassle of wrapping and unwrapping Observable for no apparent reason. Remember, this was just the first step. The next transformation will introduce some laziness. Our code as it stands right now always executes query("...") and wraps it with Observable. As you know by now, Observables (especially cold ones) are lazy by definition. As long as no one subscribes, they just represent a stream that never had a chance to begin emitting values. Most of the time you can call methods returning Observable and as long as you don’t subscribe, no work will be done. Observable is like a Future because it promises a value to appear in the future. But as long as you don’t request it, a cold Observable will not even begin emitting. From that perspective, Observable is more similar to java.util.function.Supplier<T>, generating values of type T on demand. Hot Observables are different because they emit values whether you are listening or not, but we are not considering them right now. The mere existence of Observable does not indicate a background job or any side effect, unlike Future, which almost always suggests some operation running concurrently.

Embracing Laziness

So how do we make our Observable lazy? The simplest technique is to wrap an eager Observable with defer():

public Observable<Person> listPeople() {
    return Observable.defer(() ->
        Observable.from(query("SELECT * FROM PEOPLE")));
}

Observable.defer() takes a lambda expression (a factory) that can produce Observable. The underlying Observable is eager, so we want to postpone its creation. defer() will wait until the last possible moment to actually create Observable; that is, until someone actually subscribes to it. This has some interesting implications. Because Observable is lazy, calling listPeople() has no side effects and almost no performance footprint. No database is queried yet. You can treat Observable<Person> as a promise but without any background processing happening yet. Notice that there is no asynchronous behavior at the moment, just lazy evaluation. This is similar to how values in the Haskell programming language are evaluated lazily only when absolutely needed.
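The difference between eager and deferred evaluation can be sketched without RxJava at all, using java.util.function.Supplier, which we compared to a cold Observable earlier. This is only an analogy under stated assumptions: the query() method below is a hypothetical stand-in that returns hardcoded names, and the queryCount counter exists purely to make the side effect visible.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyQueryDemo {

    // Counts how many times the (hypothetical) database is hit.
    static final AtomicInteger queryCount = new AtomicInteger();

    // Stand-in for the blocking query() from PersonDao; returns names only.
    static List<String> query(String sql) {
        queryCount.incrementAndGet();
        return Arrays.asList("Alice", "Bob");
    }

    // Eager: the query runs as soon as this method is called.
    static List<String> listPeopleEagerly() {
        return query("SELECT * FROM PEOPLE");
    }

    // Lazy: only a recipe is returned; the query runs on every get().
    static Supplier<List<String>> listPeopleLazily() {
        return () -> query("SELECT * FROM PEOPLE");
    }

    public static void main(String[] args) {
        Supplier<List<String>> people = listPeopleLazily();
        System.out.println("queries after creation: " + queryCount.get());
        System.out.println(people.get());
        System.out.println("queries after get(): " + queryCount.get());
    }
}
```

Just like a deferred Observable that is subscribed to twice, calling get() twice runs the query twice; neither construct caches the result on its own.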

If you have never programmed in functional languages, you might be confused about why laziness is so important and groundbreaking. It turns out that such behavior is very useful and can improve the quality and freedom of your implementation considerably. For example, you no longer have to pay attention to which resources are fetched, when, and in what order. RxJava will load them only when they are absolutely needed.

As an example, take this trivial fallback mechanism that we have all seen so many times:

void bestBookFor(Person person) {
    Book book;
    try {
        book = recommend(person);
    } catch (Exception e) {
        book = bestSeller();
    }
    display(book.getTitle());
}

void display(String title) {
    //...
}

You probably think there is nothing wrong with such a construct. In this example, we try to recommend the best book for a given person, but in case of failures, we degrade gracefully and display the best seller. The assumption is that fetching a bestseller is faster and can be cached. But what if you could add error handling declaratively so that try-catch blocks aren’t obscuring real logic?

void bestBookFor(Person person) {
    Observable<Book> recommended = recommend(person);
    Observable<Book> bestSeller = bestSeller();
    Observable<Book> book = recommended.onErrorResumeNext(bestSeller);
    Observable<String> title = book.map(Book::getTitle);
    title.subscribe(this::display);
}

We are only exploring RxJava so far, thus I left all these intermediate values and types. In real life, bestBookFor() would look more like this:

void bestBookFor(Person person) {
    recommend(person)
            .onErrorResumeNext(bestSeller())
            .map(Book::getTitle)
            .subscribe(this::display);
}

This code is beautifully concise and readable. First, find a recommendation for the person. In case of an error (onErrorResumeNext), proceed with a bestseller. No matter which one succeeded, map() extracts the title, and subscribe() then displays it. onErrorResumeNext() is a powerful operator that intercepts exceptions happening upstream, swallows them, and subscribes to the provided backup Observable. This is how Rx implements a try-catch clause. We will spend much more time on error handling later in this book (see “Declarative try-catch Replacement”). For the time being, notice how we can call bestSeller() up front without worrying that the bestseller is fetched even when the real recommendation went fine; the Observable it returns is lazy.
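To see why a lazy fallback costs nothing on the happy path, here is a minimal JDK-only sketch of the same idea. It is not how onErrorResumeNext() is implemented; withFallback() is a hypothetical helper, both book sources are hardcoded stand-ins, and an empty person name is simply an assumed way to trigger a failure.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyFallbackDemo {

    // Counts how often the (hypothetical) bestseller lookup really runs.
    static final AtomicInteger bestSellerCalls = new AtomicInteger();

    // Stand-in for recommend(); fails for an empty name to simulate an outage.
    static String recommend(String person) {
        if (person.isEmpty()) {
            throw new IllegalStateException("no recommendation available");
        }
        return "Book for " + person;
    }

    static String bestSeller() {
        bestSellerCalls.incrementAndGet();
        return "Best seller";
    }

    // Evaluates the fallback only if the primary computation throws.
    static <T> T withFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    static String bestBookFor(String person) {
        return withFallback(() -> recommend(person), LazyFallbackDemo::bestSeller);
    }

    public static void main(String[] args) {
        System.out.println(bestBookFor("Ann"));
        System.out.println(bestBookFor(""));
        System.out.println("bestSeller() calls: " + bestSellerCalls.get());
    }
}
```

The try-catch has not disappeared here, it has only moved into one reusable helper; the Rx operator additionally works when the values arrive asynchronously.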

Composing Observables

SELECT * FROM PEOPLE is not really a state-of-the-art SQL query. First, you should not fetch all columns blindly, but fetching all rows is even more damaging. Our old API is not capable of paging results, that is, viewing just a subset of a table. A paged version might look like this, again in a traditional enterprise application:

List<Person> listPeople(int page) {
    return query(
            "SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?",
            PAGE_SIZE,
            page * PAGE_SIZE
    );
}

This is not a SQL book, so we’re going to set the implementation details aside. The author of this API was merciless: we don’t have the freedom to choose an arbitrary range of records; we can only operate on 0-based page numbers. However, in RxJava, thanks to laziness, we can actually simulate reading an entire database starting from a given page:

import static rx.Observable.defer;
import static rx.Observable.from;


Observable<Person> allPeople(int initialPage) {
    return defer(() -> from(listPeople(initialPage)))
            .concatWith(defer(() ->
                    allPeople(initialPage + 1)));
}

This code snippet lazily loads the initial page of database records, for example 10 items. If no one subscribes, even this first query is not invoked. If there is a subscriber that only consumes a few initial elements (e.g., allPeople(0).take(3)), RxJava will unsubscribe automatically from our stream and no more queries are executed. So what happens when we request, say, 11 items but the first listPeople() call returned only 10? Well, RxJava figures out that the initial Observable is exhausted but the consumer is still hungry. Luckily, it sees the concatWith() operator, which basically says: when the Observable on the left is completed, rather than propagating the completion notification to subscribers, subscribe to the Observable on the right and continue as if nothing happened, as depicted in the following marble diagram:

[Marble diagram: concatWith() subscribes to the second Observable once the first one completes]

In other words, concatWith() can join together two Observables so that when the first one completes, the second one takes over. In a.concatWith(b).subscribe(...), the subscriber will first receive all events from a, followed by all events from b. In this case, the subscriber first receives an initial 10 items followed by a subsequent 10. However, look carefully: there is an apparent infinite recursion in our code! allPeople(initialPage) calls allPeople(initialPage + 1) without any stop condition. This is a recipe for StackOverflowError in most languages, but not here. Again, calling allPeople() is always lazy; therefore, the moment you stop listening (unsubscribe), this recursion is over. Technically, concatWith() can still produce StackOverflowError here. In “Honoring the Requested Amount of Data”, you will learn how to deal with the varying demand for incoming data.

The technique of lazily loading data chunk by chunk is quite useful because it allows you to concentrate on business logic, not on low-level plumbing. We already see some benefits of applying RxJava even on a small scale. Designing an API with Rx in mind doesn’t influence the entire architecture, because we can always fall back to BlockingObservable and Java collections. But it’s better to have a wide range of possibilities that we can further trim down if necessary.

Lazy paging and concatenation

There are more ways to implement lazy paging with RxJava. If you think about it, the simplest way of loading paged data is to load everything and then take whatever we need. It sounds silly, but thanks to laziness it is feasible. First we generate all possible page numbers and then we request loading each and every page individually:

Observable<List<Person>> allPages = Observable
            .range(0, Integer.MAX_VALUE)
            .map(this::listPeople)
            .takeWhile(list -> !list.isEmpty());

If this were not RxJava, the preceding code would take an enormous amount of time and memory, basically loading the entire database into memory. But because Observable is lazy, no query has been sent to the database yet. Moreover, if we find an empty page, it means all further pages are empty as well (we reached the end of the table). Therefore, we use takeWhile() rather than filter(). To flatten allPages to Observable<Person> we can use concatMap() (see “Preserving Order Using concatMap()”):

Observable<Person> people = allPages.concatMap(Observable::from);

concatMap() requires a transformation from List<Person> to Observable<Person>, executed for each page. Alternatively we can try concatMapIterable(), which does the same thing, but the transformation should return an Iterable<Person> for each upstream value (happening to be Iterable<Person> already):

Observable<Person> people = allPages.concatMapIterable(page -> page);

No matter which approach you choose, all transformations on Person objects are lazy. As long as you limit the number of records you want to process (for example with people.take(15)), the Observable<Person> will invoke listPeople() as late as possible.
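If you want to convince yourself that "generate every page number, then take what you need" really is cheap under lazy evaluation, the same shape can be tried with java.util.stream, assuming Java 9 or later for Stream.takeWhile(). This is an analogy, not RxJava: the listPeople() below is a hypothetical in-memory stand-in returning row ids, and TOTAL_ROWS and queryCount are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPagingDemo {

    static final int PAGE_SIZE = 10;
    static final int TOTAL_ROWS = 35; // hypothetical table size
    static final AtomicInteger queryCount = new AtomicInteger();

    // Stand-in for listPeople(page): row ids of the given 0-based page.
    static List<Integer> listPeople(int page) {
        queryCount.incrementAndGet();
        int from = Math.min(page * PAGE_SIZE, TOTAL_ROWS);
        int to = Math.min(from + PAGE_SIZE, TOTAL_ROWS);
        List<Integer> rows = new ArrayList<>();
        for (int id = from; id < to; id++) {
            rows.add(id);
        }
        return rows;
    }

    // Page numbers are generated on demand; the first empty page ends the stream.
    static Stream<Integer> allPeople() {
        return Stream.iterate(0, page -> page + 1)
                .map(LazyPagingDemo::listPeople)
                .takeWhile(rows -> !rows.isEmpty())
                .flatMap(List::stream);
    }

    public static void main(String[] args) {
        List<Integer> first15 = allPeople().limit(15).collect(Collectors.toList());
        System.out.println(first15);
        System.out.println("pages queried: " + queryCount.get());
    }
}
```

Here limit(15) plays the role of take(15): once enough rows have been seen, no further page is requested. Unlike an Observable, a Stream is pull-based and single-use, so the analogy covers laziness only.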

Imperative Concurrency

I don’t often see explicit concurrency in enterprise applications. Most of the time a single request is handled by a single thread. The same thread does the following:

  • Accepts TCP/IP connection

  • Parses HTTP request

  • Calls a controller or servlet

  • Blocks on database call

  • Processes results

  • Encodes response (e.g., in JSON)

  • Pushes raw bytes back to the client

This layered model affects user latency when the backend makes several independent requests, for instance to a database. They are performed sequentially, whereas one could easily parallelize them. Moreover, scalability is affected. For example, in Tomcat there are 200 threads by default in the executors that are responsible for handling requests. This means that we can’t handle more than 200 concurrent connections. In case of a sudden but short burst of traffic, incoming connections are queued and the server responds with higher latency. However, this situation can’t last forever, and Tomcat will eventually begin rejecting incoming traffic. We will devote large parts of the next chapter (see “Nonblocking HTTP Server with Netty and RxNetty”) to dealing with this rather embarrassing shortcoming. For the time being, let’s stay with the traditional architecture. Executing every step of request handling within a single thread has some benefits, for example improved cache locality and minimal synchronization overhead.1 Unfortunately, in classic applications, because overall latency is the sum of each layer’s latencies, one malfunctioning component can have a negative impact on total latency.2 Moreover, sometimes there are many steps that are independent from one another and can be executed concurrently. For example, we invoke multiple external APIs or execute several independent SQL queries.

The JDK has quite good support for concurrency, especially since Java 5 with ExecutorService and Java 8 with CompletableFuture. Nonetheless, it is not as widely used as it could be. For example, let’s look at the following program with no concurrency whatsoever:

Flight lookupFlight(String flightNo) {
    //...
}

Passenger findPassenger(long id) {
    //...
}

Ticket bookTicket(Flight flight, Passenger passenger) {
    //...
}

SmtpResponse sendEmail(Ticket ticket) {
    //...
}

And on the client side:

Flight flight = lookupFlight("LOT 783");
Passenger passenger = findPassenger(42);
Ticket ticket = bookTicket(flight, passenger);
sendEmail(ticket);

Again, quite typical, classic blocking code, similar to what you can find in many applications. But if you look carefully from a latency perspective, the preceding code snippet has four steps; however, the first two are independent from each other. Only the third step (bookTicket()) needs results from lookupFlight() and findPassenger(). There exists an obvious opportunity to take advantage of concurrency. Yet, very few developers will actually go down this path because it requires awkward thread pools, Futures, and callbacks. What if the API were already Rx-compatible, though? Remember, you can simply wrap blocking, legacy code in Observable, just like we did in the beginning of this chapter:

Observable<Flight> rxLookupFlight(String flightNo) {
    return Observable.defer(() ->
            Observable.just(lookupFlight(flightNo)));
}

Observable<Passenger> rxFindPassenger(long id) {
    return Observable.defer(() ->
            Observable.just(findPassenger(id)));
}

Semantically, the rx- methods do exactly the same thing and in the same way; that is, they are blocking by default. We didn’t gain anything yet, apart from a more verbose API from the client perspective:

Observable<Flight> flight = rxLookupFlight("LOT 783");
Observable<Passenger> passenger = rxFindPassenger(42);
Observable<Ticket> ticket =
        flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
ticket.subscribe(this::sendEmail);

Both the traditional blocking program and the one with Observable work exactly the same way. The Rx version is lazier by default, but the order of operations is essentially the same. First, we create Observable<Flight>, which as you already know, does nothing by default. Unless someone explicitly asks for a Flight, this Observable is just a lazy placeholder. We already learned that this is a valuable property of cold Observables. The same story goes for Observable<Passenger>; we have two placeholders of type Flight and Passenger, but no side effects have been performed yet: no database query or web-service call. If we decide to stop processing here, no superfluous work has been done.

To proceed with bookTicket(), we need concrete instances of both Flight and Passenger. It is tempting to just block on these two Observables by using the toBlocking() operator. However, we would like to avoid blocking as much as possible to reduce resource consumption (especially memory) and allow greater concurrency. Another poor solution is to call subscribe() on the flight and passenger Observables and somehow wait for both callbacks to finish. It’s fairly straightforward when the Observable is blocking, but if callbacks appear asynchronously and you need to synchronize some global state while waiting for both of them, this quickly becomes a nightmare. Also, a nested subscribe() is nonidiomatic, and typically you want a single subscription for one message flow (use case). The only reason why callbacks work somewhat decently in JavaScript is that there is just one thread. The idiomatic way of subscribing to multiple Observables at the same time is zip() and zipWith(). You might perceive zip as a way to join two independent streams of data pair-wise. But far more often, zip is simply used to join together two single-item Observables. ob1.zipWith(ob2, ...).subscribe(...) essentially means receiving an event when both ob1 and ob2 are done (each has emitted an event of its own). So whenever you see zip, it’s more likely that someone is simply making a join step on two or more Observables that had forked paths of execution. zip is a way to asynchronously wait for two or more values, no matter which one appears last.

So let’s get back to flight.zipWith(passenger, this::bookTicket) (a shorter syntax using a method reference instead of the explicit lambda in the code sample). The reason I keep all of the type information rather than fluently joining expressions is that I want you to pay attention to return types. flight.zipWith(passenger, ...) doesn’t simply invoke a callback when both flight and passenger are done; it returns a new Observable, which you should immediately recognize as a lazy placeholder for data. Amazingly, at this point in time no computation has started yet, either. We simply wrapped a few data structures together, but no behavior was triggered. As long as no one subscribes to Observable<Ticket>, RxJava won’t run any backend code. This is what finally happens in the last statement: ticket.subscribe() explicitly asks for a Ticket.

Where to Subscribe?

Pay attention to where you see subscribe() in domain code. Often your business logic is just composing Observables all the way down and returning them to some sort of framework or scaffolding layer. The actual subscription happens behind the scenes in a web framework or some glue code. It is not a bad practice to call subscribe() yourself, but try to push it out as far as possible.

To understand the flow of execution, it’s useful to look bottom up. We subscribed to ticket, thus RxJava must subscribe transparently to both flight and passenger. At this point the real logic happens. Because both Observables are cold and no concurrency is yet involved, the first subscription to flight invokes the lookupFlight() blocking method right in the calling thread. When lookupFlight() is done, RxJava can subscribe to passenger. However, it already received a Flight instance from synchronous flight. rxFindPassenger() calls findPassenger() in a blocking fashion and receives a Passenger instance. At this juncture, data flows back downstream. Instances of Flight and Passenger are combined using the provided lambda (bookTicket) and passed to ticket.subscribe().

This sounds like a lot of work considering it behaves and works essentially just like our blocking code in the beginning. But now we can declaratively apply concurrency without changing any logic. If our business methods returned Future<Flight> (or CompletableFuture<Flight>, it doesn’t really matter), two decisions would have been made for us:

  • The underlying invocation of lookupFlight() has already begun and there is no place for laziness. We don’t block on such a method, but the work has already started.

  • We have no control over concurrency whatsoever; it is the method implementation that decides whether a Future task is invoked in a thread pool, a new thread per request, and so on.
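Both points can be observed with a plain CompletableFuture. The following is a minimal sketch under stated assumptions: the Async-suffixed methods, the String stand-ins for Flight, Passenger, and Ticket, the private POOL, and the lookupStarted latch are all hypothetical and exist only to make the behavior visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EagerFutureDemo {

    // The pool is picked by the implementor; callers cannot replace it.
    // Daemon threads let the JVM exit without an explicit shutdown.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Released as soon as the lookup body begins executing.
    static final CountDownLatch lookupStarted = new CountDownLatch(1);

    static CompletableFuture<String> lookupFlightAsync(String flightNo) {
        return CompletableFuture.supplyAsync(() -> {
            lookupStarted.countDown();
            return "Flight " + flightNo;
        }, POOL);
    }

    static CompletableFuture<String> findPassengerAsync(long id) {
        return CompletableFuture.supplyAsync(() -> "Passenger #" + id, POOL);
    }

    static String bookTicket(String flight, String passenger) {
        return "Ticket[" + flight + ", " + passenger + "]";
    }

    // True if the lookup started within the timeout, even though nobody consumed it.
    static boolean awaitStart(long timeoutMillis) {
        try {
            return lookupStarted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> flight = lookupFlightAsync("LOT 783");
        System.out.println("started before anyone asked: " + awaitStart(1000));
        CompletableFuture<String> ticket =
                flight.thenCombine(findPassengerAsync(42), EagerFutureDemo::bookTicket);
        System.out.println(ticket.join());
    }
}
```

thenCombine() is the rough counterpart of zipWith(): it joins two forked computations once both have produced a value. The difference is that by the time you hold the future, the decision about when and where to run has already been made.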

RxJava gives users more control. Just because Observable<Flight> wasn’t implemented with concurrency in mind, this does not mean that we cannot apply it later. Real-world Observables are typically asynchronous already, but in rare cases you must add concurrency to an existing Observable. The consumers of our API, not the implementors, are free to choose the threading mechanism in case of the synchronous Observable. All of this is achieved by using the subscribeOn() operator:

Observable<Flight> flight =
    rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
Observable<Passenger> passenger =
    rxFindPassenger(42).subscribeOn(Schedulers.io());

At any point before subscribing, we can inject the subscribeOn() operator and provide a so-called Scheduler instance. In this case, I used the Schedulers.io() factory method, but we can just as well use a custom ExecutorService and quickly wrap it with a Scheduler. When subscription occurs, the lambda expression passed to Observable.create() (or, as here, to defer()) is executed within the supplied Scheduler rather than the client thread. We do not need the details yet, but we will examine schedulers in depth in the “What Is a Scheduler?” section. For the time being, treat a Scheduler like a thread pool.

How does Scheduler change the runtime behavior of our program? Remember that the zip() operator subscribes to two or more Observables and waits for pairs (or tuples). When subscription occurs asynchronously, all upstream Observables can call their underlying blocking code concurrently. If you now run your program, lookupFlight() and findPassenger() will begin execution immediately and concurrently when ticket.subscribe() is invoked. Then, bookTicket() will be applied as soon as the slower of the aforementioned Observables emits a value.

Talking about slowness, you can declaratively apply a timeout, as well, when a given Observable does not emit any value in the specified amount of time:

rxLookupFlight("LOT 783")
    .subscribeOn(Schedulers.io())
    .timeout(100, TimeUnit.MILLISECONDS)

As always, in case of errors, they are propagated downstream rather than thrown arbitrarily. So if the lookupFlight() method takes more than 100 milliseconds, you will end up with TimeoutException rather than an emitted value sent downstream to every subscriber. The timeout() operator is exhaustively explained in “Timing Out When Events Do Not Occur”.

We ended up with two methods running concurrently without much effort, assuming that your API is already Rx-driven. But we cheated a little bit with bookTicket() still returning Ticket, which definitely means it is blocking. Even if booking a ticket were extremely fast, it would still be worth declaring it as returning an Observable, just to make the API easier to evolve. The evolution might mean adding concurrency or using it in fully nonblocking environments (see Chapter 5). Remember that turning a nonblocking API into a blocking one is as easy as calling toBlocking(). The opposite is often challenging and requires lots of extra resources. Also, it is very difficult to predict the evolution of methods like rxBookTicket(). If they ever touch the network or filesystem, not to mention a database, it is worth wrapping them with an Observable, indicating possible latency on the type level:

Observable<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
    //...
}

But now zipWith() returns an awkward Observable<Observable<Ticket>> and the code no longer compiles. A good rule of thumb is that whenever you see a double-wrapped type (for example Optional<Optional<...>>) there is a flatMap() invocation missing somewhere. That’s the case here, as well. zipWith() takes a pair (or more generally a tuple) of events, applies a function taking these events as arguments, and puts the result into the downstream Observable as-is. This is why we saw Observable<Ticket> first but now it’s Observable<Observable<Ticket>>, where Observable<Ticket> is the result of our supplied function. There are two ways to overcome this problem. One way is by using an intermediate pair returned from zipWith():

import org.apache.commons.lang3.tuple.Pair;

Observable<Ticket> ticket = flight
        .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
        .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));

If the explicit Pair from a third-party library does not obscure the flow enough, a method reference would work here as well: Pair::of. But again, we decided that visible type information is more valuable than saving a few keystrokes. After all, we read code for much more time than we write it. An alternative to an intermediate pair is applying flatMap() with an identity function:

Observable<Ticket> ticket = flight
        .zipWith(passenger, this::rxBookTicket)
        .flatMap(obs -> obs);

This obs -> obs lambda expression seemingly does nothing, and that would indeed be true if it were passed to a map() operator. But remember that flatMap() applies a function to each value inside Observable, so this function takes Observable<Ticket> as an argument in our case. Later, the result is not placed directly in the resulting stream, like with map(). Instead, the return value (of type Observable<T>) is “flattened,” leading to an Observable<T> rather than Observable<Observable<T>>. When dealing with schedulers, the flatMap() operator becomes even more powerful. You might perceive flatMap() as merely a syntactic trick to avoid a nested Observable<Observable<...>> problem, but it’s much more fundamental than this.
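The double-wrapping rule of thumb is not specific to Rx. As a hedged JDK-only illustration, CompletableFuture shows the same shape: thenCombine() plays the part of zipWith(), and thenCompose() is the flattening step. The bookTicketAsync() method and the String stand-ins for the domain types are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class FlattenDemo {

    // Stand-in for rxBookTicket(): the booking itself is asynchronous.
    static CompletableFuture<String> bookTicketAsync(String flight, String passenger) {
        return CompletableFuture.completedFuture(
                "Ticket[" + flight + ", " + passenger + "]");
    }

    static CompletableFuture<String> book(CompletableFuture<String> flight,
                                          CompletableFuture<String> passenger) {
        // Combining with a function that itself returns a future nests the types.
        CompletableFuture<CompletableFuture<String>> nested =
                flight.thenCombine(passenger, FlattenDemo::bookTicketAsync);
        // Flattening with an identity lambda, the counterpart of flatMap(obs -> obs).
        return nested.thenCompose(inner -> inner);
    }

    public static void main(String[] args) {
        CompletableFuture<String> ticket = book(
                CompletableFuture.completedFuture("Flight LOT 783"),
                CompletableFuture.completedFuture("Passenger #42"));
        System.out.println(ticket.join());
    }
}
```

The explicit nested variable is kept only to make the intermediate type visible, in the same spirit as the type-annotated Rx samples in this section.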

Observable.subscribeOn() Use Cases

It is tempting to think that subscribeOn() is the right tool for concurrency in RxJava. This operator works, but you should not see subscribeOn() (or the yet-to-be-described observeOn()) used often. In real life, Observables come from asynchronous sources, so custom scheduling is not needed at all. We use subscribeOn() throughout this chapter to explicitly show how to upgrade existing applications to use reactive principles selectively. But in practice, Schedulers and subscribeOn() are weapons of last resort, not something seen commonly.

flatMap() as Asynchronous Chaining Operator

In our sample application, we must now send a list of Tickets via e-mail. But we must keep in mind the following:

  1. The list can be potentially quite long.

  2. Sending an email might take several milliseconds or even seconds.

  3. The application must keep running gracefully in case of failures, but report in the end which tickets failed to be delivered.

The last requirement quickly rules out a simple tickets.forEach(this::sendEmail) because it eagerly throws an exception and won’t continue with tickets that were not yet delivered. Exceptions are actually a nasty back door to the type system and, just like callbacks, are not very friendly when you want to manage them in a more robust way. That is why RxJava models them explicitly as special notifications, but be patient, we will get there. In light of the error-handling requirement, our code looks more or less like this:

List<Ticket> failures = new ArrayList<>();
for(Ticket ticket: tickets) {
    try {
        sendEmail(ticket);
    } catch (Exception e) {
        log.warn("Failed to send {}", ticket, e);
        failures.add(ticket);
    }
}

However, the first two requirements or guidelines aren’t addressed. There is no reason why we keep sending emails from one thread sequentially. Traditionally, we could use an ExecutorService pool for that by submitting each email as a separate task:

List<Pair<Ticket, Future<SmtpResponse>>> tasks = tickets
    .stream()
    .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
    .collect(toList());

List<Ticket> failures = tasks.stream()
    .flatMap(pair -> {
        try {
            Future<SmtpResponse> future = pair.getRight();
            future.get(1, TimeUnit.SECONDS);
            return Stream.empty();
        } catch (Exception e) {
            Ticket ticket = pair.getLeft();
            log.warn("Failed to send {}", ticket, e);
            return Stream.of(ticket);
        }
    })
    .collect(toList());

//------------------------------------

private Future<SmtpResponse> sendEmailAsync(Ticket ticket) {
    return pool.submit(() -> sendEmail(ticket));
}

That’s a fair amount of code that all Java programmers should be familiar with. Yet it seems too verbose and accidentally complex. First, we iterate over tickets and submit them to a thread pool. To be precise, we call the sendEmailAsync() helper method that submits a sendEmail() invocation wrapped in a Callable<SmtpResponse> to a thread pool. To be even more precise, instances of Callable are first placed in a queue (unbounded by default) in front of the thread pool. The lack of a mechanism that slows down task submission when tasks cannot be processed in time is what led to the reactive streams and backpressure effort (see “Backpressure”).

Because later we will need a Ticket instance in case of failure, we must keep track of which Future was responsible for which Ticket, again in a Pair. In real production code, you should consider a more meaningful and dedicated container like a TicketAsyncTask value object. We collect all such pairs and proceed to the next iteration. At this point, the thread pool is already running multiple sendEmail() invocations concurrently, which is precisely what we were aiming at. The second loop goes through all Futures and tries to dereference them by blocking (get()) and awaiting completion. If get() returns successfully, we skip such a Ticket. However, if there is an exception, we return the Ticket instance that was associated with this task: we know it failed and we want to report it later. Stream.flatMap() allows us to return zero or one elements (or actually any number), contrary to Stream.map(), which always requires exactly one.
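The zero-or-one behavior of Stream.flatMap() is easier to see in isolation. The following is a minimal JDK-only sketch; deliver() is a hypothetical stand-in for sending an email that, purely for illustration, fails for odd ticket numbers:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapFailures {

    // Hypothetical delivery attempt: odd ticket numbers fail.
    static void deliver(int ticketNo) {
        if (ticketNo % 2 != 0) {
            throw new IllegalStateException("Cannot deliver " + ticketNo);
        }
    }

    // Keeps only the tickets whose delivery threw an exception.
    static List<Integer> failures(List<Integer> tickets) {
        return tickets.stream()
                .flatMap(ticket -> {
                    try {
                        deliver(ticket);
                        return Stream.<Integer>empty(); // success: contributes nothing
                    } catch (RuntimeException e) {
                        return Stream.of(ticket);       // failure: contributes itself
                    }
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(failures(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```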

You might be wondering why we need two loops instead of just one like this:

//WARNING: code is sequential despite utilizing thread pool
List<Ticket> failures = tickets
        .stream()
        .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
        .flatMap(pair -> {
            //...
        })
        .collect(toList());

This is an interesting bug that is really difficult to find if you don’t understand how Streams in Java 8 work. Because streams, just like Observables, are lazy, they evaluate the underlying collection one element at a time and only when a terminal operation is requested (e.g., collect(toList())). This means that a map() operation starting background tasks is not executed on all tickets immediately; rather, it’s done one ticket at a time, alternating with the flatMap() operation. In effect, we start one Future, block waiting for it, start a second Future, block waiting on that, and so on. An intermediate collection is needed to force evaluation, not for clarity or readability. After all, the List<Pair<Ticket, Future<SmtpResponse>>> type is hardly more readable.
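The difference in evaluation order can be observed directly. The sketch below uses only the JDK and records what the calling thread does; the class and method names (LazyStreamDemo, submit(), awaitResult()) are made up for this illustration, and the tasks are trivial placeholders for sending an email:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyStreamDemo {

    // Actions of the calling thread, in the order they happen.
    private final List<String> events = new ArrayList<>();
    private final ExecutorService pool;

    LazyStreamDemo(ExecutorService pool) {
        this.pool = pool;
    }

    Future<Integer> submit(int id) {
        events.add("submit " + id);
        return pool.submit(() -> id); // placeholder for real background work
    }

    int awaitResult(Future<Integer> future) {
        try {
            int id = future.get(); // blocks the calling thread
            events.add("done " + id);
            return id;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // One pipeline: each task is submitted and awaited before the next is touched.
    List<String> singlePipeline() {
        events.clear();
        Stream.of(1, 2, 3).map(this::submit).map(this::awaitResult)
                .collect(Collectors.toList());
        return new ArrayList<>(events);
    }

    // Two phases: the intermediate list forces all submissions to happen first.
    List<String> twoPhases() {
        events.clear();
        List<Future<Integer>> futures =
                Stream.of(1, 2, 3).map(this::submit).collect(Collectors.toList());
        futures.stream().map(this::awaitResult).collect(Collectors.toList());
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            LazyStreamDemo demo = new LazyStreamDemo(pool);
            System.out.println(demo.singlePipeline());
            System.out.println(demo.twoPhases());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the single pipeline, every "submit" is immediately followed by its own "done", so at most one task is ever in flight. With the intermediate list, all three submissions are recorded before the first result is awaited.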

That’s plenty of work and the possibility of a mistake is high, so it’s no wonder that developers are reluctant to apply concurrent code on a daily basis. The little-known ExecutorCompletionService from the JDK is sometimes used when there is a pool of asynchronous tasks and we want to process them as they complete. Moreover, Java 8 brings CompletableFuture (see “CompletableFuture and Streams”) that is entirely reactive and nonblocking. But how can RxJava help here? First, assume that an API for sending an email is already retrofitted to use RxJava:

import static rx.Observable.fromCallable;

Observable<SmtpResponse> rxSendEmail(Ticket ticket) {
    //unusual synchronous Observable
    return fromCallable(() -> sendEmail(ticket));
}

There is no concurrency involved, just wrapping sendEmail() inside an Observable. This is a rare Observable; ordinarily you would use subscribeOn() in the implementation so that the Observable is asynchronous by default. At this point, we can iterate over all tickets as before:

List<Ticket> failures = Observable.from(tickets)
    .flatMap(ticket ->
        rxSendEmail(ticket)
            .flatMap(response -> Observable.<Ticket>empty())
            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
            .onErrorReturn(err -> ticket))
    .toList()
    .toBlocking()
    .single();

Observable.ignoreElements()

It is easy to see that inner flatMap() in our example ignores response and returns an empty stream. In such cases, flatMap() is an overkill; the ignoreElements() operator is far more efficient. ignoreElements() simply ignores all emitted values and forwards onCompleted() or onError() notifications. Because we are ignoring the actual response and just deal with errors, ignoreElements() works great here.

All we are interested in lies inside the outer flatMap(). If it were just flatMap(this::rxSendEmail), the code would work; however, any failure emitted from rxSendEmail() would terminate the entire stream. But we want to “catch” all emitted errors and collect them for later consumption. We use a trick similar to the one with Stream.flatMap(): if a response was successfully emitted, we transform it to an empty Observable. This basically means that we discard successful tickets. However, in case of failure, we return the ticket that raised an exception. An extra doOnError() callback allows us to log the exception. Of course, we can just as well add logging to the onErrorReturn() operator, but I found this separation of concerns more functional.

To remain compatible with previous implementations, we transform Observable<Ticket> into Observable<List<Ticket>> (toList()), then BlockingObservable<List<Ticket>> (toBlocking()), and finally List<Ticket> (single()). Interestingly, even BlockingObservable remains lazy. A toBlocking() operator on its own doesn’t force evaluation by subscribing to the underlying stream and it doesn’t even block. Subscription, and thus iteration and sending emails, is postponed until single() is invoked.

Note that if we replace the outer flatMap() with concatMap() (see “Ways of Combining Streams: concat(), merge(), and switchOnNext()” and “Preserving Order Using concatMap()”), we will encounter a bug similar to the one mentioned with JDK’s Stream. As opposed to flatMap() (or merge()), which subscribes immediately to all inner streams, concatMap() (or concat()) subscribes to one inner Observable after another. And as long as no one has subscribed to an Observable, no work even begins.

So far, a simple for loop with a try-catch was replaced with a less readable and more complex Observable. However, to turn our sequential code into a multithreaded computation we barely need to add one extra operator:

Observable
        .from(tickets)
        .flatMap(ticket ->
                rxSendEmail(ticket)
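                        .ignoreElements()
                        //ignoreElements() keeps the SmtpResponse type; the stream is empty, so cast() never fails
                        .cast(Ticket.class)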
                        .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                        .onErrorReturn(err -> ticket)
                        .subscribeOn(Schedulers.io()))

It is so noninvasive that you might find it hard to spot. One extra subscribeOn() operator causes each individual rxSendEmail() to be executed on a specified Scheduler (io(), in this case). This is one of the strengths of RxJava; it is not opinionated about threading, defaulting to synchronous execution but allowing seamless and almost transparent multithreading. Of course, this doesn’t mean that you can safely inject schedulers in arbitrary places. But at least the API is less verbose and higher level. We will explore schedulers in much more detail later in “Multithreading in RxJava”. For the time being, remember that Observables are synchronous by default; however, we can easily change that and apply concurrency in places where it was least expected. This is especially valuable in existing legacy applications, which you can optimize without much hassle.

Wrapping up, if you are implementing Observables from scratch, making them asynchronous by default is more idiomatic. That means placing subscribeOn() directly inside rxSendEmail() rather than externally. Otherwise, you risk wrapping already asynchronous streams with yet another layer of schedulers. Of course, if the producer behind the Observable is already asynchronous, it is even better because your stream does not bind to any particular thread. Additionally, you should postpone subscribing to an Observable as late as possible, typically close to the web framework or the outside world. This changes your mindset significantly. Your entire business logic is lazy until someone actually wants to see the results.[3]

Replacing Callbacks with Streams

Traditional APIs are blocking most of the time, meaning they force you to wait synchronously for the results. This approach works relatively well, at least before you heard about RxJava. But a blocking API is particularly problematic when data needs to be pushed from the API producer to consumers; this is an area where RxJava really shines. There are numerous examples of such cases and various approaches are taken by API designers. Typically, we need to provide some sort of callback that the API invokes, often called an event listener. One of the most common scenarios like that is Java Message Service (JMS). Consuming JMS typically involves implementing a class that the application server or container notifies on every incoming message. We can replace such listeners with relative ease with a composable Observable, which is much more robust and versatile. The traditional listener looks similar to this class, here using JMS support in the Spring framework, but our solution is technology-agnostic:

@Component
class JmsConsumer {

    @JmsListener(destination = "orders")
    public void newOrder(Message message) {
        //...
    }
}

When a JMS message is received, the JmsConsumer class must decide what to do with it. Typically, some business logic is invoked inside a message consumer. When a new component wants to be notified about such messages, it must modify JmsConsumer appropriately. Conversely, imagine an Observable<Message> that can be subscribed to by anyone. Moreover, an entire universe of RxJava operators is available, allowing mapping, filtering, and combining capabilities. The easiest way to convert from a push, callback-based API to Observable is to use Subjects. Every time a new JMS message is delivered, we push that message to a PublishSubject that looks like an ordinary hot Observable from the outside:

private final PublishSubject<Message> subject = PublishSubject.create();

@JmsListener(destination = "orders", concurrency="1")
public void newOrder(Message msg) {
    subject.onNext(msg);
}

Observable<Message> observe() {
    return subject;
}

Keep in mind that Observable<Message> is hot; it begins emitting JMS messages as soon as they are consumed. If no one is subscribed at the moment, messages are simply lost. ReplaySubject is an alternative, but because it caches all events since the application startup, it’s not suitable for long-running processes. In case you have a subscriber that absolutely must receive all messages, ensure that it subscribes before the JMS message listener is initialized. Additionally, our message listener has a concurrency="1" parameter to ensure that Subject is not invoked from multiple threads. As an alternative, you can use Subject.toSerialized().
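What "messages are simply lost" means for a hot source can be shown without RxJava or a JMS broker. HotSource below is a hypothetical, heavily simplified stand-in written for this illustration; it is not how PublishSubject is implemented, it only mimics the "deliver to whoever is listening right now" behavior:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot source; NOT RxJava's PublishSubject.
final class HotSource<T> {

    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    // Delivers to the listeners registered at this moment; nothing is buffered.
    void publish(T message) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(message);
        }
    }

    public static void main(String[] args) {
        HotSource<String> orders = new HotSource<>();
        List<String> received = new ArrayList<>();

        orders.publish("order-1");        // nobody is listening yet: lost
        orders.subscribe(received::add);
        orders.publish("order-2");        // delivered

        System.out.println(received);
    }
}
```

Only the message published after the subscription reaches the listener, which is why a subscriber that must see everything has to be registered before the first message arrives.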

As a side note, Subjects are easier to get started with but are known to be problematic after a while. In this particular case, we can easily replace Subject with the more idiomatic RxJava Observable that uses create() directly:

public Observable<Message> observe(
    ConnectionFactory connectionFactory,
    Topic topic) {
    return Observable.create(subscriber -> {
        try {
            subscribeThrowing(subscriber, connectionFactory, topic);
        } catch (JMSException e) {
            subscriber.onError(e);
        }
    });
}

private void subscribeThrowing(
        Subscriber<? super Message> subscriber,
        ConnectionFactory connectionFactory,
        Topic orders) throws JMSException {
    Connection connection = connectionFactory.createConnection();
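    //non-transacted session; a transacted one would ignore AUTO_ACKNOWLEDGE and require explicit commits
    Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);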
    MessageConsumer consumer = session.createConsumer(orders);
    consumer.setMessageListener(subscriber::onNext);
    subscriber.add(onUnsubscribe(connection));
    connection.start();
}

private Subscription onUnsubscribe(Connection connection) {
    return Subscriptions.create(() -> {
        try {
            connection.close();
        } catch (Exception e) {
            log.error("Can't close", e);
        }
    });
}

The JMS API provides two ways of receiving messages from a broker: synchronous, via the blocking receive() method, and nonblocking, using MessageListener. The nonblocking API is beneficial for many reasons; for example, it holds fewer resources like threads and stack memory. It also aligns beautifully with the Rx style of programming. Rather than creating a MessageListener instance and calling our subscriber from within it, we can use this terse syntax with a method reference:

consumer.setMessageListener(subscriber::onNext)

Also, we must take care of resource cleanup and proper error handling. This tiny transformation layer allows us to easily consume JMS messages without worrying about API internals. Here is an example using the popular ActiveMQ messaging broker running locally:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
        observe(connectionFactory, new ActiveMQTopic("orders"))
        .cast(TextMessage.class)
        .flatMap(m -> {
            try {
                return Observable.just(m.getText());
            } catch (JMSException e) {
                return Observable.error(e);
            }
        });

JMS, just like JDBC, has a reputation for heavily using the checked JMSException, even when calling getText() on a TextMessage. To properly handle errors (see “Error Handling” for more details) we use flatMap() and wrap exceptions. From that point, you can treat incoming JMS messages like any other asynchronous and nonblocking stream. By the way, we used the cast() operator, which optimistically casts upstream events to a given type and fails with onError() otherwise. cast() is basically a specialized map() operator that behaves like map(x -> (TextMessage)x).

Polling Periodically for Changes

The worst blocking API that you can work with requires polling for changes. It provides no mechanism to push changes right at you, even with callbacks or by blocking indefinitely. The only mechanism this API gives is asking for the current state, and it is up to you to figure out whether it differs from the previous state. RxJava has a few really powerful operators that you can apply to retrofit a given API to Rx style. The first case I want you to consider is a simple method that delivers a single value that represents state, for example long getOrderBookLength(). To track changes we must call this method frequently enough and capture differences. You can achieve this in RxJava with a very basic operator composition:

Observable
        .interval(10, TimeUnit.MILLISECONDS)
        .map(x -> getOrderBookLength())
        .distinctUntilChanged()

First we produce a synthetic long value every 10 milliseconds which serves as a basic ticking counter. For each such value (that is every 10 milliseconds), we call getOrderBookLength(). However, the aforementioned method doesn’t change that often, and we don’t want to flood our subscribers with lots of irrelevant state changes. Luckily we can simply say distinctUntilChanged() and RxJava will transparently skip long values returned by getOrderBookLength() that have not changed since last invocation, as demonstrated in the following marble diagram:

[Figure: marble diagram of the distinctUntilChanged() operator]
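To make the effect of distinctUntilChanged() concrete without a running clock, the following plain-Java sketch applies the same rule to a fixed list of polled samples. It is an illustration of the behavior only, not RxJava's implementation, and the sample values are invented:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class DistinctUntilChangedSketch {

    // Keeps a sample only if it differs from the sample directly before it.
    static <T> List<T> distinctUntilChanged(List<T> samples) {
        List<T> changes = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T sample : samples) {
            if (first || !Objects.equals(previous, sample)) {
                changes.add(sample);
            }
            previous = sample;
            first = false;
        }
        return changes;
    }

    public static void main(String[] args) {
        // Invented results of eight consecutive getOrderBookLength() polls.
        List<Long> polled = Arrays.asList(3L, 3L, 3L, 4L, 4L, 3L, 5L, 5L);
        System.out.println(distinctUntilChanged(polled)); // prints [3, 4, 3, 5]
    }
}
```

Note that the value 3 appears twice in the result: only consecutive repetitions are suppressed, so a value that comes back after a change is reported again.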

We can apply this pattern even further. Imagine that you are watching for filesystem or database table changes. The only mechanism at your disposal is taking a current snapshot of files or database records. You are building an API that will notify clients about every new item. Obviously, you can use java.nio.file.WatchService or database triggers, but take this as an educational example. This time, again, we begin by periodically taking a snapshot of current state:

Observable<Item> observeNewItems() {
    return Observable
            .interval(1, TimeUnit.SECONDS)
            .flatMapIterable(x -> query())
            .distinct();
}

List<Item> query() {
    //take snapshot of file system directory
    //or database table
}

The distinct() operator keeps a record of all items that passed through it (see also “Dropping Duplicates Using distinct() and distinctUntilChanged()”). If the same item appears for the second time, it is simply ignored. That is why we can push the same list of Items every second. The first time, they are pushed downstream to all subscribers. However, when the exact same list appears one second later, all items were already seen and are therefore discarded. If at some point in time the list returned from query() contains one extra Item, distinct() will let it through but discard it the next time. This simple pattern allows us to replace a bunch of Thread.sleep() invocations and manual caching with periodic polling. It is applicable in many areas, like File Transfer Protocol (FTP) polling, web scraping, and so on.
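The bookkeeping that distinct() performs across successive snapshots can be approximated in plain Java with a set of already-seen items. NewItemTracker is a hypothetical helper written for this illustration, not part of RxJava:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper mimicking distinct() over repeated snapshots.
final class NewItemTracker<T> {

    // Every item ever seen; like distinct(), this only grows over time.
    private final Set<T> seen = new HashSet<>();

    // Returns the items of this snapshot that no earlier snapshot contained.
    List<T> onlyNew(List<T> snapshot) {
        List<T> fresh = new ArrayList<>();
        for (T item : snapshot) {
            if (seen.add(item)) { // add() returns false for items already present
                fresh.add(item);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        NewItemTracker<String> tracker = new NewItemTracker<>();
        System.out.println(tracker.onlyNew(Arrays.asList("a.txt", "b.txt")));
        System.out.println(tracker.onlyNew(Arrays.asList("a.txt", "b.txt")));
        System.out.println(tracker.onlyNew(Arrays.asList("a.txt", "b.txt", "c.txt")));
    }
}
```

The first snapshot is reported in full, the identical second snapshot yields nothing, and the third yields only the added file. Because every item is remembered, memory use grows with the number of distinct items, which is worth keeping in mind for long-running polling.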

Multithreading in RxJava

There are third-party APIs that are blocking and there is simply nothing we can do about it. We might not have the source code, or rewriting might carry too much risk. In that case, we must learn how to deal with blocking code rather than fighting it.

One of the hallmarks of RxJava is declarative concurrency, as opposed to imperative concurrency. Manually creating and managing threads is a thing of the past (compare with “Thread Pool of Connections”); most of us already use managed thread pools (e.g., with ExecutorService). But RxJava goes one step further: Observable can be nonblocking just like CompletableFuture in Java 8 (see “CompletableFuture and Streams”), but unlike the latter, it is also lazy. Unless you subscribe, a well-behaving Observable will not perform any action. But the power of Observable goes even beyond that.

An asynchronous Observable is one that calls your Subscriber’s callback methods (like onNext()) from a different thread. Recall “Mastering Observable.create()”, in which we explored when subscribe() is blocking, waiting until all notifications arrive. In real life, most Observables come from sources that are asynchronous by their nature. Chapter 5 is entirely devoted to such Observables. But even our simple JMS example from “Replacing Callbacks with Streams” uses a built-in, nonblocking API from the JMS specification (the MessageListener interface). This is not enforced or suggested by the type system, but many Observables are asynchronous from the very beginning, and you should assume that. A blocking subscribe() method is rare; it happens when a lambda within Observable.create() is not backed by any asynchronous process or stream. However, by default (with create()) everything happens in the client thread (the one that subscribed). If you just invoke onNext() directly within your create() callback, no multithreading or concurrency is involved whatsoever.

Encountering such an unusual Observable, we can declaratively select the so-called Scheduler that will be used to emit values. In the case of CompletableFuture, we have no control over the underlying threads; the API made the decision, and in the worst case it is impossible to override it. RxJava rarely makes such decisions alone and chooses a safe default: the client thread with no multithreading involved. For the purposes of this chapter, we will use a really simple logging “library”[4] that prints a message along with the current thread and the number of milliseconds since the start of the program, computed using System.currentTimeMillis():

void log(Object label) {
    System.out.println(
        System.currentTimeMillis() - start + "\t| " +
        Thread.currentThread().getName()   + "\t| " +
        label);
}

What Is a Scheduler?

RxJava is concurrency-agnostic, and as a matter of fact it does not introduce concurrency on its own. However, some abstractions to deal with threads are exposed to the end user. Also, certain operators cannot work properly without concurrency; see “Other Uses for Schedulers” for some of them. Luckily, the Scheduler class, the only one you must pay attention to, is fairly simple. In principle it works similarly to ScheduledExecutorService from java.util.concurrent—it executes arbitrary blocks of code, possibly in the future. However, to meet Rx contract, it offers some more fine-grained abstractions, which you can see more of in the advanced section “Scheduler implementation details overview”.
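For readers who have not used ScheduledExecutorService, the comparison may be easier to follow with a short JDK-only sketch of "executing a block of code, possibly in the future". The class and method names are invented for this illustration and nothing here touches RxJava's Scheduler:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ScheduledExecutorDemo {

    // Runs a small task after the given delay and returns what it produced.
    static String runLater(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future = executor.schedule(
                    () -> "ran on " + Thread.currentThread().getName(),
                    delayMillis, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS); // wait for the delayed task
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        String result = runLater(100);
        System.out.println(result + " after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

The task runs on the executor's own thread, not on the caller's, and not before the requested delay has elapsed.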

Schedulers are used together with subscribeOn() and observeOn() operators as well as when creating certain types of Observables. A scheduler only creates instances of Workers that are responsible for scheduling and running code. When RxJava needs to schedule some code it first asks Scheduler to provide a Worker and uses the latter to schedule subsequent tasks. You will find examples of this API later on, but first familiarize yourself with available built-in schedulers:

Schedulers.newThread()

This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn(). newThread() is hardly ever a good choice, not only because of the latency involved when starting a thread, but also because this thread is not reused. Stack space must be allocated up front (typically around one megabyte, as controlled by the -Xss parameter of the JVM) and the operating system must start a new native thread. When the Worker is done, the thread simply terminates. This scheduler can be useful only when tasks are coarse-grained: each takes a lot of time to complete but there are very few of them, so that threads are unlikely to be reused at all. See also: “Thread per Connection”. In practice, Schedulers.io(), described next, is almost always a better choice.

Schedulers.io()

This scheduler is similar to newThread(), but already started threads are recycled and can possibly handle future requests. This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new Worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused.

The name io() is not a coincidence. Consider using this scheduler for I/O-bound tasks, which require very little CPU but tend to take quite some time waiting for the network or disk. Thus, it is a good idea to have a relatively big pool of threads. Still, be careful with unbounded resources of any kind: in case of slow or unresponsive external dependencies like web services, the io() scheduler might start an enormous number of threads, leading to your very own application becoming unresponsive as well. See “Managing Failures with Hystrix” for more details on how to tackle this problem.

Schedulers.computation()

You should use the computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code (reading from disk, network, sleeping, waiting for a lock, etc.). Because each task executed on this scheduler is supposed to fully utilize one CPU core, executing more such tasks in parallel than there are available cores would not bring much value. Therefore, the computation() scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

If for some reason you need a different number of threads than the default, you can always use the rx.scheduler.max-computation-threads system property. By using fewer threads, you ensure that there is always one or more CPU cores idle, and even under heavy load, the computation() thread pool does not saturate your server. It is not possible to have more computation threads than cores.

The computation() scheduler uses an unbounded queue in front of every thread, so if a task is scheduled but all cores are occupied, it is queued. During a load peak, this scheduler keeps the number of threads limited. However, the queue in front of each thread keeps growing.

Luckily, built-in operators, especially observeOn() that we are about to discover in “Declarative Concurrency with observeOn()” ensure that this Scheduler is not overloaded.

Schedulers.from(Executor executor)

Schedulers are internally more complex than Executors from java.util.concurrent, so a separate abstraction was needed. But because they are conceptually quite similar, unsurprisingly there is a wrapper that can turn Executor into Scheduler using the from() factory method:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

//...

ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .build();
Executor executor = new ThreadPoolExecutor(
    10,  //corePoolSize
    10,  //maximumPoolSize
    0L, TimeUnit.MILLISECONDS, //keepAliveTime, unit
    new LinkedBlockingQueue<>(1000),  //workQueue
    threadFactory
);
Scheduler scheduler = Schedulers.from(executor);

I am intentionally using this verbose syntax for creating ExecutorService rather than the simpler version:

import java.util.concurrent.Executors;

//...

ExecutorService executor = Executors.newFixedThreadPool(10);

Although tempting, the Executors factory class hardcodes several defaults that are impractical or even dangerous in enterprise applications. For example, it uses an unbounded LinkedBlockingQueue that can grow infinitely, resulting in OutOfMemoryError when there are a large number of outstanding tasks. Also, the default ThreadFactory uses meaningless thread names like pool-5-thread-3. Naming threads properly is an invaluable tool when profiling or analyzing thread dumps. Implementing ThreadFactory from scratch is a bit cumbersome, so we used ThreadFactoryBuilder from Guava. If you are interested in tuning and properly utilizing thread pools even further, see “Thread Pool of Connections” and “Managing Failures with Hystrix”. Creating schedulers from an Executor that we consciously configured is advised for projects dealing with high load. However, because RxJava has no control over independently created threads in an Executor, it cannot pin threads (that is, try to keep work of the same task on the same thread to improve cache locality). This Scheduler merely makes sure that a single Scheduler.Worker (see “Scheduler implementation details overview”) processes events sequentially.
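What a bounded queue and named threads buy you can be checked with the JDK alone. The sketch below is an illustration under a few assumptions: it replaces Guava's ThreadFactoryBuilder with a hand-written ThreadFactory, uses a deliberately tiny pool (one thread, one queue slot) so that the limit is hit immediately, and relies on the executor's default rejection policy:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPoolDemo {

    static ThreadPoolExecutor newPool(int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        // Hand-written replacement for ThreadFactoryBuilder: "MyPool-0", "MyPool-1", ...
        ThreadFactory named = runnable -> {
            Thread thread = new Thread(runnable, "MyPool-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity), named);
    }

    // Returns the name of the pool thread that executed a task.
    static String workerName() {
        ThreadPoolExecutor pool = newPool(1, 1);
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if a third task is rejected while one runs and one is queued.
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // handed to the only thread
            pool.execute(blocked); // takes the only queue slot
            try {
                pool.execute(blocked); // no free thread and no queue slot left
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
        System.out.println(thirdTaskRejected());
    }
}
```

With a bounded queue, overload shows up as an immediate RejectedExecutionException at the submission site instead of as slowly growing memory use, and the thread name makes the pool easy to find in a thread dump.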

Schedulers.immediate()

Schedulers.immediate() is a special scheduler that invokes a task within the client thread in a blocking fashion, rather than asynchronously. Using it is pointless unless some part of your API requires providing a scheduler, whereas you are absolutely fine with the default behavior of Observable, not involving any threading at all. In fact, subscribing to an Observable (more on that in a second) via the immediate() Scheduler typically has the same effect as not subscribing with any particular scheduler at all. In general, avoid this scheduler; it blocks the calling thread and is of limited use.

Schedulers.trampoline()

The trampoline() scheduler is very similar to immediate() because it also schedules tasks in the same thread, effectively blocking. However, as opposed to immediate(), the upcoming task is executed when all previously scheduled tasks complete. immediate() invokes a given task right away, whereas trampoline() waits for the current task to finish. Trampoline is a pattern in functional programming that allows implementing recursion without infinitely growing the call stack. This is best explained with an example, first involving immediate(). By the way, notice that we do not interact directly with a Scheduler instance but first create a Worker. This makes sense as you will quickly see in “Scheduler implementation details overview”.

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Inner start");
        sleepOneSecond();
        log("  Inner end");
    });
    log(" Outer end");
});
log("Main end");
worker.unsubscribe();

The output is as expected; you could actually replace schedule() with a simple method invocation:

1044    | main  | Main start
1094    | main  |  Outer start
2097    | main  |   Inner start
3097    | main  |   Inner end
3100    | main  |  Outer end
3100    | main  | Main end

Inside the Outer block we schedule() Inner block that gets invoked immediately, interrupting the Outer task. When Inner is done, the control goes back to Outer. Again, this is simply a convoluted way of invoking a task in a blocking manner indirectly via immediate() Scheduler. But what happens if we replace Schedulers.immediate() with Schedulers.trampoline()? The output is quite different:

1030    | main  | Main start
1096    | main  |  Outer start
2101    | main  |  Outer end
2101    | main  |   Inner start
3101    | main  |   Inner end
3101    | main  | Main end

Do you see how Outer manages to complete before Inner even starts? This is because the Inner task was queued inside the trampoline() Scheduler, which was already occupied by the Outer task. When Outer finished, the first task from the queue (Inner) began. We can go even further to make sure you understand the difference:

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Middle start");
        sleepOneSecond();
        worker.schedule(() -> {
            log("   Inner start");
            sleepOneSecond();
            log("   Inner end");
        });
        log("  Middle end");
    });
    log(" Outer end");
});
log("Main end");

The Worker from immediate() Scheduler outputs the following:

1029    | main  | Main start
1091    | main  |  Outer start
2093    | main  |   Middle start
3095    | main  |    Inner start
4096    | main  |    Inner end
4099    | main  |   Middle end
4099    | main  |  Outer end
4099    | main  | Main end

Versus the trampoline() worker:

1041    | main  | Main start
1095    | main  |  Outer start
2099    | main  |  Outer end
2099    | main  |   Middle start
3101    | main  |   Middle end
3101    | main  |    Inner start
4102    | main  |    Inner end
4102    | main  | Main end
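
The difference between the two workers can be reproduced without RxJava at all. The following is a minimal sketch using only the JDK; SimpleWorker, ImmediateWorker, and TrampolineWorker are hypothetical names invented for this illustration and are not how RxJava implements these schedulers. The trampoline variant keeps a queue and lets only the outermost schedule() call drain it:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded sketches, not RxJava's actual implementation.
class WorkerSketches {

    interface SimpleWorker {
        void schedule(Runnable task);
    }

    // Like immediate(): the task runs right away, nested inside the caller.
    static class ImmediateWorker implements SimpleWorker {
        @Override
        public void schedule(Runnable task) {
            task.run();
        }
    }

    // Like trampoline(): tasks are queued and only the outermost call drains the queue.
    static class TrampolineWorker implements SimpleWorker {
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean draining;

        @Override
        public void schedule(Runnable task) {
            queue.add(task);
            if (draining) {
                return; // another task is running; this one waits its turn
            }
            draining = true;
            try {
                Runnable next;
                while ((next = queue.poll()) != null) {
                    next.run();
                }
            } finally {
                draining = false;
            }
        }
    }

    // Schedules Outer, which schedules Inner, and records the order of events.
    static List<String> trace(SimpleWorker worker) {
        List<String> events = new ArrayList<>();
        worker.schedule(() -> {
            events.add("Outer start");
            worker.schedule(() -> events.add("Inner"));
            events.add("Outer end");
        });
        return events;
    }
}
```

Calling trace() with an ImmediateWorker records Outer start, Inner, Outer end, whereas a TrampolineWorker records Outer start, Outer end, Inner. Because nested tasks are queued rather than invoked, the call stack does not grow with the nesting depth.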

Schedulers.test()

This Scheduler is used only for testing purposes, and you will never see it in production code. Its main advantage is the ability to arbitrarily advance the clock, simulating time passing by. TestScheduler is described to a great extent in “Schedulers in Unit Testing”. Schedulers alone are not very interesting. If you want to discover how they work internally and how to implement your own, check out the next section.

Scheduler implementation details overview

Note

This section is entirely optional; feel free to jump straight to “Declarative Subscription with subscribeOn()” if you are not interested in implementation details.

Scheduler not only decouples tasks and their execution (typically by running them in another thread), but it also abstracts away the clock, as we will learn in “Virtual Time”. The API of the Scheduler is a bit simpler compared to, for example, ScheduledExecutorService:

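abstract class Scheduler {
    abstract Worker createWorker();

    // Wall-clock time by default; can be overridden
    long now() {
        return System.currentTimeMillis();
    }

    abstract static class Worker implements Subscription {

        abstract Subscription schedule(Action0 action);

        abstract Subscription schedule(Action0 action,
                             long delayTime, TimeUnit unit);

        // Wall-clock time by default; can be overridden
        long now() {
            return System.currentTimeMillis();
        }
    }
}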

When RxJava wants to schedule a task (presumably, but not necessarily, in the background), it must first ask for an instance of Worker. It is the Worker that allows scheduling the task without any delay or at some point in time. Both Scheduler and Worker have an overridable source of time (the now() method) that they use to determine when a given task is supposed to run. Naively, you can think of a Scheduler like a thread pool and a Worker like a thread inside that pool.

The separation between Scheduler and Worker is necessary to easily implement some of the guidelines enforced by the Rx contract, namely invoking Subscriber’s methods sequentially, not concurrently. Worker’s contract provides just that: two tasks scheduled on the same Worker will never run concurrently. However, independent Workers from the same Scheduler can run tasks concurrently just fine.
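
To make the contract tangible, here is a small JDK-only analogy that follows the naive "Worker as a thread" picture from above. Each single-threaded executor stands in for one Worker; this is an assumption made for illustration, because real Workers are not required to own a thread. The class and method names are hypothetical. Two tasks try to meet at a latch, which can only succeed if both are running at the same time:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Analogy only: each single-threaded executor plays the role of one Worker.
class WorkerContractSketch {

    // True only if both tasks were inside their bodies at the same moment.
    static boolean canOverlap(ExecutorService first, ExecutorService second) {
        CountDownLatch bothRunning = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothRunning.countDown();
            // Wait for the other task to start; give up after one second.
            return bothRunning.await(1, TimeUnit.SECONDS);
        };
        try {
            Future<Boolean> a = first.submit(task);
            Future<Boolean> b = second.submit(task);
            return a.get() && b.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Two tasks on the same "Worker": the second cannot start until the first ends.
    static boolean sameWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return canOverlap(worker, worker);
        } finally {
            worker.shutdown();
        }
    }

    // Two tasks on independent "Workers": free to run side by side.
    static boolean independentWorkers() {
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            return canOverlap(first, second);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }
}
```

In this sketch sameWorker() returns false, because the first task gives up waiting before the second one is even started, while independentWorkers() returns true.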

Rather than going through the API, let’s analyze the source code of an existing Scheduler, namely HandlerScheduler, as found in the RxAndroid project. This Scheduler simply runs all scheduled tasks on an Android UI thread. Updating the user interface is only allowed from that thread (see “Android Development with RxJava” for more details). This is similar to the Event Dispatch Thread (EDT) as found in Swing, where most of the updates to windows and components must be executed within a dedicated thread (EDT). Unsurprisingly, there is also the RxSwing project for that.

The code snippet that follows is a stripped-down and incomplete class from RxAndroid, shown for educational purposes only:

package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.TimeUnit;

public final class SimplifiedHandlerScheduler extends Scheduler {

    @Override
    public Worker createWorker() {
        return new HandlerWorker();
    }

    static class HandlerWorker extends Worker {

        private final Handler handler = new Handler(Looper.getMainLooper());

        @Override
        public void unsubscribe() {
            //Implementation coming soon...
        }

        @Override
        public boolean isUnsubscribed() {
            //Implementation coming soon...
            return false;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        @Override
        public Subscription schedule(
                Action0 action, long delayTime, TimeUnit unit) {
            ScheduledAction scheduledAction = new ScheduledAction(action);
            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

            scheduledAction.add(Subscriptions.create(() ->
                    handler.removeCallbacks(scheduledAction)));

            return scheduledAction;
        }
    }
}

Details of the Android API are not important at the moment. What happens here is that every time we schedule something on a HandlerWorker, the block of code is passed to a special postDelayed() method that executes it on a dedicated Android thread. There is just one such thread, so events are serialized not only within, but also across Workers.

Before we pass action to be executed, we wrap it with ScheduledAction, which implements both Runnable and Subscription. RxJava is lazy whenever it can be—this also applies to scheduling tasks. If for any reason you decide that a given action should not be executed after all (this makes sense when the action was scheduled in the future, not immediately), simply run unsubscribe() on the Subscription returned from schedule(). It is the responsibility of the Worker to properly handle unsubscription (best effort at least).

Client code can also decide to unsubscribe() from Worker in its entirety. This should unsubscribe all queued tasks as well as release the Worker so that the underlying thread can potentially be reused later. The following code snippet enhances the SimplifiedHandlerScheduler by adding Worker unsubscription flow (only modified methods are included):

private CompositeSubscription compositeSubscription =
    new CompositeSubscription();

@Override
public void unsubscribe() {
    compositeSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
    return compositeSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);

    handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

    scheduledAction.add(Subscriptions.create(() ->
            handler.removeCallbacks(scheduledAction)));

    return scheduledAction;
}

In “Controlling Listeners by Using Subscription and Subscriber<T>”, we explored the Subscription interface but never really looked at the implementation details. CompositeSubscription is one of many available implementations; it is just a container for child Subscriptions (the Composite design pattern). Unsubscribing from CompositeSubscription means unsubscribing from all children. You can also add and remove the children managed by CompositeSubscription.

In our custom Scheduler, CompositeSubscription is used to track all Subscriptions from the previous schedule() invocations (see compositeSubscription.add(scheduledAction)). On the other hand, the child ScheduledAction needs to know about its parent (see: addParent()) so that it can remove itself when the action is completed or canceled. Otherwise, Worker would accumulate stale child Subscriptions forever. When the client code decides that it no longer needs a HandlerWorker instance, it unsubscribes from it. The unsubscription is propagated to all (if any) outstanding child Subscriptions.
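
The parent and child bookkeeping described above can be sketched in a few lines of plain Java. This is a simplified illustration, not the source of CompositeSubscription; the Cancellable and Composite names are hypothetical stand-ins for Subscription and CompositeSubscription:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-ins for Subscription and CompositeSubscription.
class CompositeSketch {

    interface Cancellable {
        void cancel();
    }

    static class Composite implements Cancellable {
        private final Set<Cancellable> children = new LinkedHashSet<>();
        private boolean cancelled;

        synchronized boolean isCancelled() {
            return cancelled;
        }

        // Tracks a child; a child added after cancellation is cancelled at once.
        void add(Cancellable child) {
            synchronized (this) {
                if (!cancelled) {
                    children.add(child);
                    return;
                }
            }
            child.cancel();
        }

        // Called by a child that completed, so the parent does not keep it forever.
        synchronized void remove(Cancellable child) {
            children.remove(child);
        }

        synchronized int size() {
            return children.size();
        }

        // Cancels every outstanding child exactly once.
        @Override
        public void cancel() {
            Set<Cancellable> toCancel;
            synchronized (this) {
                if (cancelled) {
                    return;
                }
                cancelled = true;
                toCancel = new LinkedHashSet<>(children);
                children.clear();
            }
            toCancel.forEach(Cancellable::cancel);
        }
    }
}
```

A child that finished on its own calls remove() so that it is not cancelled later, which mirrors the role of addParent() in the listing above. Everything still registered when cancel() is invoked gets cancelled together with the parent.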

That was a very brief introduction to Schedulers in RxJava. The details of their internals are not that useful in daily work; as a matter of fact, they are designed in such a way as to make using RxJava more intuitive and predictable. That being said, let’s quickly see how Schedulers solve many concurrency problems in Rx.

Declarative Subscription with subscribeOn()

In “Mastering Observable.create()” we saw that subscribe() by default uses the client thread. To recap, here is the simplest subscription that you can come up with, where no threading is involved whatsoever:

Observable<String> simple() {
    return Observable.create(subscriber -> {
        log("Subscribed");
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    });
}

//...

log("Starting");
final Observable<String> obs = simple();
log("Created");
final Observable<String> obs2 = obs
        .map(x -> x)
        .filter(x -> true);
log("Transformed");
obs2.subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
);
log("Exiting");

Notice where the logging statements are placed and study the output carefully, especially with regard to which thread invoked the print statement:

33  | main  | Starting
120 | main  | Created
128 | main  | Transformed
133 | main  | Subscribed
133 | main  | Got A
133 | main  | Got B
133 | main  | Completed
134 | main  | Exiting

Pay attention: the order of statements is absolutely predictable. First, every line of code in the preceding code snippet runs in the main thread; there are no thread pools and no asynchronous emission of events involved. Second, the order of execution might not be entirely clear at first sight.

When the program starts, it prints Starting, which is understandable. After creating an instance of Observable<String>, we see the Created message. Notice that Subscribed appears later, when we actually subscribe. Without the subscribe() invocation, the block of code inside Observable.create() is never executed. Moreover, even the map() and filter() operators do not have any visible side effects; notice how the Transformed message is printed even before Subscribed.

Later, we receive all emitted events and the completion notification. Finally, the Exiting statement is printed and the program can return. This is an interesting observation: subscribe() was supposed to register a callback for events that appear asynchronously. This is the assumption that you should make by default. However, in this case there is no threading involved and subscribe() is actually blocking. How is this so?

There is an inherent but hidden connection between subscribe() and create(). Every time you call subscribe() on an Observable, its OnSubscribe callback method is invoked (wrapping the lambda expression you passed to create()). It receives your Subscriber as an argument. By default, this happens in the same thread and is blocking, so whatever you do inside create() will block subscribe(). If your create() method sleeps for a few seconds, subscribe() will block. Moreover, if there are operators between Observable.create() and your Subscriber (lambda acting as callback), all these operators are invoked on behalf of the thread that invoked subscribe(). RxJava does not inject any concurrency facilities by default between Observable and Subscriber. The reason is that Observables tend to be backed by other concurrency mechanisms like event loops or custom threads, so Rx lets you take full control rather than imposing any convention.

This observation prepares the landscape for the subscribeOn() operator. By inserting subscribeOn() anywhere between an original Observable and subscribe(), you declaratively select the Scheduler where the OnSubscribe callback method will be invoked. No matter what you do inside create(), this work is offloaded to an independent Scheduler and your subscribe() invocation no longer blocks:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

35  | main  | Starting
112 | main  | Created
123 | main  | Exiting
123 | Sched-A-0 | Subscribed
124 | Sched-A-0 | Got A
124 | Sched-A-0 | Got B
124 | Sched-A-0 | Completed

Do you see how the main thread exits before Observable even begins emitting any values? Technically, the order of log messages is no longer that predictable because two threads are running concurrently: main, which subscribed and wants to exit, and Sched-A-0, which emits events as soon as someone subscribes. Both schedulerA and the Sched-A-0 thread come from the following sample schedulers we built for illustration purposes:

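import static java.util.concurrent.Executors.newFixedThreadPool;

// ThreadFactoryBuilder comes from Guava
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import rx.Scheduler;
import rx.schedulers.Schedulers;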

ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);

ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);

ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}

These schedulers will be used across all examples, but they are fairly easy to remember. Three independent schedulers, each managing 10 threads from an ExecutorService. To make the output nicer, each thread pool has a distinct naming pattern.

Before we begin, you must understand that in mature applications, in terms of Rx adoption, subscribeOn() is very seldom used. Normally, Observables come from sources that are naturally asynchronous (like RxNetty, see “Nonblocking HTTP Server with Netty and RxNetty”) or apply scheduling on their own (like Hystrix, see “Managing Failures with Hystrix”). You should use subscribeOn() only in special cases when the underlying Observable is known to be synchronous (create() being blocking). However, subscribeOn() is still a much better solution than hand-crafted threading within create():

//Don't do this
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    Runnable code = () -> {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    };
    new Thread(code, "Async").start();
});

The preceding code mixes two concepts: producing events and choosing a concurrency strategy. Observable should be responsible only for production logic, whereas only the client code can make a judicious decision about concurrency. Remember that Observable is lazy but also immutable, in the sense that subscribeOn() affects only downstream subscribers; if someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default.

Keep in mind that in this chapter our focus is on existing applications and introducing RxJava gradually. The subscribeOn() operator is quite useful in such circumstances; however, after you grasp reactive extensions and begin using them on a large scale, the value of subscribeOn() diminishes. In entirely reactive software stacks, as found for example at Netflix, subscribeOn() is almost never used, yet all Observables are asynchronous. Most of the time Observables come from asynchronous sources and they are treated as asynchronous by default. Therefore, the use of subscribeOn() is very limited, mostly to retrofitting existing APIs or libraries. In Chapter 5, we write truly asynchronous applications without explicit subscribeOn() and Schedulers altogether.

subscribeOn() Concurrency and Behavior

There are several nuances regarding how subscribeOn() works. First, a curious reader might wonder what happens if two invocations of subscribeOn() appear between Observable and subscribe(). The answer is simple: the subscribeOn() closest to the original Observable wins. This has important practical implications. If you are designing an API and you use subscribeOn() internally, the client code has no way of overriding the Scheduler of your choice. This can be a conscious design decision; after all, the API designer might know best which Scheduler is appropriate. On the other hand, providing an overloaded version of said API that allows overriding the chosen Scheduler is always a good idea.

Let’s study how subscribeOn() behaves:

log("Starting");
Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        //many other operators
        .subscribeOn(schedulerB)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

The output reveals only schedulerA’s threads:

17  | main  | Starting
73  | main  | Created
83  | main  | Exiting
84  | Sched-A-0 | Subscribed
84  | Sched-A-0 | Got A
84  | Sched-A-0 | Got B
84  | Sched-A-0 | Completed

Interestingly, subscribing on schedulerB is not entirely ignored in favor of schedulerA. schedulerB is still used for a short period of time, but it merely schedules a new action on schedulerA, which does all the work. Thus, additional subscribeOn() calls are not only ineffective but also introduce a small overhead.
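
Why the innermost subscribeOn() wins becomes easier to see with a toy model. The sketch below is a deliberately tiny, hypothetical MiniObservable built only on the JDK; it ignores errors, completion, and unsubscription, and it is not how RxJava is implemented. Its subscribe() simply calls the producer in the caller's thread, and its subscribeOn() wraps the upstream so that subscribing to it is handed to an Executor:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature of Observable, for illustration only.
class SubscribeOnSketch {

    static class MiniObservable<T> {
        private final Consumer<Consumer<T>> onSubscribe; // plays the role of OnSubscribe

        MiniObservable(Consumer<Consumer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Default behavior: the producer runs in the caller's thread and blocks it.
        void subscribe(Consumer<T> subscriber) {
            onSubscribe.accept(subscriber);
        }

        // Wraps the upstream so that subscribing to it happens on the given executor.
        MiniObservable<T> subscribeOn(Executor executor) {
            return new MiniObservable<>(subscriber ->
                    executor.execute(() -> onSubscribe.accept(subscriber)));
        }
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(task -> new Thread(task, name));
    }

    // No subscribeOn(): reports the thread that ran the producer.
    static String defaultProducerThread() {
        String[] seen = new String[1];
        new MiniObservable<String>(subscriber -> seen[0] = Thread.currentThread().getName())
                .subscribe(x -> { });
        return seen[0]; // already set, because subscribe() returned after the producer ran
    }

    // subscribeOn(A) followed by subscribeOn(B): reports the thread that ran the producer.
    static String producerThread() {
        ExecutorService a = named("Sched-A");
        ExecutorService b = named("Sched-B");
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        MiniObservable<String> source = new MiniObservable<>(subscriber -> {
            seen.add(Thread.currentThread().getName());
            subscriber.accept("A");
        });
        try {
            source.subscribeOn(a).subscribeOn(b).subscribe(x -> done.countDown());
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            a.shutdown();
            b.shutdown();
        }
        return seen.isEmpty() ? "none" : seen.get(0);
    }
}
```

In this model the outer wrapper runs on Sched-B, but all it does there is hand the subscription over to Sched-A, so producerThread() reports Sched-A. Without any subscribeOn(), defaultProducerThread() reports the calling thread.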

Speaking of operators, we said that the create() method used when there is a new Subscriber is executed within the provided scheduler (if any). But which thread executes all these transformations happening between create() and subscribe()? We already know that all operators are executed by default in the same thread (scheduler); no concurrency is involved by default:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(this::log)
        .map(x -> x + '1')
        .doOnNext(this::log)
        .map(x -> x + '2')
        .subscribeOn(schedulerA)
        .doOnNext(this::log)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

We sprinkled the pipeline of operators occasionally with doOnNext() to see which thread is in control at this point. Remember that the position of subscribeOn() is not relevant; it can be right after Observable or just before subscribe(). The output is unsurprising:

20  | main  | Starting
104 | main  | Created
123 | main  | Exiting
124 | Sched-A-0 | Subscribed
124 | Sched-A-0 | A
124 | Sched-A-0 | A1
124 | Sched-A-0 | A12
124 | Sched-A-0 | Got A12
124 | Sched-A-0 | B
124 | Sched-A-0 | B1
124 | Sched-A-0 | B12
125 | Sched-A-0 | Got B12

Watch how create() is invoked and produces A and B events. These events travel sequentially through the scheduler’s thread to finally reach the Subscriber. Many newcomers to RxJava believe that using a Scheduler with a large number of threads will automatically fork processing of events concurrently and somehow join all the results together in the end. This is not the case. RxJava creates a single Worker instance (see: “Scheduler implementation details overview”) for the entire pipeline, mostly to guarantee sequential processing of events.

This means that if one of your operators is particularly slow (for example, map() reading data from disk in order to transform events passing by), this costly operation will be invoked within the same thread. A single broken operator can slow down the entire pipeline, from production to consumption. This is an antipattern in RxJava; operators should be nonblocking, fast, and as pure as possible.

Again, flatMap() comes to the rescue. Rather than blocking within map(), we can invoke flatMap() and asynchronously collect all the results. Therefore, flatMap() and merge() are the operators to reach for when we want to achieve true parallelism. But even with flatMap() it is not obvious. Imagine a grocery store (let’s call it “RxGroceries”) that provides an API for purchasing goods:

class RxGroceries {

    Observable<BigDecimal> purchase(String productName, int quantity) {
        return Observable.fromCallable(() ->
            doPurchase(productName, quantity));
    }

    BigDecimal doPurchase(String productName, int quantity) {
        log("Purchasing " + quantity + " " + productName);
        //real logic here
        log("Done " + quantity + " " + productName);
        return priceForProduct;
    }

}

Obviously, the implementation of doPurchase() is irrelevant here; just imagine it takes some time and resources to complete. We simulate business logic by adding an artificial sleep of one second, slightly longer if the quantity is bigger. Blocking Observables like the one returned from purchase() are unusual in a real application, but let’s keep it this way for educational purposes. When purchasing several goods, we would like to parallelize as much as possible and calculate the total price for all goods in the end. The first attempt is fruitless:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)  //BROKEN!!!
        .map(prod -> rxGroceries.doPurchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

The result is correct: it is an Observable with just a single value, the total price, calculated using reduce(). For each product, we invoke doPurchase() with quantity one. However, despite using schedulerA backed by a thread pool of 10, the code is entirely sequential:

144  | Sched-A-0 | Purchasing 1 bread
1144 | Sched-A-0 | Done 1 bread
1146 | Sched-A-0 | Purchasing 1 butter
2146 | Sched-A-0 | Done 1 butter
2146 | Sched-A-0 | Purchasing 1 milk
3147 | Sched-A-0 | Done 1 milk
3147 | Sched-A-0 | Purchasing 1 tomato
4147 | Sched-A-0 | Done 1 tomato
4147 | Sched-A-0 | Purchasing 1 cheese
5148 | Sched-A-0 | Done 1 cheese

Notice how each product blocks subsequent ones from processing. When the purchase of bread is done, butter begins immediately, but not earlier. Strangely, even replacing map() with flatMap() does not help, and the output is exactly the same:

Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)
        .flatMap(prod -> rxGroceries.purchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

The code does not work concurrently because there is just a single flow of events, which by design must run sequentially. Otherwise, your Subscriber would need to be aware of concurrent notifications (onNext(), onCompleted(), etc.), so it is a fair compromise. Luckily, the idiomatic solution is very close. The main Observable emitting products cannot be parallelized. However, for each product, we create a new, independent Observable as returned from purchase(). Because they are independent, we can safely schedule each one of them concurrently:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .flatMap(prod ->
                rxGroceries
                        .purchase(prod, 1)
                        .subscribeOn(schedulerA))
        .reduce(BigDecimal::add)
        .single();

Can you spot where subscribeOn() is? The main Observable is not really doing anything, so a special thread pool is unnecessary. However, each substream created within flatMap() is supplied with schedulerA. Every time subscribeOn() is used, the Scheduler gets a chance to return a new Worker, and therefore a separate thread (simplifying a bit):

113  | Sched-A-1 | Purchasing 1 butter
114  | Sched-A-0 | Purchasing 1 bread
125  | Sched-A-2 | Purchasing 1 milk
125  | Sched-A-3 | Purchasing 1 tomato
126  | Sched-A-4 | Purchasing 1 cheese
1126 | Sched-A-2 | Done 1 milk
1126 | Sched-A-0 | Done 1 bread
1126 | Sched-A-1 | Done 1 butter
1128 | Sched-A-3 | Done 1 tomato
1128 | Sched-A-4 | Done 1 cheese

Finally, we achieved true concurrency. Each purchase operation now begins at the same time and they all eventually finish. The flatMap() operator is carefully designed and implemented so that it collects all events from all independent streams and pushes them downstream sequentially. However, as we already learned in “Order of Events After flatMap()”, we can no longer rely on the order of downstream events—they neither begin nor complete in the same order as they were emitted (the original sequence began at bread). When events reach the reduce() operator, they are already sequential and well behaving.

By now, you should slowly move away from the classic Thread model and understand how Schedulers work. But if you find it difficult, here is a simple analogy:

  • Observable without any Scheduler works like a single-threaded program with blocking method calls passing data between one another.

  • Observable with a single subscribeOn() is like starting a big task in the background Thread. The program within that Thread is still sequential, but at least it runs in the background.

  • Observable using flatMap() where each internal Observable has subscribeOn() works like ForkJoinPool from java.util.concurrent, where each substream is a fork of execution and flatMap() is a safe join stage.

Of course, the preceding tips only apply to blocking Observables, which are rarely seen in real applications. If your underlying Observables are already asynchronous, achieving concurrency is a matter of understanding how they are combined and when subscription occurs. For example, merge() on two streams will subscribe to both of them concurrently, whereas the concat() operator waits until the first stream finishes before it subscribes to the second one.
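
The fork/join analogy from the list above can be written down with the JDK alone. This is only an analogy, not a replacement for flatMap(): CompletableFuture carries a single value rather than a stream. The doPurchase() below is a hypothetical stand-in that sleeps briefly and charges a made-up price of 1 per unit:

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Analogy only: fork one blocking task per product, then join and sum the results.
class ForkJoinPurchases {

    // Hypothetical stand-in for a blocking purchase; the price of 1 per unit is made up.
    static BigDecimal doPurchase(String productName, int quantity) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return BigDecimal.valueOf(quantity);
    }

    static BigDecimal totalPrice(List<String> products) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            // Fork: start every purchase before waiting for any of them.
            List<CompletableFuture<BigDecimal>> purchases = products.stream()
                    .map(prod -> CompletableFuture.supplyAsync(() -> doPurchase(prod, 1), pool))
                    .collect(Collectors.toList());
            // Join: collect the results and add them up sequentially.
            return purchases.stream()
                    .map(CompletableFuture::join)
                    .reduce(BigDecimal.ZERO, BigDecimal::add);
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting the futures into a list before joining matters here. Streams are lazy, so joining inside the same stream pipeline would start and wait for one purchase at a time, which is the sequential behavior we were trying to escape.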

Batching Requests Using groupBy()

Did you notice that RxGroceries.purchase() takes productName and quantity even though the quantity was always one? What if our grocery list had some products multiple times, indicating bigger demand? The first naive implementation simply sends the same request—for example, for egg, multiple times, each time asking for one. Fortunately, we can declaratively batch such requests by using groupBy()—and this still works with declarative concurrency:

import org.apache.commons.lang3.tuple.Pair;

Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "egg", "milk", "tomato",
      "cheese", "tomato", "egg", "egg")
    .groupBy(prod -> prod)
    .flatMap(grouped -> grouped
        .count()
        .map(quantity -> {
            String productName = grouped.getKey();
            return Pair.of(productName, quantity);
        }))
    .flatMap(order -> rxGroceries
        .purchase(order.getKey(), order.getValue())
        .subscribeOn(schedulerA))
    .reduce(BigDecimal::add)
    .single();

This code is quite complex, so before revealing the output, let’s quickly go through it. First, we group products simply by their name, hence the identity function prod -> prod. In return we get an awkward Observable<GroupedObservable<String, String>>. There is nothing wrong with that. Next, flatMap() receives each GroupedObservable<String, String>, representing all products of the same name. So, for example, there will be an ["egg", "egg", "egg"] Observable there with the key "egg", as well. If groupBy() used a different key function, like prod.length(), the same sequence would have the key 3.

At this point, within flatMap() we need to construct an Observable of type Pair<String, Integer>, which represents every unique product and its quantity. Both count() and map() return an Observable, so everything lines up perfectly. The second flatMap() receives an order of type Pair<String, Integer> and makes a purchase; this time the quantity can be bigger. The output looks perfect; notice that bigger orders are slightly slower, but this is still much faster than having several repeated requests:

164  | Sched-A-0 | Purchasing 1 bread
165  | Sched-A-1 | Purchasing 1 butter
166  | Sched-A-2 | Purchasing 3 egg
166  | Sched-A-3 | Purchasing 1 milk
166  | Sched-A-4 | Purchasing 2 tomato
166  | Sched-A-5 | Purchasing 1 cheese
1151 | Sched-A-0 | Done 1 bread
1178 | Sched-A-1 | Done 1 butter
1180 | Sched-A-5 | Done 1 cheese
1183 | Sched-A-3 | Done 1 milk
1253 | Sched-A-4 | Done 2 tomato
1354 | Sched-A-2 | Done 3 egg

If you believe that your system can benefit from batching this way or the other, check out “Batching and Collapsing Commands”.
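
If the grouping step itself feels opaque, it may help to look at the same computation on an ordinary collection. The sketch below uses Java streams rather than groupBy() and count(), so it is a different technique that produces the same product-to-quantity pairs; BatchingSketch and quantities() are hypothetical names:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-collections counterpart of grouping by product name and counting each group.
class BatchingSketch {

    // Maps every distinct product to the number of times it appears, in first-seen order.
    static Map<String, Long> quantities(List<String> groceryList) {
        return groceryList.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(),     // same key function as prod -> prod
                        LinkedHashMap::new,      // keep first-seen order of products
                        Collectors.counting())); // one count per group
    }
}
```

For the grocery list used in this section, the resulting map is {bread=1, butter=1, egg=3, milk=1, tomato=2, cheese=1}, which matches the six purchases in the log above.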

Declarative Concurrency with observeOn()

Believe it or not, concurrency in RxJava can be described by two operators: the aforementioned subscribeOn() and observeOn(). They seem very similar and are confusing to newcomers, but their semantics are actually quite clear and reasonable.

subscribeOn() allows choosing which Scheduler will be used to invoke OnSubscribe (lambda expression inside create()). Therefore, any code inside create() is pushed to a different thread—for example, to avoid blocking the main thread. Conversely, observeOn() controls which Scheduler is used to invoke downstream Subscribers occurring after observeOn(). For example, calling create() happens in the io() Scheduler (via subscribeOn(io())) to avoid blocking the user interface. However, updating the user interface widgets must happen in the UI thread (both Swing and Android have this constraint), so we use observeOn() for example with AndroidSchedulers.mainThread() before operators or subscribers changing UI. This way we can use one Scheduler to handle create() and all operators up to the first observeOn(), but other(s) to apply transformations. This is best explained with an example:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerA)
        .doOnNext(x -> log("Found 2: " + x))
        .subscribe(
                x -> log("Got 1: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

observeOn() occurs somewhere in the pipeline chain, and this time, as opposed to subscribeOn(), the position of observeOn() is quite important. No matter what Scheduler was running operators above observeOn() (if any), everything below uses the supplied Scheduler. In this example, there is no subscribeOn(), so the default is applied (no concurrency):

23  | main  | Starting
136 | main  | Created
163 | main  | Subscribed
163 | main  | Found 1: A
163 | main  | Found 1: B
163 | main  | Exiting
163 | Sched-A-0 | Found 2: A
164 | Sched-A-0 | Got 1: A
164 | Sched-A-0 | Found 2: B
164 | Sched-A-0 | Got 1: B
164 | Sched-A-0 | Completed

All of the operators above observeOn() are executed within the client thread, which happens to be the default in RxJava. But below observeOn(), the operators are executed within the supplied Scheduler. This will become even more obvious when both subscribeOn() and multiple observeOn() occur within the pipeline:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerB)
        .doOnNext(x -> log("Found 2: " + x))
        .observeOn(schedulerC)
        .doOnNext(x -> log("Found 3: " + x))
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

Can you predict the output? Remember, everything below observeOn() runs within the supplied Scheduler until another observeOn() is encountered. Additionally, subscribeOn() can occur anywhere between the Observable and subscribe(), but this time it affects only the operators down to the first observeOn():

21  | main  | Starting
98  | main  | Created
108 | main  | Exiting
129 | Sched-A-0 | Subscribed
129 | Sched-A-0 | Found 1: A
129 | Sched-A-0 | Found 1: B
130 | Sched-B-0 | Found 2: A
130 | Sched-B-0 | Found 2: B
130 | Sched-C-0 | Found 3: A
130 | Sched-C-0 | Got: A
130 | Sched-C-0 | Found 3: B
130 | Sched-C-0 | Got: B
130 | Sched-C-0 | Completed

Subscription occurs in schedulerA because that is what we specified in subscribeOn(). The "Found 1" operator also executes within that Scheduler because it comes before the first observeOn(). Later, the situation becomes more interesting. observeOn() switches the current Scheduler to schedulerB, and "Found 2" uses this one instead. The last observeOn(schedulerC) affects both the "Found 3" operator and the Subscriber. Remember that the Subscriber works within the context of the last encountered Scheduler.
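
The same hand-off rule can be reproduced with the plain JDK. The sketch below is an analogy only, not how RxJava implements its operators: a CompletableFuture carries a single value rather than a stream, and the class StageHandOffSketch with its helpers pool(), at(), and run() is hypothetical. The first stage starts on one executor, much as subscribeOn() picks where the source runs. Each ...Async(fn, executor) call then moves the following stage to another executor, much as observeOn() does for everything below it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StageHandOffSketch {

    // Single-threaded pool whose only thread carries the given name.
    static ExecutorService pool(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    static String at(String stage) {
        return stage + " @ " + Thread.currentThread().getName();
    }

    // Returns "stage @ thread" entries in the order in which the stages ran.
    static List<String> run() {
        ExecutorService a = pool("Sched-A");
        ExecutorService b = pool("Sched-B");
        ExecutorService c = pool("Sched-C");
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // the source runs on A, loosely like subscribeOn(schedulerA)
                    .supplyAsync(() -> { trace.add(at("Found 1")); return "A"; }, a)
                    // hand-off to B, loosely like observeOn(schedulerB)
                    .thenApplyAsync(x -> { trace.add(at("Found 2")); return x; }, b)
                    // hand-off to C, loosely like observeOn(schedulerC)
                    .thenApplyAsync(x -> { trace.add(at("Found 3")); return x; }, c)
                    // the final consumer stays on the last executor
                    .thenAcceptAsync(x -> trace.add(at("Got")), c)
                    .join();
        } finally {
            a.shutdown();
            b.shutdown();
            c.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because every stage depends on the previous one, the trace lists Sched-A, Sched-B, and then Sched-C twice, matching the thread sequence in the listing above.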

subscribeOn() and observeOn() work really well together when you want to physically decouple the producer (Observable.create()) and the consumer (Subscriber). By default, there is no such decoupling, and RxJava simply uses the same thread. subscribeOn() alone is not enough: it merely picks a different thread for both. observeOn() is better, but it still blocks the client thread in the case of synchronous Observables. Because most operators are nonblocking and the lambda expressions used inside them tend to be short and cheap, there is typically just one subscribeOn() and one observeOn() in the pipeline of operators. subscribeOn() can be placed close to the original Observable to improve readability, whereas observeOn() goes close to subscribe() so that only the Subscriber uses that special Scheduler; the other operators rely on the Scheduler from subscribeOn().

Here is a more advanced program that takes advantage of these two operators:

log("Starting");
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onNext("D");
    subscriber.onCompleted();
});
log("Created");
obs
    .subscribeOn(schedulerA)
    .flatMap(record -> store(record).subscribeOn(schedulerB))
    .observeOn(schedulerC)
    .subscribe(
            x -> log("Got: " + x),
            Throwable::printStackTrace,
            () -> log("Completed")
    );
log("Exiting");

Where store() is a simple nested operation:

Observable<UUID> store(String s) {
    return Observable.create(subscriber -> {
        log("Storing " + s);
        //hard work
        subscriber.onNext(UUID.randomUUID());
        subscriber.onCompleted();
    });
}

The production of events occurs in schedulerA, but each event is processed independently using schedulerB to improve concurrency, a technique we learned in “subscribeOn() Concurrency and Behavior”. The Subscriber at the end is invoked in yet another Scheduler, schedulerC. We are pretty sure you understand by now which Scheduler/thread will execute which action, but just in case (empty lines added for clarity):

26   | main  | Starting
93   | main  | Created
121  | main  | Exiting

122  | Sched-A-0 | Subscribed
124  | Sched-B-0 | Storing A
124  | Sched-B-1 | Storing B
124  | Sched-B-2 | Storing C
124  | Sched-B-3 | Storing D

1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce
1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8
1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a
1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587
1137 | Sched-C-1 | Completed
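
The fan-out in the middle of this output, where every record is stored on its own Sched-B thread, can be approximated with the plain JDK. This is a sketch under stated assumptions, not RxJava code: ParallelStoreSketch, storeAll(), and STORE_THREADS are hypothetical names, the pool size of 4 is arbitrary, and store() returns a UUID directly instead of an Observable. Unlike flatMap(), which emits results as they complete, this version collects them in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelStoreSketch {

    // Names of the threads that executed store(), recorded for inspection.
    static final Set<String> STORE_THREADS = ConcurrentHashMap.newKeySet();

    static UUID store(String record) {
        STORE_THREADS.add(Thread.currentThread().getName());
        // hard work would go here
        return UUID.randomUUID();
    }

    static List<UUID> storeAll(List<String> records) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService poolB = Executors.newFixedThreadPool(4,
                runnable -> new Thread(runnable, "Sched-B-" + counter.getAndIncrement()));
        try {
            // Submit every record first so that the store() calls can overlap.
            List<CompletableFuture<UUID>> pending = new ArrayList<>();
            for (String record : records) {
                pending.add(CompletableFuture.supplyAsync(() -> store(record), poolB));
            }
            // Only then wait for the results, in submission order.
            List<UUID> ids = new ArrayList<>();
            for (CompletableFuture<UUID> future : pending) {
                ids.add(future.join());
            }
            return ids;
        } finally {
            poolB.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(storeAll(Arrays.asList("A", "B", "C", "D")));
        System.out.println(STORE_THREADS);
    }
}
```

Submitting all tasks before joining any of them is what allows the work to overlap. Joining inside the first loop would serialize the calls again.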

observeOn() is especially important for applications with a UI in which we do not want to block the UI event-dispatching thread. On Android (see “Android Development with RxJava”) or Swing, some actions, like updating the UI, must be executed within a specific thread. But doing too much in that thread renders your UI unresponsive. In these cases, you put observeOn() close to subscribe() so that code within the subscription is invoked within the context of a particular Scheduler (like the UI thread). However, other transformations, even rather cheap ones, should be executed outside the UI thread. On the server, observeOn() is seldom used because the true source of concurrency is built into most Observables. This leads to an interesting conclusion: RxJava controls concurrency with just two operators (subscribeOn() and observeOn()), but the more you use reactive extensions, the less frequently you will see these in production code.

Other Uses for Schedulers

There are numerous operators that by default use some Scheduler. Typically, Schedulers.computation() is used if none is supplied—JavaDoc always makes it clear. For example, the delay() operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

Observable
        .just('A', 'B')
        .delay(1, SECONDS, schedulerA)
        .subscribe(this::log);

Without supplying a custom schedulerA, all operators below delay() would use the computation() Scheduler. There is nothing inherently wrong with that; however, if your Subscriber is blocked on I/O, it would consume one Worker from the globally shared computation() Scheduler, possibly affecting the entire system. Other important operators that support a custom Scheduler include interval(), range(), timer(), repeat(), skip(), take(), timeout(), and several others that have yet to be introduced. If you do not provide a Scheduler to such operators, the computation() Scheduler is used, which is a safe default in most cases.
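
Why a delayed event has to arrive on some other thread can be seen in a plain-JDK sketch. This is an analogy and not the implementation of delay(): DelayedDeliverySketch and deliverLater() are hypothetical names, and the thread name Sched-A-0 is chosen only to echo the listings. A ScheduledExecutorService plays the role of the Scheduler passed to delay(). The delayed work runs on its thread, so whoever blocks inside that work occupies that pool and not a shared one.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedDeliverySketch {

    // Delivers the value after the delay and reports which thread delivered it.
    static String deliverLater(char value, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(
                runnable -> new Thread(runnable, "Sched-A-0"));
        try {
            ScheduledFuture<String> delivery = timer.schedule(
                    () -> Thread.currentThread().getName() + " delivered " + value,
                    delayMillis, TimeUnit.MILLISECONDS);
            // schedule() returns immediately; get() blocks here only so that
            // the sketch can hand the result back to its caller.
            return delivery.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverLater('A', 1000));
    }
}
```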

Mastering Schedulers is essential to writing scalable and safe code using RxJava. The difference between subscribeOn() and observeOn() is especially important under high load, where every task must be executed precisely when we expect it. In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus very few Schedulers are needed. But there is always this one API or dependency that requires blocking code.

Last but not least, we must be sure that Schedulers used downstream can keep up with the load generated by Schedulers upstream. This danger is explained in great detail in Chapter 6.

Summary

This chapter described several patterns in traditional applications that can be replaced with RxJava. I hope you understand by now that high-frequency trading or streaming posts from social media are not the only use cases for RxJava. As a matter of fact, almost any API can be seamlessly replaced with Observable. Even if you don’t want or need the power of reactive extensions at the moment, exposing Observable will allow you to evolve the implementation without introducing backward-incompatible changes. Moreover, it is the client that eventually harvests all the possibilities offered by RxJava, like laziness, declarative concurrency, or asynchronous chaining. Even better, because of the seamless conversion from Observable to BlockingObservable, traditional clients can consume your API as they want, and you can always provide a simple bridge layer.

You should now be fairly confident with RxJava and understand the benefits of applying it even in legacy systems. Undoubtedly, working with reactive Observables is more challenging and has a somewhat steep learning curve. But the advantages and possibilities of growth simply can’t be exaggerated. Imagine writing entire applications using reactive extensions, from top to bottom, as in a greenfield project for which we have control over every API, interface, and external system. Chapter 5 will discuss how you can write such an application and what the implications are.

1 In fact, RxJava tries to stay on the same thread via thread affinity in the event loop model to take advantage of this, as well.

2 See also “Bulkhead Pattern and Fail-Fast”

3 Compare it to lazy evaluation of expressions in Haskell.

4 Obviously, for any real project, you will use a production-grade logging system like Logback or Log4J 2.

5 https://github.com/ReactiveX/RxSwing
