Kapitel 4. Anwendung der reaktiven Programmierung auf bestehende Anwendungen

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Die Einführung einer neuen Bibliothek, Technologie oder eines neuen Paradigmas in eine Anwendung, sei es auf der grünen Wiese oder in einer Legacy-Codebasis, muss eine sorgfältige Entscheidung sein. RxJava ist da keine Ausnahme. In diesem Kapitel gehen wir einige Muster und Architekturen durch, die in gewöhnlichen Java-Anwendungen zu finden sind, und sehen, wie Rx helfen kann. Dieser Prozess ist nicht einfach und erfordert einen erheblichen Bewusstseinswandel, deshalb werden wir vorsichtig vom imperativen zum funktionalen und reaktiven Stil übergehen. Viele Bibliotheken in Java-Projekten fügen heutzutage einfach nur Ballast hinzu, ohne eine Gegenleistung zu erbringen. Du wirst jedoch sehen, wie RxJava nicht nur traditionelle Projekte vereinfacht, sondern auch welche Vorteile es für Legacy-Plattformen bringt.

Ich bin mir ziemlich sicher, dass du bereits von RxJava begeistert bist. Die eingebauten Operatoren und die Einfachheit machen Rx zu einem erstaunlich mächtigen Werkzeug für die Umwandlung von Ereignisströmen. Wenn du jedoch morgen zurück in dein Büro gehst, wirst du feststellen, dass es keine Streams, keine Echtzeit-Ereignisse von der Börse gibt. Du kannst kaum Ereignisse in deinen Anwendungen finden; es ist nur ein Mischmasch aus Webanfragen, Datenbanken und externen APIs. Du bist so erpicht darauf, dieses neue RxJava-Ding irgendwo jenseits von Hello world auszuprobieren. Doch es scheint, als gäbe es im wirklichen Leben einfach keine Anwendungsfälle, die den Einsatz von Rx rechtfertigen. Dabei kann RxJava ein bedeutender Schritt nach vorne sein, wenn es um architektonische Konsistenz und Robustheit geht. Du musst dich nicht von vorne bis hinten auf einen reaktiven Stil festlegen - das ist zu riskant und erfordert am Anfang zu viel Arbeit. Aber Rx kann auf jeder Schicht eingeführt werden, ohne dass eine Anwendung als Ganzes kaputt geht.

Wir führen dich durch einige gängige Anwendungsmuster und zeigen dir, wie du sie mit RxJava auf nicht-invasive Weise erweitern kannst. Der Schwerpunkt liegt dabei auf Datenbankabfragen, Caching, Fehlerbehandlung und periodischen Aufgaben. Je mehr RxJava du an verschiedenen Stellen deines Stacks einbaust, desto konsistenter wird deine Architektur.

Von Sammlungen zu beobachtbaren Objekten

Wenn deine Plattform nicht gerade in JVM-Frameworks wie Play, Akka Actors oder vielleicht Vert.x entwickelt wurde, befindest du dich wahrscheinlich auf einem Stack mit einem Servlet-Container auf der einen und JDBC oder Webservices auf der anderen Seite. Dazwischen gibt es eine unterschiedliche Anzahl von Schichten, die Geschäftslogik implementieren, die wir nicht alle auf einmal refaktorisieren werden; stattdessen wollen wir mit einem einfachen Beispiel beginnen. Die folgende Klasse stellt ein triviales Repository dar, das von einer Datenbank abstrahiert:

class PersonDao {

    List<Person> listPeople() {
        return query("SELECT * FROM PEOPLE");
    }

    private List<Person> query(String sql) {
        //...
    }

}

Abgesehen von den Implementierungsdetails, was hat das mit Rx zu tun? Bisher haben wir über asynchrone Ereignisse gesprochen, die von vorgelagerten Systemen gepusht werden oder bestenfalls, wenn jemand ein Abonnement abschließt. Was hat dieses banale Dao damit zu tun?Observable ist nicht nur eine Pipe, die Ereignisse nach unten schiebt. Du kannst Observable<T> als eine Datenstruktur betrachten, die mit Iterable<T> identisch ist. Beide enthalten Elemente des Typs T, bieten aber eine völlig andere Schnittstelle. Es sollte also nicht überraschen, dass du das eine einfach durch das andere ersetzen kannst:

Observable<Person> listPeople() {
    final List<Person> people = query("SELECT * FROM PEOPLE");
    return Observable.from(people);
}

An diesem Punkt haben wir die bestehende API verändert. Je nachdem, wie groß dein System ist, kann eine solche Inkompatibilität ein großes Problem darstellen. Deshalb ist es wichtig, RxJava so schnell wie möglich in deine API zu integrieren. Da wir mit einer bestehenden Anwendung arbeiten, kann das natürlich nicht der Fall sein.

BlockingObservable: Der Ausstieg aus der reaktiven Welt

Wenn du RxJava mit bestehendem, blockierendem und imperativem Code kombinierst, musst du Observable vielleicht in eine einfache Sammlung umwandeln. Diese Umwandlung ist ziemlich unangenehm, denn sie erfordert das Blockieren von Observable und das Warten auf dessen Fertigstellung. Solange Observable nicht fertiggestellt ist, können wir keine Sammlung erstellen.BlockingObservable ist ein spezieller Typ, der die Arbeit mit Observable in einer nicht-reaktiven Umgebung erleichtert.BlockingObservable sollte deine letzte Wahl sein, wenn du mit RxJava arbeitest, aber es ist unvermeidlich, wenn du blockierenden und nicht-blockierenden Code kombinierst.

In Kapitel 3 haben wir die Methode listPeople() so umgestaltet, dass sie Observable<People> statt List zurückgibt.Observable ist kein Iterable in irgendeinem Sinne, also lässt sich unser Code nicht mehr kompilieren. Wir wollen lieber in kleinen Schritten vorgehen, als massiv umzustrukturieren, also halten wir den Umfang der Änderungen so gering wie möglich. Der Client-Code könnte so aussehen:

List<Person> people = pesonDao.listPeople();
String json = marshal(people);

Wir können uns vorstellen, dass die Methode marshal() Daten aus der Sammlung peopleabruft und sie in JSON serialisiert. Das ist jetzt nicht mehr der Fall, wir können nicht einfach Daten von Observable abrufen, wenn wir wollen.Observable ist dafür zuständig, Daten zu produzieren(zu pushen) und die Abonnenten zu benachrichtigen, wenn es welche gibt. Diese radikale Änderung kann mit BlockingObservable leicht umgangen werden. Diese praktische Klasse ist völlig unabhängig von Observable und kann über die Methode Observable.toBlocking() erreicht werden. Die blockierende Variante von Observable hat oberflächlich betrachtet ähnliche Methoden wie single() oder subscribe(). BlockingObservable ist jedoch in blockierenden Umgebungen, die von Natur aus nicht auf die asynchrone Natur von Observable vorbereitet sind, viel praktischer. Die Operatoren von BlockingObservable blockieren (warten) in der Regel, bis die zugrunde liegende Observable abgeschlossen ist. Dies widerspricht dem Hauptkonzept von Observables, dass alles asynchron, träge und im laufenden Betrieb verarbeitet wird. Observable.forEach() empfängt zum Beispiel asynchron Ereignisse von Observable, sobald sie eintreffen, während BlockingObservable.forEach() blockiert, bis alle Ereignisse verarbeitet und der Stream abgeschlossen ist. Auch Ausnahmen werden nicht mehr als Werte (Ereignisse) weitergegeben, sondern im aufrufenden Thread wieder verworfen.

In unserem Fall wollen wir Observable<Person> zurück in List<Person> umwandeln, um den Umfang des Refactorings zu begrenzen:

Observable<Person> peopleStream = personDao.listPeople();
Observable<List<Person>> peopleList = peopleStream.toList();
BlockingObservable<List<Person>> peopleBlocking = peopleList.toBlocking();
List<Person> people = peopleBlocking.single();

Ich habe absichtlich alle Zwischentypen explizit gelassen, um zu erklären, was passiert. Nach dem Refactoring zu Rx gibt unsere APIObservable<Person> peopleStream zurück. Dieser Stream kann potenziell vollständig reaktiv, asynchron und ereignisgesteuert sein, was überhaupt nicht zu dem passt, was wir brauchen: ein statisches List. In einem ersten Schritt verwandeln wir Observable<Person>in Observable<List<Person>>. Dieser faule Operator puffert allePerson Ereignisse und hält sie im Speicher, bis das onCompleted() Ereignis eintrifft. Zu diesem Zeitpunkt wird ein einzelnes Ereignis vom Typ List<Person> ausgesendet, das alle gesehenen Ereignisse auf einmal enthält, wie im folgenden Marmordiagramm dargestellt:

image

Der resultierende Stream wird sofort nach der Ausgabe eines einzigen List Elements abgeschlossen. Auch dieser Operator ist asynchron; er wartet nicht darauf, dass alle Ereignisse eintreffen, sondern puffert stattdessen faul alle Werte. Das umständlich aussehende Observable<List<Person>> peopleList wird dann inBlockingObservable<List<Person>> peopleBlocking umgewandelt. BlockingObservable ist nur dann eine gute Idee, wenn du eine blockierende, statische Ansicht deines ansonsten asynchronen Observable bereitstellen musst. Während Observable.from(List<T>) die normale Pull-basierte Sammlung in Observable umwandelt, macht toBlocking() genau das Gegenteil. Du fragst dich vielleicht, warum wir zwei Abstraktionen für blockierende und nicht blockierende Operatoren brauchen. Die Autoren von RxJava haben herausgefunden, dass die explizite Angabe der synchronen oder asynchronen Natur des zugrunde liegenden Operators zu wichtig ist, um sie JavaDoc zu überlassen. Mit zwei voneinander unabhängigen Typen ist sichergestellt, dass du immer mit der richtigen Datenstruktur arbeitest. Außerdem ist BlockingObservable dein letztes Mittel; normalerweise solltest du Observableso lange wie möglich zusammensetzen und verketten. Für diese Übung wollen wir jedoch gleich von Observable wegkommen. Der letzte Operator single() lässt die Observablen ganz weg und extrahiert ein, und nur ein einziges Element, das wir vonBlockingObservable<T> erwarten. Ein ähnlicher Operator, first(), gibt einen Wert von T zurück und verwirft alles, was er übrig hat. single() Der Operator Observable hingegen stellt sicher, dass es keine weiteren ausstehenden Ereignisse im zugrunde liegenden gibt, bevor er beendet wird. Das bedeutet, dass single() blockiert und auf den Rückruf von onCompleted() wartet. Hier ist derselbe Codeschnipsel wie zuvor, diesmal mit allen Operatoren verkettet:

List<Person> people = personDao
    .listPeople()
    .toList()
    .toBlocking()
    .single();

Du denkst vielleicht, dass wir uns die Mühe gemacht haben, Observable ohne ersichtlichen Grund ein- und auszuwickeln. Erinnere dich daran, dass dies nur der erste Schritt war. Die nächste Umwandlung wird etwas Faulheit einführen. Unser Code, so wie er jetzt steht, führt immer query("...") aus und wickelt es mit Observable ein. Wie du inzwischen weißt, sind Observable(vor allem kalte) per Definition faul. Solange sich niemand anmeldet, stellen sie nur einen Stream dar, der nie die Chance hatte, Werte zu senden. Die meiste Zeit kannst du Methoden aufrufen, die Observable zurückgeben, und solange du dich nicht anmeldest, wird keine Arbeit geleistet. Observable ist wie Future, weil er einen Wert für die Zukunft verspricht. Aber solange du ihn nicht anforderst, wird ein kalter Observable nicht einmal anfangen, Werte zu senden. So gesehen ist Observable eher mit java.util.function.Supplier<T> vergleichbar, das bei Bedarf Werte vom Typ T erzeugt. Hot Observables sind anders, weil sie Werte ausgeben, egal ob du zuhörst oder nicht, aber die betrachten wir jetzt nicht. Die bloße Existenz von Observable deutet nicht auf einen Hintergrundauftrag oder irgendeinen Nebeneffekt hin, im Gegensatz zu Future, das fast immer auf einen gleichzeitig laufenden Vorgang hindeutet.

Umarmung der Faulheit

Wie machen wir also unsere Observable faul? Die einfachste Methode ist, einen eager Observable mit defer() zu umhüllen:

public Observable<Person> listPeople() {
    return Observable.defer(() ->
        Observable.from(query("SELECT * FROM PEOPLE")));
}

Observable.defer() nimmt einen Lambda-Ausdruck (eine Factory), der Observable erzeugen kann. Observable ist eifrig, also wollen wir seine Erzeugung aufschieben. defer() wartet bis zum letztmöglichen Moment, um Observable tatsächlich zu erzeugen, d.h. bis sich jemand tatsächlich anmeldet. Das hat einige interessante Auswirkungen. DaObservable "lazy" ist, hat der Aufruf von listPeople() keine Nebeneffekte und fast keine Leistungseinbußen. Es wird noch keine Datenbank abgefragt. Du kannst Observable<Person> wie ein Versprechen behandeln, ohne dass eine Hintergrundverarbeitung stattfindet. Beachte, dass es im Moment kein asynchrones Verhalten gibt, sondern nur eine faule Auswertung. Das ist vergleichbar mit der Art und Weise, wie Werte in der Programmiersprache Haskell nur dann ausgewertet werden, wenn sie unbedingt benötigt werden.

Wenn du noch nie in funktionalen Sprachen programmiert hast, bist du vielleicht verwirrt, warum Faulheit so wichtig und bahnbrechend ist. Es stellt sich heraus, dass ein solches Verhalten sehr nützlich ist und die Qualität und Freiheit deiner Implementierung erheblich verbessern kann. Du musst dich zum Beispiel nicht mehr darum kümmern, welche Ressourcen wann und in welcher Reihenfolge abgerufen werden. RxJava lädt sie nur, wenn sie unbedingt benötigt werden.

Nimm als Beispiel diesen trivialen Ausweichmechanismus, den wir alle schon so oft gesehen haben:

void bestBookFor(Person person) {
    Book book;
    try {
        book = recommend(person);
    } catch (Exception e) {
        book = bestSeller();
    }
    display(book.getTitle());
}

void display(String title) {
    //...
}

Du denkst wahrscheinlich, dass an einem solchen Konstrukt nichts auszusetzen ist. In diesem Beispiel versuchen wir, das beste Buch für eine bestimmte Person zu empfehlen, aber wenn es nicht klappt, werden wir gnädig und zeigen den Bestseller an. Die Annahme ist, dass das Abrufen eines Bestsellers schneller ist und im Cache gespeichert werden kann. Aber was wäre, wenn du die Fehlerbehandlung deklarativ hinzufügen könntest, damit try-catch Blöcke nicht die eigentliche Logik verdecken?

void bestBookFor(Person person) {
    Observable<Book> recommended = recommend(person);
    Observable<Book> bestSeller = bestSeller();
    Observable<Book> book = recommended.onErrorResumeNext(bestSeller);
    Observable<String> title = book.map(Book::getTitle);
    title.subscribe(this::display);
}

Da wir RxJava bisher nur erforschen, habe ich all diese Zwischenwerte und Typen belassen. Im echten Leben würde bestBookFor() eher so aussehen:

void bestBookFor(Person person) {
    recommend(person)
            .onErrorResumeNext(bestSeller())
            .map(Book::getTitle)
            .subscribe(this::display);
}

Dieser Code ist schön übersichtlich und lesbar. Finde zuerst eine Empfehlung für person. Im Falle eines Fehlers (onErrorResumeNext), fahre mit einem Bestseller fort. Egal, welcher Erfolg eintrat, map gibt einen Wert zurück, indem es den Titel extrahiert und ihn dann anzeigt. onErrorResumeNext() ist ein mächtiger Operator, der Ausnahmen, die im Vorfeld passieren, abfängt, sie schluckt und eine bereitgestellte Sicherung Observable abonniert. Auf diese Weise implementiert Rx eine try-catch Klausel. Wir werden später in diesem Buch viel mehr Zeit auf die Fehlerbehandlung verwenden (siehe "Deklarative try-catch-Ersetzung"). In der Zwischenzeit können wir bestSeller() aufrufen, ohne uns Sorgen machen zu müssen, dass der Abruf des Bestsellers auch dann erfolgt, wenn eine echte Empfehlung gut gelaufen ist.

Observables zusammenstellen

SELECT * FROM PEOPLE ist nicht wirklich eine moderne SQL-Abfrage. Erstens solltest du nicht blind alle Spalten abrufen, aber noch schädlicher ist es, alle Zeilen abzurufen. Unsere alte API ist nicht in der Lage, Ergebnisse zu paginieren und nur eine Teilmenge einer Tabelle anzuzeigen. In einer traditionellen Unternehmensanwendung könnte das so aussehen:

List<Person> listPeople(int page) {
    return query(
            "SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?",
            PAGE_SIZE,
            page * PAGE_SIZE
    );
}

Dies ist kein SQL-Buch, also lassen wir die Implementierungsdetails beiseite. Der Autor dieser API war gnadenlos: Wir haben nicht die Freiheit, einen beliebigen Bereich von Datensätzen zu wählen, sondern können nur mit 0-basierten Seitenzahlen arbeiten. In RxJava können wir jedoch aus Faulheit das Lesen einer ganzen Datenbank ab einer bestimmten Seite simulieren:

import static rx.Observable.defer;
import static rx.Observable.from;


Observable<Person> allPeople(int initialPage) {
    return defer(() -> from(listPeople(initialPage)))
            .concatWith(defer(() ->
                    allPeople(initialPage + 1)));
}

Dieses Codeschnipsel lädt die erste Seite der Datenbankeinträge, z.B. 10 Einträge, nach und nach. Wenn sich niemand anmeldet, wird auch diese erste Abfrage nicht aufgerufen. Wenn es einen Abonnenten gibt, der nur ein paar anfängliche Elemente konsumiert (z. B. allPeople(0).take(3)), meldet sich RxJava automatisch von unserem Stream ab und es werden keine weiteren Abfragen ausgeführt. Was passiert also, wenn wir z. B. 11 Elemente abfragen, der erste Aufruf von listPeople() aber nur 10 zurückgegeben hat? Nun, RxJava findet heraus, dass die ursprüngliche Observableerschöpft ist, aber der Konsument immer noch hungrig ist. Zum Glück gibt es den OperatorconcatWith(), der im Grunde genommen besagt: Wenn Observable auf der linken Seite abgeschlossen ist, abonniere Observable auf der rechten Seite, anstatt eine Fertigstellungsmeldung an die Abonnenten weiterzuleiten, und fahre fort, als ob nichts passiert wäre, wie im folgenden Marmordiagramm dargestellt:

image

Mit anderen Worten: concatWith() kann zwei Observables miteinander verbinden, so dass, wenn das erste beendet ist, das zweite übernimmt. Ina.concatWith(b).subscribe(...) empfängt der Abonnent zuerst alle Ereignisse von a, gefolgt von allen Ereignissen von b. In diesem Fall empfängt der Abonnent zuerst 10 Elemente, gefolgt von weiteren 10. Aber sieh genau hin, es gibt eine vermeintlich unendliche Rekursion in unserem Code!allPeople(initialPage) ruft allPeople(initialPage + 1) ohne eine Stopp-Bedingung auf. Das ist in den meisten Sprachen ein Rezept für StackOverflowError, aber nicht hier. Auch hier ist der Aufruf vonallPeople() immer faul, daher ist diese Rekursion in dem Moment beendet, in dem du aufhörst zuzuhören (dich abmeldest). Technisch gesehen kann concatWith() hier immer noch StackOverflowError erzeugen. Warte bis "Die angeforderte Datenmenge berücksichtigen", dann wirst du lernen, wie du mit der schwankenden Nachfrage nach eingehenden Daten umgehen kannst.

Die Technik, Daten Stück für Stück zu laden, ist sehr nützlich, weil sie es dir ermöglicht, dich auf die Geschäftslogik zu konzentrieren und nicht auf das Low-Level-Plumbing. Wir sehen bereits einige Vorteile der Anwendung von RxJava, selbst in kleinem Maßstab. Die Entwicklung einer API mit Rx im Hinterkopf hat keinen Einfluss auf die gesamte Architektur, denn wir können jederzeit auf BlockingObservable und Java Collections zurückgreifen. Aber es ist besser, eine breite Palette von Möglichkeiten zu haben, die wir bei Bedarf weiter einschränken können.

Lazy Paging und Verkettung

Es gibt noch mehr Möglichkeiten, Lazy Paging mit RxJava zu implementieren. Wenn du darüber nachdenkst, ist die einfachste Art, ausgelagerte Daten zu laden, alles zu laden und dann das zu nehmen, was wir brauchen. Es klingt albern, aber dank der Faulheit ist es machbar. Zuerst generieren wir alle möglichen Seitenzahlen und fordern dann das Laden jeder einzelnen Seite an:

Observable<List<Person>> allPages = Observable
            .range(0, Integer.MAX_VALUE)
            .map(this::listPeople)
            .takeWhile(list -> !list.isEmpty());

Wäre dies nicht RxJava, würde der vorangegangene Code eine enorme Menge an Zeit und Speicher beanspruchen, da er im Grunde die gesamte Datenbank in den Speicher laden würde. Aber da Observable faul ist, ist noch keine Abfrage an die Datenbank erfolgt. Außerdem bedeutet eine leere Seite, dass auch alle weiteren Seiten leer sind (wir haben das Ende der Tabelle erreicht). Daher verwenden wir takeWhile() statt filter(). Um allPages zu Observable<Person> zu verflachen, können wir concatMap() verwenden (siehe "Ordnung erhalten mit concatMap()"):

Observable<Person> people = allPages.concatMap(Observable::from);

concatMap() erfordert eine Umwandlung von List<Person> nach Observable<Person>, die für jede Seite ausgeführt wird. Alternativ können wir concatMapIterable() ausprobieren, was dasselbe tut, aber die Umwandlung sollte für jeden vorgelagerten Wert (der zufällig bereits Iterable<Person> ist) einen Iterable<Person> zurückgeben:

Observable<Person> people = allPages.concatMapIterable(page -> page);

Unabhängig davon, welchen Ansatz du wählst, sind alle Transformationen auf dem Person Objekt träge. Solange du die Anzahl der zu verarbeitenden Datensätze begrenzst (z.B. mit people.take(15)), wird das Observable<Person> listPeople() so spät wie möglich aufrufen.

Unbedingte Gleichzeitigkeit

Ich sehe nicht oft explizite Gleichzeitigkeit in Unternehmensanwendungen. Meistens wird eine einzelne Anfrage von einem einzigen Thread bearbeitet. Derselbe Thread macht Folgendes:

  • Akzeptiert TCP/IP-Verbindung

  • Parst HTTP-Anfrage

  • Ruft einen Controller oder ein Servlet auf

  • Blockiert beim Aufruf der Datenbank

  • Verarbeitet Ergebnisse

  • Kodiert die Antwort (z. B. in JSON)

  • Sendet rohe Bytes zurück an den Kunden

Dieses Schichtenmodell wirkt sich auf die Latenz aus, wenn das Backend mehrere unabhängige Anfragen stellt, z. B. an die Datenbank. Sie werden sequentiell ausgeführt, obwohl man sie leicht parallelisieren könnte. Außerdem wird die Skalierbarkeit beeinträchtigt. In Tomcat gibt es zum Beispiel standardmäßig 200 Threads in den Executors, die für die Bearbeitung von Anfragen zuständig sind. Das bedeutet, dass wir nicht mehr als 200 gleichzeitige Verbindungen verarbeiten können. Bei einem plötzlichen, aber kurzen Ansturm werden die eingehenden Verbindungen in eine Warteschlange gestellt und der Server antwortet mit einer höheren Latenzzeit. Diese Situation kann jedoch nicht ewig andauern und Tomcat wird irgendwann anfangen, den eingehenden Datenverkehr abzuweisen. Wir werden einen großen Teil des nächsten Kapitels (siehe "Nonblocking HTTP Server mit Netty und RxNetty") der Frage widmen, wie wir mit diesem eher peinlichen Mangel umgehen. Bleiben wir vorerst bei der traditionellen Architektur: Die Ausführung aller Schritte der Anfragebearbeitung in einem einzigen Thread hat einige Vorteile, zum Beispiel eine bessere Cache-Lokalität und einen minimalen Synchronisations-Overhead.1 Da bei klassischen Anwendungen die Gesamtlatenz die Summe der Latenzen der einzelnen Schichten ist, kann sich eine einzige fehlerhafte Komponente negativ auf die Gesamtlatenz auswirken.2 Außerdem gibt es manchmal viele Schritte, die voneinander unabhängig sind und gleichzeitig ausgeführt werden können. Wir rufen zum Beispiel mehrere externe APIs auf oder führen mehrere unabhängige SQL-Abfragen aus.

Das JDK bietet eine recht gute Unterstützung für Gleichzeitigkeit, vor allem seit Java 5 mit ExecutorService und Java 8 mit CompletableFuture. Trotzdem wird sie nicht so häufig genutzt, wie es möglich wäre. Betrachten wir zum Beispiel das folgende Programm ohne jegliche Gleichzeitigkeit:

Flight lookupFlight(String flightNo) {
    //...
}

Passenger findPassenger(long id) {
    //...
}

Ticket bookTicket(Flight flight, Passenger passenger) {
    //...
}

SmtpResponse sendEmail(Ticket ticket) {
    //...
}

Und auf der Kundenseite:

Flight flight = lookupFlight("LOT 783");
Passenger passenger = findPassenger(42);
Ticket ticket = bookTicket(flight, passenger);
sendEmail(ticket);

Auch hier handelt es sich um einen ganz typischen, klassischen Blocking-Code, wie er in vielen Anwendungen zu finden ist. Aber wenn du dir die Latenzzeit genau ansiehst, hat der vorhergehende Codeschnipsel vier Schritte; die ersten beiden sind jedoch unabhängig voneinander. Nur der dritte Schritt (bookTicket()) benötigt die Ergebnisse von lookupFlight() undfindPassenger(). Es gibt eine offensichtliche Möglichkeit, die Vorteile der Gleichzeitigkeit zu nutzen. Doch nur sehr wenige Entwickler werden diesen Weg tatsächlich gehen, weil er umständliche Thread-Pools, Futures und Rückrufe erfordert. Was wäre aber, wenn die API bereits Rx-kompatibel wäre? Erinnere dich daran, dass du blockierenden Legacy-Code einfach in Observable verpacken kannst, so wie wir es am Anfang dieses Kapitels getan haben:

Observable<Flight> rxLookupFlight(String flightNo) {
    return Observable.defer(() ->
            Observable.just(lookupFlight(flightNo)));
}

Observable<Passenger> rxFindPassenger(long id) {
    return Observable.defer(() ->
            Observable.just(findPassenger(id)));
}

Semantisch gesehen tun die Methoden von rx- genau dasselbe und auf dieselbe Art und Weise; das heißt, sie sind standardmäßig blockierend. Abgesehen von einer ausführlicheren API aus Sicht des Clients haben wir noch nichts gewonnen:

Observable<Flight> flight = rxLookupFlight("LOT 783");
Observable<Passenger> passenger = rxFindPassenger(42);
Observable<Ticket> ticket =
        flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
ticket.subscribe(this::sendEmail);

Sowohl herkömmliche Blockierprogramme als auch das Programm mit Observable funktionieren genau gleich. Es ist zwar standardmäßig langsamer, aber die Reihenfolge der Vorgänge ist im Wesentlichen dieselbe. Zuerst erstellen wir Observable<Flight>, das, wie du bereits weißt, standardmäßig nichts tut. Wenn jemand nicht ausdrücklich nach einer Flight fragt, ist diese Observable nur ein fauler Platzhalter. Wir haben bereits gelernt, dass dies eine wertvolle Eigenschaft von kalten Observables ist. Das Gleiche gilt für Observable<Passenger>; wir haben zwei Platzhalter vom TypFlight und Passenger, allerdings wurden noch keine Seiteneffekte ausgeführt. Keine Datenbankabfrage und kein Web-Service-Aufruf. Wenn wir uns entscheiden, die Verarbeitung hier zu beenden, wurde keine überflüssige Arbeit geleistet.

Um mit bookTicket() fortzufahren, brauchen wir konkrete Instanzen von Flight und Passenger. Es ist verlockend, diese beiden Observables einfach zu blockieren, indem wir den toBlocking() Operator verwenden. Wir möchten jedoch das Blockieren so weit wie möglich vermeiden, um den Ressourcenverbrauch (vor allem den Speicher) zu reduzieren und eine größere Gleichzeitigkeit zu ermöglichen. Eine andere schlechte Lösung ist es,.subscribe() auf die flight und passenger Observable s anzuwenden und irgendwie zu warten, bis beide Callbacks beendet sind. Das ist ziemlich einfach, wennObservable blockiert, aber wenn Callbacks asynchron erscheinen und du einen globalen Zustand synchronisieren musst, der auf beide wartet, wird das schnell zu einem Albtraum. Auch ein verschachteltes subscribe() ist nicht idiomatisch, und normalerweise willst du ein einziges Abonnement für einen Nachrichtenfluss (Anwendungsfall). Der einzige Grund, warum Callbacks in JavaScript einigermaßen anständig funktionieren, ist, dass es nur einen Thread gibt. Die idiomatische Art, mehrere Observables gleichzeitig zu abonnieren, ist zip und zipWith. Du könntest zip als eine Möglichkeit sehen, zwei unabhängige Datenströme paarweise zu verbinden. Aber viel häufiger wird zip einfach verwendet, um zwei Einzel-Elemente Observables zu verbinden.ob1.zip(ob2).subscribe(...) bedeutet im Wesentlichen, dass ein Ereignis empfangen wird, wenn sowohl ob1 als auch ob2 fertig sind (ein eigenes Ereignis ausgeben). Wenn du also zip siehst, ist es wahrscheinlicher, dass jemand einfach einen Join-Schrittfür zwei oder mehr Observables macht, deren Ausführungspfade sich verzweigt haben.zip ist eine Möglichkeit, asynchron auf zwei oder mehr Werte zu warten, egal, welcher zuletzt erscheint.

Kommen wir also zurück zu flight.zipWith(passenger, this::bookTicket) (eine kürzere Syntax mit Methodenreferenz anstelle eines expliziten Lambdas, wie im Codebeispiel). Der Grund, warum ich alle Typinformationen behalte, anstatt Ausdrücke fließend zu verbinden, ist, dass ich möchte, dass du auf die Rückgabetypen achtest. flight.zipWith(passenger, ...) ruft nicht einfach einen Callback auf, wenn sowohl flight als auch passenger fertig sind; es gibt einen neuenObservable zurück, den du sofort als faulen Platzhalter für Daten erkennen solltest. Erstaunlicherweise wurde zu diesem Zeitpunkt auch noch keine Berechnung gestartet. Wir haben einfach ein paar Datenstrukturen zusammengeschustert, aber kein Verhalten ausgelöst. Solange niemand Observable<Ticket> abonniert hat, wird RxJava keinen Backend-Code ausführen. Das passiert schließlich in der letzten Anweisung: ticket.subscribe() fragt explizit nach Ticket.

Wo kann man sich anmelden?

Achte darauf, wo du subscribe() im Domaincode siehst. Oft besteht deine Geschäftslogik nur darin, Observablezusammenzustellen und an eine Art Framework oder Gerüstschicht zurückzugeben. Das eigentliche Abonnement findet hinter den Kulissen in einem Webframework oder in einem Glue-Code statt. Es ist keine schlechte Praxis, subscribe() selbst aufzurufen, aber versuche, es so weit wie möglich nach außen zu verlagern.

Um den Ablauf der Ausführung zu verstehen, ist es sinnvoll, von unten nach oben zu schauen. Wir haben ticket abonniert, also muss RxJava sowohl flight als auch passenger transparent abonnieren. An diesem Punkt beginnt die eigentliche Logik. Da beide Observables kalt sind und noch keine Gleichzeitigkeit besteht, ruft die erste Subskription von flight die blockierende Methode lookupFlight() direkt im aufrufenden Thread auf. Wenn lookupFlight() fertig ist, kann RxJava passenger abonnieren. Allerdings hat es bereits eine Flight Instanz von der synchronen flight erhalten. rxFindPassenger() ruft findPassenger() blockierend auf und erhält eine Passenger Instanz. An dieser Stelle fließen die Daten wieder zurück. Die Instanzen von Flight und Passenger werden mithilfe des bereitgestellten Lambdas (bookTicket) kombiniert und an ticket.subscribe() weitergegeben.

Das hört sich nach einer Menge Arbeit an, wenn man bedenkt, dass er sich im Wesentlichen genauso verhält und funktioniert wie unser blockierender Code am Anfang. Aber jetzt können wir deklarativ Gleichzeitigkeit anwenden, ohne die Logik zu ändern. Wenn unsere Geschäftsmethoden Future<Flight> (oderCompletableFuture<Flight>, das spielt keine Rolle) zurückgeben würden, wären zwei Entscheidungen für uns getroffen worden:

  • Der zugrunde liegende Aufruf von lookupFlight() hat bereits begonnen und es gibt keinen Platz für Faulheit. Wir blockieren diese Methode nicht, aber die Arbeit hat bereits begonnen.

  • Wir haben keinerlei Kontrolle über die Gleichzeitigkeit. Es ist die Methodenimplementierung, die entscheidet, ob eine Future Aufgabe in einem Thread-Pool aufgerufen wird, ein neuer Thread pro Anfrage und so weiter.

RxJava gibt den Nutzern mehr Kontrolle. Nur weil Observable<Flight>nicht mit Blick auf die Gleichzeitigkeit implementiert wurde, heißt das nicht, dass wir sie nicht später anwenden können. In der realen Welt ist Observablein der Regel bereits asynchron, aber in seltenen Fällen muss man die Gleichzeitigkeit zu einer bestehenden Observable hinzufügen. Die Verbraucher unserer API, nicht die Implementierer, können den Threading-Mechanismus im Falle der synchronen Observable frei wählen. All dies wird durch die Verwendung des subscribeOn() Operators erreicht:

Observable<Flight> flight =
    rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
Observable<Passenger> passenger =
    rxFindPassenger(42).subscribeOn(Schedulers.io());

An jedem beliebigen Punkt vor dem Abonnieren können wir den subscribeOn() Operator einfügen und eine sogenannte Scheduler Instanz bereitstellen. In diesem Fall habe ich die Schedulers.io() Factory-Methode verwendet, aber wir können genauso gut eine benutzerdefinierte ExecutorService verwenden und sie schnell mit Scheduler umhüllen. Wenn die Subskription erfolgt, wird der anObservable.create() übergebene Lambda-Ausdruck in dem bereitgestellten Scheduler und nicht im Client-Thread ausgeführt. Das ist noch nicht notwendig, aber wir werden Zeitplannungsprogramme im Abschnitt "Was ist ein Zeitplannungsprogramm?" genauer untersuchen. In der Zwischenzeit behandelst du Scheduler wie einen Thread-Pool.

Wie verändert Scheduler das Laufzeitverhalten unseres Programms? Erinnere dich, dass der zip() Operator zwei oder mehrObservables abonniert und auf Paare (oder Tupel) wartet. Wenn die Subskription asynchron erfolgt, können alle vorgelagerten Observables ihren zugrunde liegenden blockierenden Code gleichzeitig aufrufen. Wenn du dein Programm jetzt ausführst, werden lookupFlight() und findPassenger() sofort und gleichzeitig mit der Ausführung beginnen, wenn ticket.subscribe() aufgerufen wird. Dann wirdbookTicket() ausgeführt, sobald der langsamere der vorgenannten Observables einen Wert ausgibt.

Apropos Langsamkeit: Du kannst auch deklarativ eine Zeitüberschreitung festlegen, wenn ein bestimmtes Observable in der angegebenen Zeit keinen Wert ausgibt:

rxLookupFlight("LOT 783")
    .subscribeOn(Schedulers.io())
    .timeout(100, TimeUnit.MILLISECONDS)

Wie immer werden Fehler nicht willkürlich verworfen, sondern nachgelagert weitergegeben. Wenn also die Methode lookupFlight() länger als 100 Millisekunden dauert, wird am Ende TimeoutException anstelle eines ausgegebenen Wertes an alle Abonnenten gesendet. Der Operator timeout() wird in "Timing Out When Events Do Not Occurrence" ausführlich erklärt .

Am Ende haben wir zwei Methoden, die ohne großen Aufwand gleichzeitig laufen, vorausgesetzt, deine API ist bereits Rx-gesteuert. Aber wir haben ein wenig geschummelt, da bookTicket() immer noch Ticket zurückgibt, was definitiv bedeutet, dass sie blockiert. Selbst wenn die Ticketbuchung extrem schnell war, lohnt es sich, sie als solche zu deklarieren, um die Weiterentwicklung der API zu erleichtern. Die Weiterentwicklung könnte bedeuten, dass du Nebenläufigkeit hinzufügst oder sie in vollständig nichtblockierenden Umgebungen verwendest (siehe Kapitel 5). Erinnere dich daran, dass es so einfach ist, eine nicht blockierende API in eine blockierende umzuwandeln, wie den Aufruf von toBlocking(). Das Gegenteil ist oft schwierig und erfordert viele zusätzliche Ressourcen. Außerdem ist es sehr schwierig, die Entwicklung von Methoden wie rxBookTicket() vorherzusagen. Wenn sie jemals das Netzwerk oder das Dateisystem berühren, ganz zu schweigen von der Datenbank, lohnt es sich, sie mit einem Observable zu versehen, der die mögliche Latenz auf der Typebene angibt:

Observable<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
    //...
}

Aber jetzt gibt zipWith() ein unangenehmes Observable<Observable<Ticket>> zurück und der Code lässt sich nicht mehr kompilieren. Eine gute Faustregel ist, dass immer dann, wenn du einen doppelt eingeschlossenen Typ siehst (zum Beispiel Optional<Optional<...>>), irgendwo ein flatMap() -Aufruf fehlt. Das ist auch hier der Fall. zipWith() nimmt ein Paar (oder allgemeiner ein Tupel) von Ereignissen, wendet eine Funktion an, die diese Ereignisse als Argumente verwendet, und stellt das Ergebnis unverändert in das nachgeschaltete Observable ein. Deshalb haben wir zuerst Observable<Ticket>gesehen, aber jetzt ist es Observable<Observable<Ticket>>, wobeiObservable<Ticket> das Ergebnis der von uns gelieferten Funktion ist. Es gibt zwei Möglichkeiten, dieses Problem zu lösen. Eine Möglichkeit besteht darin, ein Zwischenpaar zu verwenden, das vonzipWith zurückgegeben wird:

import org.apache.commons.lang3.tuple.Pair;

Observable<Ticket> ticket = flight
        .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
        .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));

Wenn die Verwendung einer expliziten Pair aus der Bibliothek eines Drittanbieters den Codefluss nicht genug verwirren würde, würde eine Methodenreferenz tatsächlich funktionieren: Pair::of Aber auch hier haben wir uns entschieden, dass sichtbare Typinformationen wertvoller sind, als ein paar Tastenanschläge zu sparen. Schließlich lesen wir viel mehr Zeit mit dem Lesen von Code, als wir ihn schreiben. Eine Alternative zu einem Zwischenpaar ist die Anwendung einer flatMap mit einer Identitätsfunktion:

Observable<Ticket> ticket = flight
        .zipWith(passenger, this::rxBookTicket)
        .flatMap(obs -> obs);

Dieser obs -> obs Lambda-Ausdruck tut scheinbar nichts, zumindest wenn er ein map() Operator wäre. Aber erinnere dich daran, dass flatMap() eine Funktion auf jeden Wert innerhalb von Observable anwendet, so dass diese Funktion in unserem FallObservable<Ticket> als Argument benötigt. Später wird das Ergebnis nicht direkt in den resultierenden Stream eingefügt, wie bei map(). Stattdessen wird der Rückgabewert (vom Typ Observable<T>) "geglättet", was zu einem Observable<T> und nicht zu einem Observable<Observable<T>> führt. Wenn es um Zeitplannungsprogramme geht, wird der Operator flatMap() noch mächtiger. Man könnte meinen, dass flatMap() nur ein syntaktischer Trick ist, um ein verschachteltes Observable<Observable<...>> Problem zu vermeiden, aber er ist viel grundlegender als das.

Observable.subscribeOn() Anwendungsfälle

Es ist verlockend zu denken, dass subscribeOn() das richtige Werkzeug für die Gleichzeitigkeit in RxJava ist. Dieser Operator funktioniert, aber du solltest die Verwendung von subscribeOn() (und dem noch zu beschreibenden observeOn()) nicht oft sehen. Im wirklichen Leben kommen Observables aus asynchronen Quellen, so dass ein benutzerdefiniertes Zeitplanungsprogramm überhaupt nicht benötigt wird. Wir verwenden subscribeOn() in diesem Kapitel, um explizit zu zeigen, wie man bestehende Anwendungen aufrüsten kann, um reaktive Prinzipien selektiv zu nutzen. Aber in der Praxis sind Schedulers und subscribeOn() Waffen der letzten Instanz, also nichts, was man häufig sieht.

flatMap() als asynchroner Verkettungsoperator

In unserer Beispielanwendung müssen wir nun eine Liste von Ticketper E-Mail versenden. Dabei müssen wir Folgendes beachten:

  1. Die Liste kann ziemlich lang sein.

  2. Das Versenden einer E-Mail kann einige Millisekunden oder sogar Sekunden dauern.

  3. Die Anwendung muss im Falle von Fehlschlägen weiterlaufen, aber am Ende melden, welche Tickets fehlgeschlagen sind.

Die letzte Anforderung schließttickets.forEach(this::sendEmail) schnell aus, weil es eifrig eine Ausnahme wirft und nicht mit Tickets fortfährt, die noch nicht geliefert wurden. Ausnahmen sind eigentlich eine böse Hintertür zum Typsystem und genau wie Rückrufe sind sie nicht sehr freundlich, wenn man sie auf eine robustere Art und Weise verwalten will. Deshalb modelliert RxJava sie explizit als spezielle Benachrichtigungen, aber hab Geduld, wir werden dazu kommen. Angesichts der Anforderung an die Fehlerbehandlung sieht unser Code mehr oder weniger so aus:

List<Ticket> failures = new ArrayList<>();
for(Ticket ticket: tickets) {
    try {
        sendEmail(ticket);
    } catch (Exception e) {
        log.warn("Failed to send {}", ticket, e);
        failures.add(ticket);
    }
}

Die ersten beiden Anforderungen oder Richtlinien werden jedoch nicht berücksichtigt. Es gibt keinen Grund, warum wir E-Mails von einem Thread aus nacheinander versenden. Traditionell könnten wir dafür eine ExecutorService pool verwenden, indem wir jede E-Mail als separate Aufgabe übermitteln:

List<Pair<Ticket, Future<SmtpResponse>>> tasks = tickets
    .stream()
    .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
    .collect(toList());

List<Ticket> failures = tasks.stream()
    .flatMap(pair -> {
        try {
            Future<SmtpResponse> future = pair.getRight();
            future.get(1, TimeUnit.SECONDS);
            return Stream.empty();
        } catch (Exception e) {
            Ticket ticket = pair.getLeft();
            log.warn("Failed to send {}", ticket, e);
            return Stream.of(ticket);
        }
    })
    .collect(toList());

//------------------------------------

private Future<SmtpResponse> sendEmailAsync(Ticket ticket) {
    return pool.submit(() -> sendEmail(ticket));
}

Das ist eine ganze Menge Code, mit dem alle Java-Programmierer vertraut sein sollten. Dennoch scheint er zu langatmig und ungewollt komplex zu sein. Zunächst durchlaufen wir tickets und übermitteln sie an einen Thread-Pool. Um genau zu sein, rufen wir die Hilfsmethode sendEmailAsync() auf, die den in Callable<SmtpResponse> verpacktensendEmail() -Aufruf an einen Threadpool übergibt. Noch genauere Instanzen von Callable werden zunächst in eine unbegrenzte (standardmäßig) Warteschlange vor einem Thread-Pool gestellt. Das Fehlen von Mechanismen, die eine zu schnelle Übergabe von Aufgaben verlangsamen, wenn sie nicht rechtzeitig bearbeitet werden können, führte zu reaktiven Streams und Backpressure-Aufwand (siehe "Backpressure").

Da wir später im Falle eines Fehlers eine Ticket Instanz benötigen, müssen wir nachverfolgen, welche Future für welcheTicket verantwortlich war, wiederum in einem Pair. In echtem Produktionscode solltest du einen aussagekräftigeren und dedizierten Container wie ein TicketAsyncTask Wertobjekt in Betracht ziehen. Wir sammeln alle diese Paare und fahren mit der nächsten Iteration fort. Zu diesem Zeitpunkt führt der Thread-Pool bereits mehrere sendEmail() Aufrufe gleichzeitig aus, und das ist genau das, was wir bezwecken wollten. Die zweite Schleife durchläuft alle Futures und versucht, sie zu dereferenzieren, indem sie blockiert (get()) und auf den Abschluss wartet. Wenn get() erfolgreich zurückkehrt, überspringen wir eine solche Ticket. Wenn es jedoch eine Ausnahme gibt, geben wir die Ticket Instanz zurück, die mit dieser Aufgabe verbunden war - wir wissen, dass sie fehlgeschlagen ist und wollen das später melden. Stream.flatMap() erlaubt es uns, null oder ein Element (oder eigentlich eine beliebige Zahl) zurückzugeben, im Gegensatz zu Stream.map(), das immer eins verlangt.

Du fragst dich vielleicht, warum wir zwei Schleifen brauchen und nicht nur eine wie hier:

//WARNING: code is sequential despite utilizing thread pool
List<Ticket> failures = tickets
        .stream()
        .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
        .flatMap(pair -> {
            //...
        })
        .collect(toList());

Dies ist ein interessanter Fehler, der wirklich schwer zu finden ist, wenn du nicht verstehst, wie Streams in Java 8 funktionieren. Da Streams - genau wie Observables - träge sind, werten sie die zugrundeliegende Sammlung nur ein Element nach dem anderen aus und auch nur dann, wenn eine Terminaloperation angefordert wurde (z. B.collect(toList())). Das bedeutet, dass eine map() Operation, mit der Hintergrundaufgaben gestartet werden, nicht sofort auf allen Tickets ausgeführt wird, sondern eines nach dem anderen, abwechselnd mit einer flatMap() Operation. Außerdem starten wir wirklich eine Future, blockieren das Warten darauf, starten eine zweite Future, blockieren das Warten darauf, und so weiter. Eine Zwischensammlung wird benötigt, um die Auswertung zu erzwingen, nicht wegen der Übersichtlichkeit oder Lesbarkeit. Schließlich ist der TypList<Pair<Ticket, Future<SmtpResponse>>> kaum lesbarer.

Das ist viel Arbeit und die Fehlerwahrscheinlichkeit ist hoch. Kein Wunder also, dass Entwicklerinnen und Entwickler davor zurückschrecken, nebenläufigen Code im Alltag anzuwenden. Das wenig bekannte ExecutorCompletionService aus dem JDK wird manchmal verwendet, wenn es einen Pool von asynchronen Aufgaben gibt und wir sie verarbeiten wollen, sobald sie abgeschlossen sind. Außerdem bringt Java 8 CompletableFuture (siehe "CompletableFuture und Streams"), das vollständig reaktiv und nicht blockierend ist. Aber wie kann RxJava hier helfen? Nehmen wir zunächst an, dass eine API zum Versenden einer E-Mail bereits auf die Verwendung von RxJava umgerüstet wurde:

import static rx.Observable.fromCallable;

Observable<SmtpResponse> rxSendEmail(Ticket ticket) {
    //unusual synchronous Observable
    return fromCallable(() -> sendEmail())
}

Es geht nicht um Gleichzeitigkeit, sondern nur darum, sendEmail() in ein Observable zu packen. Dies ist eine seltene Observable; normalerweise würdest du subscribeOn() in der Implementierung verwenden, damit Observable standardmäßig asynchron ist. An diesem Punkt können wir wie zuvor über alle tickets iterieren:

List<Ticket> failures = Observable.from(tickets)
    .flatMap(ticket ->
        rxSendEmail(ticket)
            .flatMap(response -> Observable.<Ticket>empty())
            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
            .onErrorReturn(err -> ticket))
    .toList()
    .toBlocking()
    .single();

Observable.ignoreElements()

Es ist leicht zu erkennen, dass die innere flatMap() in unserem Beispiel response ignoriert und einen leeren Stream zurückgibt. In solchen Fällen ist flatMap() ein Overkill; der ignoreElements() -Operator ist viel effizienter.ignoreElements() ignoriert einfach alle ausgegebenen Werte und leitet onCompleted() oder onError() Benachrichtigungen weiter. Da wir die eigentliche Antwort ignorieren und nur Fehler behandeln, funktioniert ignoreElements() hier hervorragend.

Alles, was uns interessiert, liegt innerhalb der äußeren flatMap(). Wäre es nurflatMap(this::rxSendEmail), würde der Code funktionieren; allerdings würde jeder Fehler, der von rxSendEmail ausgegeben wird, den gesamten Stream beenden. Wir wollen aber alle ausgegebenen Fehler "auffangen" und für den späteren Gebrauch sammeln. Wir verwenden einen ähnlichen Trick wie bei Stream.flatMap(): Wenn response erfolgreich ausgegeben wurde, wandeln wir es in ein leeres Observable um. Das bedeutet im Grunde, dass wir erfolgreiche Tickets verwerfen. Bei Misserfolgen geben wir jedoch ein ticket zurück, das eine Ausnahme ausgelöst hat. Ein zusätzlicherdoOnError() Callback ermöglicht es uns, die Ausnahmen zu protokollieren - natürlich können wir die Protokollierung auch zum onErrorReturn() Operator hinzufügen, aber ich fand diese Trennung der Anliegen funktionaler.

Um mit früheren Implementierungen kompatibel zu bleiben, verwandeln wir Observable in Observable<List<Ticket>>, BlockingObservable<List<Ticket>>, toBlocking() und schließlich List<Ticket> (single()). Interessanterweise bleibt auch BlockingObservable faul. Ein toBlocking() Operator allein erzwingt keine Auswertung, indem er den zugrunde liegenden Stream abonniert, und er blockiert nicht einmal. Die Subskription und damit die Iteration und das Senden von E-Mails wird aufgeschoben, bis single() aufgerufen wird.

Wenn wir die äußere flatMap() durch concatMap() ersetzen (siehe "Ways of Combining Streams: concat(), merge(), and switchOnNext()" und "Preserving Order Using concatMap()"), werden wir auf einen ähnlichen Fehler stoßen, wie er bei Stream von JDK erwähnt wurde. Im Gegensatz zu flatMap() (oder merge), das alle inneren Streams sofort abonniert, abonniert concatMap (oder concat) einen inneren Observable nach dem anderen. Und solange niemand Observable abonniert hat, wird auch nicht gearbeitet.

Bislang wurde eine einfache for Schleife mit try-catch durch eine weniger lesbare und komplexere Observable ersetzt. Um unseren sequenziellen Code in eine Multithreading-Berechnung zu verwandeln, müssen wir jedoch nur einen zusätzlichen Operator hinzufügen:

Observable
        .from(tickets)
        .flatMap(ticket ->
                rxSendEmail(ticket)
                        .ignoreElements()
                        .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                        .onErrorReturn(err -> ticket)
                        .subscribeOn(Schedulers.io()))

Sie ist so unauffällig, dass du sie vielleicht gar nicht bemerkst. Ein zusätzlichersubscribeOn() Operator bewirkt, dass jede einzelne rxSendMail() auf einem bestimmten Scheduler (io(), in diesem Fall) ausgeführt wird. Das ist eine der Stärken von RxJava: Es macht sich keine Gedanken über Threading, sondern setzt auf synchrone Ausführung, erlaubt aber nahtloses und fast transparentes Multithreading. Das bedeutet natürlich nicht, dass du Zeitplanungsprogramme an beliebigen Stellen einbauen kannst. Aber zumindest ist die API weniger langatmig und auf höherem Niveau. Wir werden Zeitplannungsprogramme später in "Multithreading in RxJava" noch genauer untersuchen . Erinnere dich vorerst daran, dass Observablestandardmäßig synchron ist; wir können das jedoch leicht ändern und Gleichzeitigkeit an Stellen anwenden, an denen sie am wenigsten erwartet wird. Das ist vor allem bei bestehenden Legacy-Anwendungen nützlich, die du ohne großen Aufwand optimieren kannst.

Wenn du Observablevon Grund auf neu implementierst, ist es idiomatischer, sie standardmäßig asynchron zu machen. Das bedeutet, dass du subscribeOn() direkt innerhalb von rxSendEmail() platzierst und nicht extern. Andernfalls riskierst du, bereits asynchrone Streams mit einer weiteren Schicht von Zeitplannungsprogrammen zu umhüllen. Wenn der Producer hinter Observable bereits asynchron ist, ist das natürlich noch besser, weil dein Stream nicht an einen bestimmten Thread gebunden ist. Außerdem solltest du das Abonnieren eines Observable so spät wie möglich verschieben, typischerweise in die Nähe des Web-Frameworks unserer Außenwelt. Das ändert deine Denkweise erheblich. Deine gesamte Geschäftslogik ist faul, bis jemand die Ergebnisse tatsächlich sehen will.3

Ersetzen von Rückrufen durch Streams

Herkömmliche APIs sind die meiste Zeit blockierend, das heißt, sie zwingen dich, synchron auf die Ergebnisse zu warten. Dieser Ansatz funktioniert relativ gut, zumindest bevor du von RxJava gehört hast. Aber eine blockierende API ist besonders problematisch, wenn Daten vom API-Produzenten an die Konsumenten weitergegeben werden müssen - das ist ein Bereich, in dem RxJava wirklich glänzt. Es gibt zahlreiche Beispiele für solche Fälle, und die API-Designer verfolgen verschiedene Ansätze. In der Regel müssen wir eine Art Callback bereitstellen, den die API aufruft, oft als Event-Listener bezeichnet. Eines der häufigsten Szenarien dieser Art ist derJava Message Service (JMS). Um JMS zu nutzen, muss normalerweise eine Klasse implementiert werden, die den Anwendungsserver oder Container über jede eingehende Nachricht informiert. Wir können solche Listener relativ einfach durch einen komponierbaren Observable ersetzen, der viel robuster und vielseitiger ist. Der traditionelle Listener sieht ähnlich aus wie diese Klasse, die hier die JMS-Unterstützung imSpring-Framework nutzt, aber unsere Lösung ist technologieunabhängig:

@Component
class JmsConsumer {

    @JmsListener(destination = "orders")
    public void newOrder(Message message) {
        //...
    }
}

Wenn eine JMS message empfangen wird, muss die Klasse JmsConsumer entscheiden, was damit geschehen soll. Normalerweise wird eine Geschäftslogik innerhalb eines Nachrichtenkonsumenten aufgerufen. Wenn eine neue Komponente über solche Nachrichten benachrichtigt werden möchte, muss sie JmsConsumer entsprechend ändern. Stell dir vor, dass Observable<Message> von jedem abonniert werden kann. Außerdem steht ein ganzes Universum von RxJava-Operatoren zur Verfügung, mit denen man Nachrichten zuordnen, filtern und kombinieren kann. Der einfachste Weg, um von einer Push- und Callback-basierten API zu Observable zu wechseln, ist die Verwendung vonSubjects. Jedes Mal, wenn eine neue JMS-Nachricht zugestellt wird, pushen wir diese Nachricht an eine PublishSubject, die von außen wie ein gewöhnlicher HotObservable aussieht:

private final PublishSubject<Message> subject = PublishSubject.create();

@JmsListener(destination = "orders", concurrency="1")
public void newOrder(Message msg) {
    subject.onNext(msg);
}

Observable<Message> observe() {
    return subject;
}

Vergiss nicht, dass Observable<Message> "heiß" ist: Es beginnt mit der Ausgabe von JMS-Nachrichten, sobald sie konsumiert werden. Wenn in diesem Moment niemand abonniert ist, gehen die Nachrichten einfach verloren. ReplaySubject ist eine Alternative, aber da es alle Ereignisse seit dem Start der Anwendung zwischenspeichert, ist es nicht für lang laufende Prozesse geeignet. Wenn du einen Abonnenten hast, der unbedingt alle Nachrichten erhalten muss, musst du sicherstellen, dass er die Nachrichten abonniert, bevor der JMS Message Listener initialisiert wird. Außerdem hat unser Message Listener einen concurrency="1" Parameter, um sicherzustellen, dass Subject nicht von mehreren Threads aus aufgerufen wird. Als Alternative kannst du Subject.toSerialized() verwenden.

Nebenbei bemerkt: Subjectist zwar einfacher zu starten, aber nach einer Weile bekanntermaßen problematisch. In diesem speziellen Fall können wir Subject einfach durch das idiomatischere RxJava Observable ersetzen, das create() direkt verwendet:

public Observable<Message> observe(
    ConnectionFactory connectionFactory,
    Topic topic) {
    return Observable.create(subscriber -> {
        try {
            subscribeThrowing(subscriber, connectionFactory, topic);
        } catch (JMSException e) {
            subscriber.onError(e);
        }
    });
}

private void subscribeThrowing(
        Subscriber<? super Message> subscriber,
        ConnectionFactory connectionFactory,
        Topic orders) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(true, AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(orders);
    consumer.setMessageListener(subscriber::onNext);
    subscriber.add(onUnsubscribe(connection));
    connection.start();
}

private Subscription onUnsubscribe(Connection connection) {
    return Subscriptions.create(() -> {
        try {
            connection.close();
        } catch (Exception e) {
            log.error("Can't close", e);
        }
    });
}

Die JMS-API bietet zwei Möglichkeiten, Nachrichten von einem Broker zu empfangen: synchron über die blockierende Methode receive() und nicht blockierend über MessageListener. Die nicht blockierende API ist aus vielen Gründen vorteilhaft: Sie benötigt zum Beispiel weniger Ressourcen wie Threads und Stack-Speicher. Außerdem passt sie hervorragend zum Rx-Programmierstil. Anstatt eine Instanz von MessageListener zu erstellen und unseren Abonnenten von dort aus aufzurufen, können wir diese knappe Syntax mit Methodenreferenz verwenden:

consumer.setMessageListener(subscriber::onNext)

Außerdem müssen wir uns um die Bereinigung von Ressourcen und die richtige Fehlerbehandlung kümmern. Diese winzige Transformationsschicht ermöglicht es uns, JMS-Nachrichten einfach zu konsumieren, ohne uns um API-Interna zu kümmern. Hier ein Beispiel mit dem beliebten ActiveMQ Messaging Broker, der lokal läuft:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
        observe(connectionFactory, new ActiveMQTopic("orders"))
        .cast(TextMessage.class)
        .flatMap(m -> {
            try {
                return Observable.just(m.getText());
            } catch (JMSException e) {
                return Observable.error(e);
            }
        });

JMS hat genau wie JDBC den Ruf, stark geprüfte JMSException zu verwenden, selbst beim Aufruf von getText() auf einem TextMessage. Um Fehler richtig zu behandeln (siehe "Fehlerbehandlung" für weitere Details), verwenden wir flatMap() und wrap exceptions. Ab diesem Punkt kannst du JMS-Nachrichten wie jeden anderen asynchronen und nicht blockierenden Stream behandeln. Übrigens haben wir den cast() Operator verwendet, der Upstream-Ereignisse optimistisch auf einen bestimmten Typ castet und ansonsten mit onError() fehlschlägt.cast() ist im Grunde ein spezialisierter map() Operator, der sich wie map(x -> (TextMessage)x) verhält.

Regelmäßige Abfrage nach Änderungen

Die schlechteste blockierende API, mit der du arbeiten kannst, erfordert eine Abfrage nach Änderungen. Sie bietet keinen Mechanismus, um Änderungen direkt an dich weiterzugeben, auch nicht mit Callbacks oder durch unendliches Blockieren. Der einzige Mechanismus, den diese API bietet, ist die Abfrage des aktuellen Zustands, und es liegt an dir, herauszufinden, ob er sich vom vorherigen Zustand unterscheidet oder nicht. RxJava hat ein paar wirklich mächtige Operatoren, die du anwenden kannst, um eine gegebene API auf Rx-Stil umzurüsten. Der erste Fall, den ich dir vorstellen möchte, ist eine einfache Methode, die einen einzelnen Wert liefert, der den Zustand repräsentiert, zum Beispiellong getOrderBookLength(). Um Änderungen zu verfolgen, müssen wir diese Methode häufig genug aufrufen und Unterschiede erfassen. Das kannst du in RxJava mit einer sehr einfachen Operator-Komposition erreichen:

Observable
        .interval(10, TimeUnit.MILLISECONDS)
        .map(x -> getOrderBookLength())
        .distinctUntilChanged()

Zunächst erzeugen wir alle 10 Millisekunden einen synthetischen long Wert, der als tickender Zähler dient. Für jeden solchen Wert (d.h. alle 10 Millisekunden) rufen wir getOrderBookLength() auf. Die oben genannte Methode ändert sich jedoch nicht so oft, und wir wollen unsere Abonnenten nicht mit vielen irrelevanten Zustandsänderungen überfluten. Zum Glück können wir einfach distinctUntilChanged() sagen und RxJava überspringt transparent die von getOrderBookLength() zurückgegebenen longWerte, die sich seit dem letzten Aufruf nicht geändert haben, wie das folgende Marmordiagramm zeigt:

distinct

Wir können dieses Muster sogar noch weiter anwenden. Stell dir vor, du beobachtest, ob sich das Dateisystem oder die Datenbanktabelle ändert. Der einzige Mechanismus, der dir zur Verfügung steht, ist die Erstellung eines aktuellen Schnappschusses der Dateien oder Datenbankeinträge. Du baust eine API auf, die die Kunden über jedes neue Element informiert. Natürlich kannst du auch java.nio.file.WatchService oder Datenbank-Trigger verwenden, aber nimm dieses Beispiel als Lehrbeispiel. Auch dieses Mal beginnen wir damit, regelmäßig einen Schnappschuss des aktuellen Zustands zu machen:

Observable<Item> observeNewItems() {
    return Observable
            .interval(1, TimeUnit.SECONDS)
            .flatMapIterable(x -> query())
            .distinct();
}

List<Item> query() {
    //take snapshot of file system directory
    //or database table
}

Der Operator distinct() speichert alle Elemente, die ihn durchlaufen haben (siehe auch "Duplikate mit distinct() und distinctUntilChanged() ausschließen"). Wenn derselbe Eintrag zum zweiten Mal auftaucht, wird er einfach ignoriert. Deshalb können wir die gleiche Liste von Items jede Sekunde pushen. Beim ersten Mal werden sie an alle Abonnenten weitergereicht. Wenn jedoch genau dieselbe Liste eine Sekunde später erscheint, wurden alle Elemente bereits gesehen und werden daher verworfen. Wenn die von query() zurückgegebene Liste zu einem bestimmten Zeitpunkt ein zusätzlichesItem enthält, lässt distinct() es durchgehen, verwirft es aber beim nächsten Mal. Dieses einfache Muster ermöglicht es uns, eine Reihe von Thread.sleep() Aufrufen und manuellem Caching durch periodisches Polling zu ersetzen.Es ist in vielen Bereichen anwendbar, z. B. beim FTP-Polling (File Transfer Protocol), Web Scraping usw.

Multithreading in RxJava

Es gibt APIs von Drittanbietern, die blockieren und wir können einfach nichts dagegen tun. Vielleicht haben wir keinen Quellcode, und das Umschreiben wäre zu riskant. In diesem Fall müssen wir lernen, mit blockierendem Code umzugehen, anstatt ihn zu bekämpfen.

Eines der Markenzeichen von RxJava ist die deklarative Gleichzeitigkeit, im Gegensatz zur imperativen Gleichzeitigkeit. Das manuelle Erstellen und Verwalten von Threads gehört der Vergangenheit an (vgl. "Thread Pool of Connections"). Die meisten von uns verwenden bereits verwaltete Thread Pools (z. B. mit ExecutorService). Aber RxJava geht noch einen Schritt weiter: Observable kann genau wie CompletableFuture in Java 8 nonblocking sein (vgl. "CompletableFuture und Streams"), aber im Gegensatz zu dem anderen ist es auch lazy. Solange du nicht abonnierst, führt ein braves Observable keine Aktion aus. Aber die Leistung von Observable geht noch darüber hinaus.

Eine asynchrone Observable ist diejenige, die deine Subscribers Callback-Methoden (wie onNext()) von einem anderen Thread aus aufruft. Erinnerst du dich an "Mastering Observable.create()", in dem wir untersucht haben, wann subscribe() blockiert und wartet, bis alle Benachrichtigungen ankommen? Im wirklichen Leben kommen die meisten Observables aus Quellen, die von Natur aus asynchron sind. Kapitel 5 ist ganz solchen Observables gewidmet. Aber auch unser einfaches JMS-Beispiel aus "Replacing Callbacks with Streams", das eine eingebaute, nicht blockierende API aus der JMS-Spezifikation verwendet (MessageListener interface). Dies wird vom Typsystem nicht erzwungen oder vorgeschlagen, aber viele Observables sind von Anfang an asynchron, und du solltest davon ausgehen. Eine blockierende subscribe() Methode kommt sehr selten vor, wenn ein Lambda innerhalb von Observable.create() nicht durch einen asynchronen Prozess oder Stream unterstützt wird. Standardmäßig (mit create()) geschieht jedoch alles im Client-Thread (demjenigen, der abonniert wurde). Wenn du onNext() direkt in deinem create() Callback aufrufst, ist kein Multithreading und keine Nebenläufigkeit im Spiel.

Wenn wir auf eine solche ungewöhnliche Observable stoßen, können wir deklarativ die so genannte Scheduler auswählen, die für die Ausgabe von Werten verwendet wird. Im Fall von CompletableFuture haben wir keine Kontrolle über die zugrunde liegenden Threads, die API hat die Entscheidung getroffen und im schlimmsten Fall ist es unmöglich, sie außer Kraft zu setzen. RxJava trifft solche Entscheidungen selten allein und wählt den sicheren Standard: Client-Thread und kein Multithreading. Für die Zwecke dieses Kapitels werden wir eine ganz einfache Logging-"Bibliothek" verwenden,"4 die eine Nachricht zusammen mit dem aktuellen Thread und der Anzahl der Millisekunden seit dem Start des Programms unter System.currentTimeMillis() ausgibt:

void log(Object label) {
    System.out.println(
        System.currentTimeMillis() - start + "\t| " +
        Thread.currentThread().getName()   + "\t| " +
        label);
}

Was ist ein Zeitplannungsprogramm?

RxJava ist unabhängig von Gleichzeitigkeit und führt von sich aus keine Gleichzeitigkeit ein. Einige Abstraktionen für den Umgang mit Threads sind jedoch für den Endbenutzer sichtbar. Außerdem können bestimmte Operatoren ohne Gleichzeitigkeit nicht richtig funktionieren; siehe "Andere Verwendungen für Zeitplanungsprogramme" für einige von ihnen. Glücklicherweise ist die Klasse Scheduler, die einzige, die du beachten musst, ziemlich einfach. Im Prinzip funktioniert sie ähnlich wie ScheduledExecutorService von java.util.concurrent- sie führt beliebige Codeblöcke aus, möglicherweise in der Zukunft. Um den Rx-Vertrag zu erfüllen, bietet sie jedoch einige feinkörnigere Abstraktionen, die du im erweiterten Abschnitt "Übersicht über die Details der Zeitplannungsprogramm-Implementierung" genauer kennenlernen kannst .

Zeitplanungsprogramme werden zusammen mit den Operatoren subscribeOn() und observeOn() sowie bei der Erstellung bestimmter Typen von Observableverwendet. Ein Zeitplanungsprogramm erstellt nur Instanzen von Workers, die für die Planung und Ausführung von Code zuständig sind. Wenn RxJava einen Code planen muss, bittet es zunächst Scheduler, ein Worker bereitzustellen, und verwendet dieses, um die nachfolgenden Aufgaben zu planen. Du wirst später Beispiele für diese API finden, aber mach dich zunächst mit den verfügbaren eingebauten Zeitplanungsprogrammen vertraut:

Schedulers.newThread()

Dieses Zeitplannungsprogramm startet einfach jedes Mal einen neuen Thread, wenn er über subscribeOn() oder observeOn() angefordert wird. newThread() ist fast nie eine gute Wahl, nicht nur wegen der Latenzzeit beim Starten eines Threads, sondern auch, weil dieser Thread nicht wiederverwendet wird. Im Vorfeld muss Stack-Speicherplatz zugewiesen werden (in der Regel etwa ein Megabyte, was durch den -Xss Parameter der JVM gesteuert wird) und das Betriebssystem muss einen neuen nativen Thread starten. Wenn die Worker fertig ist, wird der Thread einfach beendet. Dieses Zeitplannungsprogramm ist nur dann sinnvoll, wenn die Aufgaben grobkörnig sind: Es braucht viel Zeit, um sie zu erledigen, aber es gibt nur sehr wenige davon, so dass es unwahrscheinlich ist, dass Threads überhaupt wiederverwendet werden. Siehe auch: "Thread per Connection". In der Praxis ist es fast immer die bessere Wahl, Schedulers.io() zu folgen.

Schedulers.io()

Dieses Zeitplannungsprogramm ähnelt newThread(), aber bereits gestartete Threads werden recycelt und können möglicherweise zukünftige Anfragen bearbeiten. Diese Implementierung funktioniert ähnlich wie ThreadPoolExecutor von java.util.concurrent mit einem unbegrenzten Pool von Threads. Jedes Mal, wenn eine neue Worker angefordert wird, wird entweder ein neuer Thread gestartet (und später für einige Zeit inaktiv gehalten) oder der inaktive Thread wird wiederverwendet.

Der Name io() ist nicht zufällig gewählt. Du solltest dieses Zeitplannungsprogramm für I/O-gebundene Aufgaben verwenden, die nur sehr wenig CPU-Ressourcen benötigen. Allerdings benötigen sie oft viel Zeit, weil sie auf das Netzwerk oder die Festplatte warten. Daher ist es eine gute Idee, einen relativ großen Pool von Threads zu haben. Sei jedoch vorsichtig mit unbegrenzten Ressourcen jeglicher Art - im Falle von langsamen oder nicht reagierenden externen Abhängigkeiten wie Webservices könnte das Zeitplannungsprogramm io() eine enorme Anzahl von Threads starten, was dazu führen könnte, dass auch deine eigene Anwendung nicht mehr reagiert. Im Abschnitt " Umgang mit Ausfällen mit Hystrix" erfährst du mehr darüber, wie du dieses Problem lösen kannst.

Schedulers.computation()

Du solltest ein Zeitplannungsprogramm verwenden, wenn Aufgaben ausschließlich CPU-gebunden sind, d. h. sie benötigen Rechenleistung und haben keinen blockierenden Code (Lesen von der Festplatte, Netzwerk, Schlafen, Warten auf eine Sperre usw.). Da jede Aufgabe, die mit diesem Zeitplannungsprogramm ausgeführt wird, einen CPU-Kern voll ausnutzen soll, würde es nicht viel bringen, mehr solcher Aufgaben parallel auszuführen, als Kerne zur Verfügung stehen. Deshalb begrenzt das Zeitplannungsprogramm computation() die Anzahl der parallel ausgeführten Threads standardmäßig auf den Wert von availableProcessors(), der in der Dienstprogrammklasse Runtime.getRuntime() zu finden ist.

Wenn du aus irgendeinem Grund eine andere Anzahl von Threads als die Standardanzahl benötigst, kannst du jederzeit die Systemeigenschaft rx.scheduler.max-computation-threads verwenden. Indem du weniger Threads nimmst, stellst du sicher, dass immer ein oder mehrere CPU-Kerne im Leerlauf sind und dass der computation() Thread-Pool deinen Server auch bei hoher Last nicht sättigt. Es ist nicht möglich, mehr Rechenthreads als Kerne zu haben.

computation() Zeitplannungsprogramm verwendet eine unbegrenzte Warteschlange vor jedem Thread, d.h. wenn die Aufgabe geplant ist, aber alle Kerne belegt sind, werden sie in die Warteschlange gestellt. Bei Lastspitzen hält dieses Zeitplannungsprogramm die Anzahl der Threads begrenzt. Die Warteschlange vor den einzelnen Threads wird jedoch immer größer.

Zum Glück sorgen eingebaute Operatoren, insbesondere observeOn(), die wir in "Deklarative Gleichzeitigkeit mit observeOn()" kennenlernen werden, dafür, dass diese Scheduler nicht überlastet wird.

Schedulers.from(Executor executor)

SchedulerDa die intern komplexer sind als die Executorvon java.util.concurrent, wurde eine eigene Abstraktion benötigt. Da sie aber konzeptionell sehr ähnlich sind, überrascht es nicht, dass es einen Wrapper gibt, der Executor mit der from() Factory-Methode in Scheduler verwandeln kann:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

//...

ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .build();
Executor executor = new ThreadPoolExecutor(
    10,  //corePoolSize
    10,  //maximumPoolSize
    0L, TimeUnit.MILLISECONDS, //keepAliveTime, unit
    new LinkedBlockingQueue<>(1000),  //workQueue
    threadFactory
);
Scheduler scheduler = Schedulers.from(executor);

Ich verwende absichtlich diese ausführliche Syntax für die Erstellung von ExecutorService und nicht die einfachere Version:

import java.util.concurrent.Executors;

//...

ExecutorService executor = Executors.newFixedThreadPool(10);

So verlockend die Werksklasse Executors auch sein mag, so sind doch einige Vorgaben fest programmiert, die in Unternehmensanwendungen unpraktisch oder sogar gefährlich sind. So verwendet sie zum Beispiel die unbegrenzte Anzahl von LinkedBlockingQueue, die ins Unendliche wachsen kann, was zu OutOfMemoryError führt, wenn eine große Anzahl von Aufgaben ansteht. Außerdem verwendet die Voreinstellung ThreadFactory sinnlose Thread-Namen wie pool-5-thread-3. Die richtige Benennung von Threads ist ein unschätzbares Werkzeug bei der Profilerstellung oder der Analyse von Thread-Dumps. ThreadFactory von Grund auf neu zu implementieren ist etwas umständlich, daher haben wir den ThreadFactoryBuilder von Guava verwendet. Wenn du dich für das Tuning und die richtige Nutzung von Thread Pools interessierst, findest du weitere Informationen unter "Thread Pool von Verbindungen" und "Verwaltung von Ausfällen mit Hystrix". Die Erstellung von Zeitplannungsprogrammen aus Executor, die wir bewusst konfiguriert haben, ist für Projekte mit hoher Last empfehlenswert. Da RxJava jedoch keine Kontrolle über unabhängig erstellte Threads in einem Executor hat, kann es keine Threads pinnen (d.h. versuchen, die Arbeit derselben Aufgabe auf demselben Thread zu halten, um die Cache-Lokalität zu verbessern). Dieses Scheduler sorgt lediglich dafür, dass ein einzelnes Scheduler.Worker (siehe "Übersicht über die Details der Zeitplannungsprogramm-Implementierung") die Ereignisse sequentiell abarbeitet.

Schedulers.immediate()

Schedulers.immediate() ist ein spezielles Zeitplannungsprogramm, das eine Aufgabe im Client-Thread nicht asynchron, sondern blockierend aufruft. Die Verwendung dieses Zeitplannungsprogramms ist sinnlos, es sei denn, ein Teil deiner API erfordert die Bereitstellung eines Zeitplannungsprogramms, während du mit dem Standardverhalten von Observable völlig zufrieden bist, da es kein Threading erfordert. Tatsächlich hat das Abonnieren eines Observable (mehr dazu in einer Sekunde) über immediate() Scheduler in der Regel denselben Effekt wie das Nicht-Abonnieren mit einem bestimmten Zeitplannungsprogramm. Generell solltest du dieses Zeitplannungsprogramm vermeiden, da es den aufrufenden Thread blockiert und nur von begrenztem Nutzen ist.

Schedulers.trampoline()

Das Zeitplannungsprogramm trampoline() ist immediate() sehr ähnlich, da es die Aufgaben ebenfalls im selben Thread plant, also effektiv blockiert. Im Gegensatz zu immediate() wird die nächste Aufgabe jedoch erst dann ausgeführt, wenn alle zuvor geplanten Aufgaben abgeschlossen sind.immediate() ruft eine bestimmte Aufgabe sofort auf, während trampoline() wartet, bis die aktuelle Aufgabe abgeschlossen ist. Trampoline ist ein Muster in der funktionalen Programmierung, das es ermöglicht, Rekursionen zu implementieren, ohne den Aufrufstapel ins Unendliche wachsen zu lassen. Das lässt sich am besten anhand eines Beispiels erklären, das zunächst immediate() betrifft. Beachte übrigens, dass wir nicht direkt mit einer Scheduler Instanz interagieren, sondern zuerst eine Worker erstellen. Das macht Sinn, wie du in der "Übersicht über die Details der Zeitplannungsprogramme" sehen wirst .

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Inner start");
        sleepOneSecond();
        log("  Inner end");
    });
    log(" Outer end");
});
log("Main end");
worker.unsubscribe();

Die Ausgabe ist wie erwartet; du könntest schedule() tatsächlich durch einen einfachen Methodenaufruf ersetzen:

1044    | main  | Main start
1094    | main  |  Outer start
2097    | main  |   Inner start
3097    | main  |   Inner end
3100    | main  |  Outer end
3100    | main  | Main end

Innerhalb des Outer Blocks schedule() Inner Blocks, der sofort aufgerufen wird und die Outer Aufgabe unterbricht. Wenn Inner fertig ist, geht die Kontrolle zurück an Outer. Auch dies ist einfach eine verschlungene Art, eine Aufgabe indirekt über immediate() Scheduler aufzurufen und zu blockieren. Aber was passiert, wenn wir Schedulers.immediate() durch Schedulers.trampoline() ersetzen? Die Ausgabe ist ganz anders:

1030    | main  | Main start
1096    | main  |  Outer start
2101    | main  |  Outer end
2101    | main  |   Inner start
3101    | main  |   Inner end
3101    | main  | Main end

Siehst du, wie Outer es schafft, fertig zu werden, bevor Inner überhaupt startet? Das liegt daran, dass die Aufgabe Inner in der Warteschlange von trampoline() Scheduler stand, die bereits von der Aufgabe Outer belegt war. Als Outer fertig war, begann die erste Aufgabe aus der Warteschlange (Inner). Wir können sogar noch weiter gehen, um sicherzustellen, dass du den Unterschied verstehst:

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log("  Middle start");
        sleepOneSecond();
        worker.schedule(() -> {
            log("   Inner start");
            sleepOneSecond();
            log("   Inner end");
        });
        log("  Middle end");
    });
    log(" Outer end");
});
log("Main end");

Die Worker von immediate() Scheduler gibt folgendes aus:

1029    | main  | Main start
1091    | main  |  Outer start
2093    | main  |   Middle start
3095    | main  |    Inner start
4096    | main  |    Inner end
4099    | main  |   Middle end
4099    | main  |  Outer end
4099    | main  | Main end

Gegen den trampoline() Arbeiter:

1041    | main  | Main start
1095    | main  |  Outer start
2099    | main  |  Outer end
2099    | main  |   Middle start
3101    | main  |   Middle end
3101    | main  |    Inner start
4102    | main  |    Inner end
4102    | main  | Main end
Schedulers.test()

Dieses Scheduler wird nur zu Testzwecken verwendet und du wirst es nie im Produktionscode sehen. Sein Hauptvorteil ist die Möglichkeit, die Uhr beliebig vorzustellen und so zu simulieren, dass die Zeit vergeht. TestScheduler wird in "Schedulers in Unit Testing" ausführlich beschrieben . Schedulers allein sind nicht sehr interessant. Wenn du wissen willst, wie sie intern funktionieren und wie du deine eigenen implementieren kannst, schau dir den nächsten Abschnitt an.

Übersicht über die Details des Zeitplannungsprogramms

Hinweis

Dieser Abschnitt ist völlig optional. Du kannst direkt zu "Declarative Subscription with subscribeOn()" springen, wenn du nicht an den Implementierungsdetails interessiert bist.

Scheduler entkoppelt nicht nur die Aufgaben und ihre Ausführung (in der Regel, indem sie in einem anderen Thread ausgeführt werden), sondern abstrahiert auch die Uhr, wie wir in "Virtual Time" lernen werden . Die API von Scheduler ist etwas einfacher als z. B. die von ScheduledExecutorService:

abstract class Scheduler {
    abstract Worker createWorker();

    long now();

    abstract static class Worker implements Subscription {

        abstract Subscription schedule(Action0 action);

        abstract Subscription schedule(Action0 action,
                             long delayTime, TimeUnit unit);

        long now();
    }
}

Wenn RxJava eine Aufgabe planen will (vermutlich, aber nicht unbedingt im Hintergrund), muss es zunächst eine Instanz von Worker anfordern. Die Worker ermöglicht es, die Aufgabe ohne Verzögerung oder zu einem bestimmten Zeitpunkt zu planen. Sowohl Scheduler als auch Worker haben eine überschreibbare Zeitquelle (Methodenow() ), die sie verwenden, um zu bestimmen, wann eine bestimmte Aufgabe ausgeführt werden soll. Naiv betrachtet kannst du dir Scheduler wie einen Thread-Pool und Worker wie einen Thread innerhalb dieses Pools vorstellen.

Die Trennung zwischen Scheduler und Worker ist notwendig, um einige der Richtlinien des Rx-Vertrages einfach umzusetzen, nämlich die Methode von Subscribernacheinander und nicht gleichzeitig aufzurufen. WorkerDer Rx-Vertrag sieht genau das vor: Zwei Aufgaben, die auf demselben Worker geplant sind, werden niemals gleichzeitig ausgeführt. Unabhängige Workers desselben Scheduler können jedoch problemlos Aufgaben gleichzeitig ausführen.

Anstatt die API durchzugehen, wollen wir den Quellcode eines bestehenden Scheduler analysieren, nämlich HandlerScheduler, das im RxAndroid-Projekt zu finden ist. Dieses Scheduler führt einfach alle geplanten Aufgaben in einem Android UI-Thread aus. Die Aktualisierung der Benutzeroberfläche ist nur von diesem Thread aus möglich (weitere Informationen findest du unter "Android-Entwicklung mit RxJava" ). Das ist ähnlich wie der Event Dispatch Thread (EDT) in Swing, wo die meisten Aktualisierungen von Fenstern und Komponenten in einem eigenen Thread (EDT) ausgeführt werden müssen. Es überrascht nicht, dass es auch das RxSwing5 Projekt für diese Aufgabe.

Der folgende Codeschnipsel ist eine abgespeckte und unvollständige Klasse aus RxAndroid, die nur zu Schulungszwecken dient:

package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.TimeUnit;

public final class SimplifiedHandlerScheduler extends Scheduler {

    @Override
    public Worker createWorker() {
        return new HandlerWorker();
    }

    static class HandlerWorker extends Worker {

        private final Handler handler = new Handler(Looper.getMainLooper());

        @Override
        public void unsubscribe() {
            //Implementation coming soon...
        }

        @Override
        public boolean isUnsubscribed() {
            //Implementation coming soon...
            return false;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        @Override
        public Subscription schedule(
        Action0 action, long delayTime, TimeUnit unit) {
            ScheduledAction scheduledAction = new ScheduledAction(action);
            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

            scheduledAction.add(Subscriptions.create(() ->
                    handler.removeCallbacks(scheduledAction)));

            return scheduledAction;
        }
    }
}

Die Details der Android-API sind im Moment nicht wichtig. Was hier passiert, ist, dass jedes Mal, wenn wir etwas auf HandlerWorker planen, der Codeblock an eine spezielle postDelayed() Methode übergeben wird, die ihn in einem speziellen Android-Thread ausführt. Es gibt nur einen solchen Thread, so dass Ereignisse nicht nur innerhalb, sondern auch über Workerhinaus serialisiert werden.

Bevor wir action zur Ausführung übergeben, wickeln wir es mit ScheduledAction ein, das sowohl Runnable als auch Subscription implementiert. RxJava ist faul, wann immer es geht - das gilt auch für das Planen von Aufgaben. Wenn du aus irgendeinem Grund beschließt, dass eine bestimmte action doch nicht ausgeführt werden soll (das macht Sinn, wenn die Aktion in der Zukunft und nicht sofort geplant wurde), führe einfach unsubscribe() auf dem von schedule() zurückgegebenen Subscription aus. Es liegt in der Verantwortung von Worker, die Abmeldung ordnungsgemäß zu handhaben (zumindest so gut es geht).

Der Client-Code kann auch beschließen, unsubscribe() von Worker vollständig zu verlassen. Dadurch werden alle Aufgaben in der Warteschlange abbestellt und der Worker freigegeben, so dass der zugrunde liegende Thread später wieder verwendet werden kann. Der folgende Codeschnipsel erweitert die SimplifiedHandlerScheduler um den Worker Abmeldefluss (nur die geänderten Methoden sind enthalten):

private CompositeSubscription compositeSubscription =
    new CompositeSubscription();

@Override
public void unsubscribe() {
    compositeSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
    return compositeSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);

    handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

    scheduledAction.add(Subscriptions.create(() ->
            handler.removeCallbacks(scheduledAction)));

    return scheduledAction;
}

In "Steuerung von Listenern mit Hilfe von Subscription und Subscriber<T>" haben wir die Schnittstelle Subscription erkundet, uns aber nie wirklich mit den Implementierungsdetails befasst. CompositeSubscription ist eine von vielen verfügbaren Implementierungen, die selbst nur ein Container für untergeordnete Subscriptions ist (ein Composite-Designmuster ). Wenn du dich von CompositeSubscription abmeldest, meldest du dich von allen Kindern ab. Du kannst auch die von CompositeSubscription verwalteten Kinder hinzufügen und entfernen.

In unserem benutzerdefinierten Scheduler wird CompositeSubscription verwendet, um alle Subscriptions von den vorherigen schedule() Aufrufen zu verfolgen (siehe compositeSubscription.add(scheduledAction)). Andererseits muss das Kind ScheduledAction über sein Elternteil Bescheid wissen (siehe: addParent()), damit es sich selbst entfernen kann, wenn die Aktion abgeschlossen oder abgebrochen wurde. Andernfalls würde Worker für immer veraltete Child Subscriptions anhäufen. Wenn der Kundencode beschließt, dass er eine Instanz von HandlerWorker nicht mehr benötigt, meldet er sich von ihr ab. Die Abmeldung wird an alle ausstehenden untergeordneten Instanzen von Subscriptionweitergegeben (falls vorhanden).

Das war eine sehr kurze Einführung in Schedulers in RxJava. Die Details ihrer Interna sind für die tägliche Arbeit nicht so nützlich; sie sind vielmehr so konzipiert, dass die Verwendung von RxJava intuitiver und vorhersehbarer wird. Trotzdem wollen wir uns kurz ansehen, wie Schedulers viele Gleichzeitigkeitsprobleme in Rx lösen.

Deklaratives Abonnement mit subscribeOn()

In "Mastering Observable.create()" haben wir gesehen, dass subscribe() standardmäßig den Client-Thread verwendet. Um es noch einmal zusammenzufassen: Hier ist das einfachste Abonnement, das du dir ausdenken kannst, bei dem kein Threading im Spiel war:

Observable<String> simple() {
    return Observable.create(subscriber -> {
        log("Subscribed");
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    });
}

//...

log("Starting");
final Observable<String> obs = simple();
log("Created");
final Observable<String> obs2 = obs
        .map(x -> x)
        .filter(x -> true);
log("Transformed");
obs2.subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
);
log("Exiting");

Beachte, wo die Logging-Anweisungen platziert sind, und sieh dir die Ausgabe genau an, insbesondere im Hinblick darauf, welcher Thread die Druckanweisung aufgerufen hat:

33  | main  | Starting
120 | main  | Created
128 | main  | Transformed
133 | main  | Subscribed
133 | main  | Got A
133 | main  | Got B
133 | main  | Completed
134 | main  | Exiting

Pass auf: Die Reihenfolge der Anweisungen ist absolut vorhersehbar. Erstens läuft jede Codezeile im vorangegangenen Codeschnipsel im Thread main, es gibt keine Thread-Pools und keine asynchrone Ausgabe von Ereignissen. Zweitens ist die Reihenfolge der Ausführung auf den ersten Blick vielleicht nicht ganz klar.

Wenn das Programm startet, gibt es Starting aus, was verständlich ist. Nachdem wir eine Instanz von Observable<String> erstellt haben, sehen wir die Nachricht Created. Beachte, dass Subscribed erst später erscheint, wenn wir uns tatsächlich anmelden. Ohne den Aufruf von subscribe() wird der Codeblock in Observable.create() nie ausgeführt. Außerdem haben auch die Operatoren map() und filter() keine sichtbaren Nebeneffekte. Beachte, dass die Nachricht Transformed noch vor Subscribed ausgegeben wird.

Später erhalten wir alle ausgegebenen Ereignisse und die Fertigstellungsmeldung. Schließlich wird die Anweisung Exiting gedruckt und das Programm kann zurückkehren. Das ist eine interessante Beobachtung -subscribe() sollte einen Callback registrieren, wenn Ereignisse asynchron auftreten. Das ist die Annahme, die du standardmäßig machen solltest. In diesem Fall gibt es jedoch kein Threading und subscribe() ist tatsächlich blockierend. Warum ist das so?

Es gibt eine inhärente, aber versteckte Verbindung zwischen subscribe() und create(). Jedes Mal, wenn du subscribe() auf Observable aufrufst, wird die Callback-Methode von OnSubscribe aufgerufen (wobei der Lambda-Ausdruck, den du an create() übergeben hast, verpackt wird). Sie erhält deine Subscriber als Argument. Standardmäßig geschieht dies im selben Thread und ist blockierend, so dass alles, was du innerhalb von create() tust, subscribe() blockiert. Wenn deine Methode create() ein paar Sekunden schläft, wird subscribe() blockiert. Wenn zwischen Observable.create() und deiner Subscriber Methode (Lambda, das als Callback fungiert) Operatoren stehen, werden diese Operatoren im Namen des Threads aufgerufen, der subscribe() aufgerufen hat. RxJava fügt standardmäßig keine Gleichzeitigkeitsfunktionen zwischen Observable und Subscriber ein. Der Grund dafür ist, dass Observablein der Regel durch andere Gleichzeitigkeitsmechanismen wie Ereignisschleifen oder benutzerdefinierte Threads unterstützt wird, so dass Rx dir die volle Kontrolle überlässt und keine Konventionen vorschreibt.

Diese Beobachtung bereitet die Landschaft für den subscribeOn() Operator vor. Indem du subscribeOn() irgendwo zwischen Observable und subscribe() einfügst, wählst du deklarativ Scheduler aus, wo die Callback-Methode OnSubscribe aufgerufen wird. Egal, was du innerhalb von create() tust, diese Arbeit wird auf eine unabhängige Scheduler verlagert und dein subscribe() Aufruf blockiert nicht mehr:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");
35  | main  | Starting
112 | main  | Created
123 | main  | Exiting
123 | Sched-A-0 | Subscribed
124 | Sched-A-0 | Got A
124 | Sched-A-0 | Got B
124 | Sched-A-0 | Completed

Siehst du, dass der Thread main beendet wird, bevor Observable überhaupt mit der Ausgabe von Werten beginnt? Technisch gesehen ist die Reihenfolge der Logmeldungen nicht mehr so vorhersehbar, weil zwei Threads gleichzeitig laufen: main Der Thread Sched-A-0, der sich angemeldet hat und aussteigen will, und der Thread , der Ereignisse sendet, sobald sich jemand angemeldet hat. Sowohl der schedulerA als auch der Sched-A-0 Thread stammen aus den folgenden Zeitplannungsprogrammen, die wir zur Veranschaulichung erstellt haben:

import static java.util.concurrent.Executors.newFixedThreadPool;


ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);

ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);

ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}

Diese Zeitplannungsprogramme werden in allen Beispielen verwendet, aber sie sind relativ leicht zu erinnern. Drei unabhängige Zeitplannungsprogramme, die jeweils 10 Threads von einem ExecutorService verwalten. Um die Ausgabe übersichtlicher zu gestalten, hat jeder Thread-Pool ein eigenes Namensmuster.

Bevor wir beginnen, musst du verstehen, dass subscribeOn() in ausgereiften Anwendungen nur sehr selten verwendet wird. Normalerweise kommen Observables aus Quellen, die von Natur aus asynchron sind (wie RxNetty, siehe "Nonblocking HTTP Server with Netty and RxNetty") oder von sich aus Scheduling anwenden (wie Hystrix, siehe "Managing Failures with Hystrix"). Du solltest subscribeOn() nur in besonderen Fällen behandeln, wenn das zugrunde liegende Observable bekanntlich synchron ist (create() ist blockierend). Allerdings ist subscribeOn() immer noch eine viel bessere Lösung als das handgefertigte Threading innerhalb von create():

//Don't do this
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    Runnable code = () -> {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    };
    new Thread(code, "Async").start();
});

Der obige Code vermischt zwei Konzepte: die Produktion von Ereignissen und die Wahl der Gleichzeitigkeitsstrategie. Observable sollte nur für die Produktionslogik zuständig sein, während nur der Client-Code eine vernünftige Entscheidung über die Gleichzeitigkeit treffen kann. Erinnere dich daran, dass Observable faul, aber auch unveränderlich ist, in dem Sinne, dass subscribeOn() nur nachgeschaltete Abonnenten betrifft. Wenn jemand genau dieselbe Observable ohne subscribeOn() dazwischen abonniert, wird standardmäßig keine Gleichzeitigkeit involviert sein.

Vergiss nicht, dass wir uns in diesem Kapitel auf bestehende Anwendungen konzentrieren und RxJava schrittweise einführen. Der subscribeOn() Operator ist unter diesen Umständen sehr nützlich; sobald du jedoch reaktive Erweiterungen verstehst und sie in großem Umfang einsetzt, nimmt der Wert von subscribeOn() ab. In vollständig reaktiven Software-Stacks, wie sie z. B. bei Netflix zu finden sind, wird subscribeOn() fast nie verwendet, obwohl alle Observables asynchron sind. Die meisten Observables stammen aus asynchronen Quellen und werden standardmäßig als asynchron behandelt. Daher wird subscribeOn() nur sehr begrenzt verwendet, meist bei der Nachrüstung bestehender APIs oder Bibliotheken. In Kapitel 5 schreiben wir wirklich asynchrone Anwendungen ganz ohne explizite subscribeOn() und Schedulers.

subscribeOn() Gleichzeitigkeit und Verhalten

Es gibt mehrere Nuancen, wie subscribeOn() funktioniert. Zunächst einmal sollte sich der neugierige Leser fragen, was passiert, wenn zwei Aufrufe von subscribeOn() zwischen Observable und subscribe() erscheinen. Die Antwort ist einfach: subscribeOn(), das dem ursprünglichen Observable am nächsten liegt , gewinnt. Das hat wichtige praktische Auswirkungen. Wenn du eine API entwirfst und intern subscribeOn() verwendest, hat der Client-Code keine Möglichkeit, die Scheduler deiner Wahl zu überschreiben. Das kann eine bewusste Designentscheidung sein; schließlich weiß der API-Designer vielleicht am besten, welche Scheduler geeignet ist. Andererseits ist es immer eine gute Idee, eine überladene Version der besagten API bereitzustellen, die es erlaubt, die gewählte Scheduler zu überschreiben.

Lass uns untersuchen, wie sich subscribeOn() verhält:

log("Starting");
Observable<String> obs = simple();
log("Created");
obs
        .subscribeOn(schedulerA)
        //many other operators
        .subscribeOn(schedulerB)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

Die Ausgabe zeigt nur die Threads von schedulerAan:

17  | main  | Starting
73  | main  | Created
83  | main  | Exiting
84  | Sched-A-0 | Subscribed
84  | Sched-A-0 | Got A
84  | Sched-A-0 | Got B
84  | Sched-A-0 | Completed

Interessanterweise wird das Abonnieren von schedulerB nicht vollständig zugunsten von schedulerA ignoriert.schedulerB wird zwar immer noch für eine kurze Zeitspanne verwendet, aber es plant kaum neue Aktionen auf schedulerA ein, das die ganze Arbeit erledigt. So werden mehrere subscribeOn() nicht nur ignoriert, sondern verursachen auch einen geringen Overhead.

Apropos Operatoren: Wir haben gesagt, dass die Methode create(), die verwendet wird, wenn es ein neues Subscriber gibt, innerhalb des bereitgestellten Zeitplannungsprogramms (falls vorhanden) ausgeführt wird. Aber welcher Thread führt all diese Umwandlungen zwischen create() und subscribe() aus? Wir wissen bereits, dass alle Operatoren standardmäßig in demselben Thread (Zeitplannungsprogramm) ausgeführt werden, also keine Gleichzeitigkeit besteht:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(this::log)
        .map(x -> x + '1')
        .doOnNext(this::log)
        .map(x -> x + '2')
        .subscribeOn(schedulerA)
        .doOnNext(this::log)
        .subscribe(
                x -> log("Got " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

Wir haben die Pipeline der Operatoren gelegentlich mit doOnNext() bestreut, um zu sehen, welcher Thread an dieser Stelle die Kontrolle hat. Erinnere dich daran, dass die Position von subscribeOn() nicht relevant ist, sie kann direkt nach Observable oder kurz vor subscribe() sein. Das Ergebnis ist nicht überraschend:

20  | main  | Starting
104 | main  | Created
123 | main  | Exiting
124 | Sched-A-0 | Subscribed
124 | Sched-A-0 | A
124 | Sched-A-0 | A1
124 | Sched-A-0 | A12
124 | Sched-A-0 | Got A12
124 | Sched-A-0 | B
124 | Sched-A-0 | B1
124 | Sched-A-0 | B12
125 | Sched-A-0 | Got B12

Beobachte, wie create() aufgerufen wird und die Ereignisse A und B erzeugt. Diese Ereignisse durchlaufen nacheinander den Thread des Zeitplannungsprogramms und erreichen schließlich Subscriber. Viele RxJava-Neulinge glauben, dass die Verwendung eines Scheduler mit einer großen Anzahl von Threads die gleichzeitige Verarbeitung von Ereignissen automatisch aufspaltet und am Ende alle Ergebnisse irgendwie zusammenführt. Das ist aber nicht der Fall. RxJava erstellt eine einzige Worker Instanz (siehe: "Übersicht über die Details der Zeitplannungsprogramm-Implementierung") für die gesamte Pipeline, vor allem um eine sequenzielle Verarbeitung der Ereignisse zu gewährleisten.

Das bedeutet, dass, wenn einer deiner Operatoren besonders langsam ist - z. B. map(), der Daten von der Festplatte liest, um vorbeiziehende Ereignisse umzuwandeln - dieser kostspielige Vorgang im selben Thread aufgerufen wird. Ein einziger fehlerhafter Operator kann die gesamte Pipeline verlangsamen, von der Produktion bis zum Verbrauch. Das ist in RxJava nicht üblich. Operatoren sollten nicht blockierend, schnell und so rein wie möglich sein.

Auch hier kommt flatMap() zur Rettung. Anstatt innerhalb von map() zu blockieren, können wir flatMap() aufrufen und alle Ergebnisse asynchron sammeln. Daher sind flatMap() und merge() die Operatoren, wenn wir echte Parallelität erreichen wollen. Aber auch mit flatMap() ist es nicht offensichtlich. Stell dir einen Lebensmittelladen vor (nennen wir ihn "RxGroceries"), der eine API für den Einkauf von Waren anbietet:

class RxGroceries {

    Observable<BigDecimal> purchase(String productName, int quantity) {
        return Observable.fromCallable(() ->
            doPurchase(productName, quantity));
    }

    BigDecimal doPurchase(String productName, int quantity) {
        log("Purchasing " + quantity + " " + productName);
        //real logic here
        log("Done " + quantity + " " + productName);
        return priceForProduct;
    }

}

Natürlich ist die Implementierung von doPurchase() hier irrelevant, stell dir einfach vor, dass sie einige Zeit und Ressourcen in Anspruch nimmt. Wir simulieren die Geschäftslogik, indem wir einen künstlichen Schlaf von einer Sekunde hinzufügen, der etwas höher ist, wenn quantity größer ist. Blockierende Observables wie die, die von purchase() zurückgegeben wird, sind in einer realen Anwendung ungewöhnlich, aber wir wollen es zu Lehrzwecken so belassen. Beim Kauf mehrerer Waren möchten wir so viel wie möglich parallelisieren und am Ende den Gesamtpreis für alle Waren berechnen. Der erste Versuch ist erfolglos:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)  //BROKEN!!!
        .map(prod -> rxGroceries.doPurchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

Das Ergebnis ist korrekt, es ist ein Observable mit nur einem einzigen Wert: dem Gesamtpreis, der mit reduce() berechnet wurde. Für jedes Produkt rufen wir doPurchase() mit quantity einmal auf. Obwohl schedulerA von einem Thread-Pool mit 10 Threads unterstützt wird, ist der Code vollständig sequentiell:

144  | Sched-A-0 | Purchasing 1 bread
1144 | Sched-A-0 | Done 1 bread
1146 | Sched-A-0 | Purchasing 1 butter
2146 | Sched-A-0 | Done 1 butter
2146 | Sched-A-0 | Purchasing 1 milk
3147 | Sched-A-0 | Done 1 milk
3147 | Sched-A-0 | Purchasing 1 tomato
4147 | Sched-A-0 | Done 1 tomato
4147 | Sched-A-0 | Purchasing 1 cheese
5148 | Sched-A-0 | Done 1 cheese

Beachte, wie jedes Produkt die nachfolgenden Produkte von der Verarbeitung abhält. Wenn der Kauf von Brot abgeschlossen ist, beginnt die Butter sofort, aber nicht früher. Seltsamerweise hilft auch das Ersetzen von map() durch flatMap() nicht, und das Ergebnis ist genau dasselbe:

Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .subscribeOn(schedulerA)
        .flatMap(prod -> rxGroceries.purchase(prod, 1))
        .reduce(BigDecimal::add)
        .single();

Der Code funktioniert nicht gleichzeitig, weil es nur einen einzigen Fluss von Ereignissen gibt, der von vornherein sequenziell ablaufen muss. Andernfalls müsste dein Subscriber gleichzeitige Benachrichtigungen kennen (onNext(), onComplete(), etc.), also ist es ein fairer Kompromiss. Glücklicherweise ist die idiomatische Lösung sehr nahe dran. Die wichtigsten Observable emittierenden Produkte können nicht parallelisiert werden. Wir erstellen jedoch für jedes Produkt ein neues, unabhängiges Observable, wie es von purchase() zurückgegeben wird. Da sie unabhängig sind, können wir jedes einzelne von ihnen sicher gleichzeitig einplanen:

Observable<BigDecimal> totalPrice = Observable
        .just("bread", "butter", "milk", "tomato", "cheese")
        .flatMap(prod ->
                rxGroceries
                        .purchase(prod, 1)
                        .subscribeOn(schedulerA))
        .reduce(BigDecimal::add)
        .single();

Kannst du erkennen, wo subscribeOn() ist? Der Haupt-Thread Observable macht eigentlich nichts, daher ist ein spezieller Thread-Pool unnötig. Allerdings wird jeder Substream, der innerhalb von flatMap() erstellt wird, mit einem schedulerA versorgt. Jedes Mal, wenn subscribeOn() benutzt wird, bekommt Scheduler die Chance, einen neuen Worker und damit einen separaten Thread zu erstellen (was die Sache ein wenig vereinfacht):

113  | Sched-A-1 | Purchasing 1 butter
114  | Sched-A-0 | Purchasing 1 bread
125  | Sched-A-2 | Purchasing 1 milk
125  | Sched-A-3 | Purchasing 1 tomato
126  | Sched-A-4 | Purchasing 1 cheese
1126 | Sched-A-2 | Done 1 milk
1126 | Sched-A-0 | Done 1 bread
1126 | Sched-A-1 | Done 1 butter
1128 | Sched-A-3 | Done 1 tomato
1128 | Sched-A-4 | Done 1 cheese

Endlich haben wir echte Gleichzeitigkeit erreicht. Jeder Kaufvorgang beginnt nun zur gleichen Zeit und alle werden schließlich abgeschlossen. Der flatMap() Operator ist sorgfältig konzipiert und implementiert, damit er alle Ereignisse aus allen unabhängigen Streams sammelt und sie nacheinander weiterleitet. Wie wir jedoch bereits in "Reihenfolge der Ereignisse nach flatMap()" gelernt haben , können wir uns nicht mehr auf die Reihenfolge der nachgelagerten Ereignisse verlassen - sie beginnen und enden nicht in der gleichen Reihenfolge, in der sie ausgesendet wurden (die ursprüngliche Reihenfolge begann bei Brot). Wenn die Ereignisse den reduce() Operator erreichen, sind sie bereits sequenziell und verhalten sich gut.

Inzwischen solltest du dich langsam vom klassischen Thread Modell lösen und verstehen, wie Schedulers funktioniert. Aber wenn dir das schwerfällt, hier ist eine einfache Analogie:

  • Observable ohne Scheduler funktioniert wie ein Single-Thread-Programm mit blockierenden Methodenaufrufen, die untereinander Daten austauschen.

  • Observable mit einem einzigen subscribeOn() ist wie das Starten einer großen Aufgabe im Hintergrund Thread. Das Programm in diesem Thread ist zwar immer noch sequentiell, aber zumindest läuft es im Hintergrund.

  • Observable Die Verwendung von flatMap(), bei der jeder interne Observable subscribeOn() hat, funktioniert wie ForkJoinPool von java.util.concurrent, wobei jeder Teilstrom eine Gabelung der Ausführung ist und flatMap() eine sichere Join-Phase ist.

Die vorstehenden Tipps gelten natürlich nur für blockierende Observables, die in realen Anwendungen nur selten vorkommen. Wenn deine zugrunde liegenden Observables bereits asynchron sind, ist das Erreichen von Gleichzeitigkeit eine Frage des Verständnisses, wie sie kombiniert werden und wann die Subskription erfolgt. merge() auf zwei Streams wird zum Beispiel beide gleichzeitig abonnieren, während der concat() Operator wartet, bis der erste Stream beendet ist, bevor er den zweiten abonniert.

Batching-Anfragen mit groupBy()

Hast du bemerkt, dass RxGroceries.purchase() productName und quantity nimmt, obwohl die Menge immer eins war? Was wäre, wenn unsere Lebensmittelliste einige Produkte mehrfach enthielte, was auf eine größere Nachfrage hindeutet? Die erste naive Implementierung sendet einfach dieselbe Anfrage, z. B. nach einem Ei, mehrmals und fragt jedes Mal nach einem. Glücklicherweise können wir solche Anfragen deklarativ stapeln, indem wir groupBy()verwenden - und das funktioniert auch mit deklarativer Gleichzeitigkeit:

import org.apache.commons.lang3.tuple.Pair;

Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "egg", "milk", "tomato",
      "cheese", "tomato", "egg", "egg")
    .groupBy(prod -> prod)
    .flatMap(grouped -> grouped
        .count()
        .map(quantity -> {
            String productName = grouped.getKey();
            return Pair.of(productName, quantity);
        }))
    .flatMap(order -> store
        .purchase(order.getKey(), order.getValue())
        .subscribeOn(schedulerA))
    .reduce(BigDecimal::add)
    .single();

Dieser Code ist ziemlich komplex, also gehen wir ihn kurz durch, bevor wir die Ausgabe zeigen. Zuerst gruppieren wir die Produkte einfach nach ihrem Namen, also der Identitätsfunktion prod -> prod. Im Gegenzug erhalten wir eine umständliche Observable<GroupedObservable<String, String>>. Daran gibt es nichts auszusetzen. Als Nächstes erhält flatMap() alle GroupedObservable<String, String>, die alle Produkte mit demselben Namen repräsentieren. So gibt es z.B. ein ["egg", "egg", "egg"] Observable mit einem Schlüssel "egg". Wenn groupBy() eine andere Schlüsselfunktion verwenden würde, wie prod.length(), hätte dieselbe Sequenz einen Schlüssel 3.

An diesem Punkt müssen wir innerhalb von flatMap() ein Observable vom Typ Pair<String, Integer> konstruieren, das jedes einzelne Produkt und seine Menge repräsentiert. Sowohl count() als auch map() geben ein Observable zurück, also passt alles perfekt zusammen. Als zweites erhält flatMap() order vom Typ Pair<String, Integer> und tätigt einen Kauf, diesmal kann die Menge größer sein. Die Ausgabe sieht perfekt aus. Beachte, dass größere Bestellungen etwas langsamer sind, aber es ist immer noch viel schneller als mehrere wiederholte Anfragen:

164  | Sched-A-0 | Purchasing 1 bread
165  | Sched-A-1 | Purchasing 1 butter
166  | Sched-A-2 | Purchasing 3 egg
166  | Sched-A-3 | Purchasing 1 milk
166  | Sched-A-4 | Purchasing 2 tomato
166  | Sched-A-5 | Purchasing 1 cheese
1151 | Sched-A-0 | Done 1 bread
1178 | Sched-A-1 | Done 1 butter
1180 | Sched-A-5 | Done 1 cheese
1183 | Sched-A-3 | Done 1 milk
1253 | Sched-A-4 | Done 2 tomato
1354 | Sched-A-2 | Done 3 egg

Wenn du glaubst, dass dein System auf diese Weise vom Batching profitieren kann, schau dir "Batching and Collapsing Commands" an.

Deklarative Gleichzeitigkeit mit observeOn()

Ob du es glaubst oder nicht, die Gleichzeitigkeit in RxJava kann durch zwei Operatoren beschrieben werden: die bereits erwähnten subscribeOn() und observeOn(). Sie sehen sehr ähnlich aus und sind für Neulinge verwirrend, aber ihre Semantik ist eigentlich ziemlich klar und sinnvoll.

subscribeOn() ermöglicht es, zu wählen, welches Scheduler verwendet wird, um OnSubscribe aufzurufen (Lambda-Ausdruck innerhalb von create()). Daher wird jeglicher Code innerhalb von create() in einen anderen Thread verschoben - zum Beispiel, um ein Blockieren des Hauptthreads zu vermeiden. Umgekehrt steuert observeOn(), welches Scheduler verwendet wird, um nachgelagerte Subscribers aufzurufen, die nach observeOn() auftreten. Zum Beispiel erfolgt der Aufruf von create() im io() Scheduler (über subscribeOn(io())), um ein Blockieren der Benutzeroberfläche zu vermeiden. Die Aktualisierung der Widgets der Benutzeroberfläche muss jedoch im UI-Thread erfolgen (sowohl Swing als auch Android haben diese Einschränkung), also verwenden wir observeOn() zum Beispiel mit AndroidSchedulers.mainThread() vor den Operatoren oder den Abonnenten, die die Benutzeroberfläche ändern. Auf diese Weise können wir ein Scheduler verwenden, um create() und alle Operatoren bis zum ersten observeOn() zu behandeln, aber andere(s), um Transformationen anzuwenden. Das lässt sich am besten anhand eines Beispiels erklären:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerA)
        .doOnNext(x -> log("Found 2: " + x))
        .subscribe(
                x -> log("Got 1: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

observeOn() kommt irgendwo in der Pipeline-Kette vor, und im Gegensatz zu subscribeOn() ist diesmal die Position von observeOn() ziemlich wichtig. Unabhängig davon, welche Scheduler Operatoren oberhalb von observeOn() ausgeführt haben (falls es welche gab), verwenden alle darunter liegenden Operatoren das bereitgestellte Scheduler. In diesem Beispiel gibt es keine subscribeOn(), also wird die Standardeinstellung verwendet (keine Gleichzeitigkeit):

23  | main  | Starting
136 | main  | Created
163 | main  | Subscribed
163 | main  | Found 1: A
163 | main  | Found 1: B
163 | main  | Exiting
163 | Sched-A-0 | Found 2: A
164 | Sched-A-0 | Got 1: A
164 | Sched-A-0 | Found 2: B
164 | Sched-A-0 | Got 1: B
164 | Sched-A-0 | Completed

Alle Operatoren oberhalb von observeOn werden im Client-Thread ausgeführt, was in RxJava der Standard ist. Aber unterhalb von observeOn() werden die Operatoren innerhalb des mitgelieferten Scheduler ausgeführt. Dies wird noch deutlicher, wenn sowohl subscribeOn() als auch mehrere observeOn() innerhalb der Pipeline auftreten:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
        .doOnNext(x -> log("Found 1: " + x))
        .observeOn(schedulerB)
        .doOnNext(x -> log("Found 2: " + x))
        .observeOn(schedulerC)
        .doOnNext(x -> log("Found 3: " + x))
        .subscribeOn(schedulerA)
        .subscribe(
                x -> log("Got 1: " + x),
                Throwable::printStackTrace,
                () -> log("Completed")
        );
log("Exiting");

Kannst du die Ausgabe vorhersagen? Erinnere dich daran, dass alles, was unter observeOn() steht, innerhalb des gelieferten Scheduler ausgeführt wird, natürlich solange, bis ein anderes observeOn() auftaucht. Außerdem kann subscribeOn() überall zwischen Observable und subscribe() vorkommen, aber dieses Mal betrifft es nur die Operatoren bis zum ersten observeOn():

21  | main  | Starting
98  | main  | Created
108 | main  | Exiting
129 | Sched-A-0 | Subscribed
129 | Sched-A-0 | Found 1: A
129 | Sched-A-0 | Found 1: B
130 | Sched-B-0 | Found 2: A
130 | Sched-B-0 | Found 2: B
130 | Sched-C-0 | Found 3: A
130 | Sched-C-0 | Got: A
130 | Sched-C-0 | Found 3: B
130 | Sched-C-0 | Got: B
130 | Sched-C-0 | Completed

Die Subskription erfolgt in schedulerA, weil wir das in subscribeOn() angegeben haben. Auch der "Found 1" Operator wurde innerhalb des Scheduler ausgeführt, weil er vor dem ersten observeOn() steht. Später wird die Situation interessanter. observeOn() schaltet den aktuellen Scheduler auf schedulerB um, und "Found 2" verwendet stattdessen diesen. Der letzte observeOn(schedulerC) wirkt sich sowohl auf den "Found 3" Operator als auch auf Subscriber aus. Erinnere dich daran, dass Subscriber im Kontext des letzten Scheduler funktioniert.

subscribeOn() und observeOn() funktionieren wirklich gut zusammen, wenn du Producer (Observable.create()) und Consumer (Subscriber) physisch entkoppeln willst. Standardmäßig gibt es keine solche Entkopplung, und RxJava verwendet einfach denselben Thread. subscribeOn() reicht nicht aus, wir wählen einfach einen anderen Thread.observeOn() ist besser, aber dann blockieren wir den Client-Thread im Falle von synchronen Observables. Da die meisten Operatoren nicht blockierend sind und die in ihnen verwendeten Lambda-Ausdrücke in der Regel kurz und billig sind, gibt es in der Regel nur einen subscribeOn() und observeOn() in der Pipeline der Operatoren. subscribeOn() kann zur besseren Lesbarkeit in der Nähe des ursprünglichen Observable platziert werden, während observeOn() in der Nähe von subscribe() liegt, so dass nur Subscriber diesen speziellen Scheduler verwendet, während andere Operatoren auf den Scheduler von subscribeOn() zurückgreifen.

Hier ist ein fortgeschrittenes Programm, das die Vorteile dieser beiden Operatoren nutzt:

log("Starting");
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onNext("D");
    subscriber.onCompleted();
});
log("Created");
obs
    .subscribeOn(schedulerA)
    .flatMap(record -> store(record).subscribeOn(schedulerB))
    .observeOn(schedulerC)
    .subscribe(
            x -> log("Got: " + x),
            Throwable::printStackTrace,
            () -> log("Completed")
    );
log("Exiting");

Dabei ist store() eine einfache verschachtelte Operation:

Observable<UUID> store(String s) {
    return Observable.create(subscriber -> {
        log("Storing " + s);
        //hard work
        subscriber.onNext(UUID.randomUUID());
        subscriber.onCompleted();
    });
}

Die Erzeugung von Ereignissen findet in schedulerA statt, aber jedes Ereignis wird unabhängig voneinander mit schedulerB verarbeitet, um die Gleichzeitigkeit zu verbessern, eine Technik, die wir in "subscribeOn() Concurrency and Behavior" kennengelernt haben . Die Subskription am Ende geschieht in einem weiteren schedulerC. Wir sind uns ziemlich sicher, dass du jetzt verstehst, welcher Scheduler/Thread welche Aktion ausführt, aber nur für den Fall der Fälle (leere Zeilen wurden zur Klarheit hinzugefügt):

26   | main  | Starting
93   | main  | Created
121  | main  | Exiting

122  | Sched-A-0 | Subscribed
124  | Sched-B-0 | Storing A
124  | Sched-B-1 | Storing B
124  | Sched-B-2 | Storing C
124  | Sched-B-3 | Storing D

1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce
1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8
1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a
1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587
1137 | Sched-C-1 | Completed

observeOn() ist besonders wichtig für Anwendungen mit einer Benutzeroberfläche, bei denen wir den UI-Ereignis-Thread nicht blockieren wollen. Unter Android (siehe "Android-Entwicklung mit RxJava") oder Swing müssen einige Aktionen wie die Aktualisierung der Benutzeroberfläche in einem bestimmten Thread ausgeführt werden. Wenn du aber zu viel in diesem Thread machst, reagiert deine Benutzeroberfläche nicht mehr. In diesen Fällen setzt du observeOn() in die Nähe von subscribe(), damit der Code innerhalb der Subskription im Kontext eines bestimmten Scheduler (z. B. UI-Thread) aufgerufen wird. Andere Transformationen, auch recht billige, sollten jedoch außerhalb des UI-Threads ausgeführt werden. Auf dem Server wird observeOn() selten verwendet, weil die wahre Quelle der Gleichzeitigkeit in den meisten Observables eingebaut ist. Das führt zu einer interessanten Schlussfolgerung: RxJava steuert die Gleichzeitigkeit mit nur zwei Operatoren (subscribeOn() und observeOn()), aber je mehr du reaktive Erweiterungen verwendest, desto seltener wirst du diese im Produktionscode sehen.

Andere Verwendungszwecke für Zeitplanungsprogramme

Es gibt zahlreiche Operatoren, die standardmäßig einen Scheduler verwenden. Normalerweise wird Schedulers.computation() verwendet, wenn kein anderer Operator angegeben wird - in der JavaDoc wird das immer deutlich gemacht. Der delay() Operator nimmt zum Beispiel Upstream-Ereignisse auf und schiebt sie nach einer bestimmten Zeit Downstream. Offensichtlich kann er den ursprünglichen Thread während dieses Zeitraums nicht halten, also muss er einen anderen Scheduler verwenden:

Observable
        .just('A', 'B')
        .delay(1, SECONDS, schedulerA)
        .subscribe(this::log);

Ohne die Angabe eines eigenen schedulerA würden alle Operatoren unterhalb von delay() den computation() Scheduler verwenden. Daran ist an sich nichts auszusetzen, aber wenn dein Subscriber bei der E/A blockiert ist, würde es ein Worker aus dem global geteilten computation() Zeitplannungsprogramm verbrauchen, was möglicherweise das gesamte System beeinträchtigt. Andere wichtige Operatoren, die benutzerdefinierte Scheduler unterstützen, sind: interval(), range(), timer(), repeat(), skip(), take(), timeout() und einige andere, die noch eingeführt werden müssen. Wenn du diesen Operatoren kein Zeitplannungsprogramm zur Verfügung stellst, wird computation() Scheduler verwendet, was in den meisten Fällen eine sichere Voreinstellung ist.

Die Beherrschung von Zeitplannungsprogrammen ist wichtig, um skalierbaren und sicheren Code mit RxJava zu schreiben. Der Unterschied zwischen subscribeOn() und observeOn() ist besonders wichtig bei hoher Last, wo jede Aufgabe genau dann ausgeführt werden muss, wenn wir sie erwarten. In wirklich reaktiven Anwendungen, bei denen alle langlaufenden Operationen asynchron sind, werden nur sehr wenige Threads und damit Schedulers benötigt. Aber es gibt immer diese eine API oder Abhängigkeit, die blockierenden Code erfordert.

Nicht zuletzt müssen wir sicher sein, dass Schedulers, die nachgelagert eingesetzt werden, mit der Last mithalten können, die von Schedulers vorgelagert erzeugt wird. Aber diese Gefahr wird in Kapitel 6 ausführlich erklärt.

Zusammenfassung

In diesem Kapitel wurden verschiedene Muster in traditionellen Anwendungen beschrieben, die durch RxJava ersetzt werden können. Ich hoffe, du hast inzwischen verstanden, dass der Hochfrequenzhandel oder das Streaming von Posts aus sozialen Medien nicht die einzigen Anwendungsfälle für RxJava sind. Tatsächlich kann fast jede API nahtlos durch Observable ersetzt werden. Selbst wenn du die Leistungsfähigkeit reaktiver Erweiterungen im Moment nicht brauchst oder willst, kannst du die Implementierung weiterentwickeln, ohne rückwärtskompatible Änderungen einzuführen. Außerdem ist es der Client, der letztendlich alle Möglichkeiten nutzt, die RxJava bietet, wie Faulheit, deklarative Gleichzeitigkeit oder asynchrone Verkettung. Noch besser: Durch die nahtlose Konvertierung vonObservable zu BlockingObservable können herkömmliche Clients deine API so nutzen, wie sie wollen, und du kannst jederzeit eine einfache Brückenschicht bereitstellen.

Du solltest ziemlich sicher mit RxJava umgehen können und die Vorteile der Anwendung auch in Altsystemen verstehen. Zweifellos ist die Arbeit mit reaktivenObservables anspruchsvoller und hat eine etwas steile Lernkurve. Aber die Vorteile und Wachstumsmöglichkeiten sind einfach nicht zu überschätzen. Stell dir vor, wir könnten ganze Anwendungen mit reaktiven Erweiterungen schreiben, und zwar von vorne bis hinten. Wie ein Projekt auf der grünen Wiese, bei dem wir die Kontrolle über jede API, jede Schnittstelle und jedes externe System haben. In Kapitel 5 wird erläutert, wie du eine solche Anwendung schreiben kannst und welche Auswirkungen das hat.

1 Tatsächlich versucht RxJava, über die Thread-Affinität im Ereignisschleifenmodell auf demselben Thread zu bleiben, um auch dies auszunutzen.

2 Siehe auch "Schottenmuster und Fail-Fast".

3 Vergleiche es mit der faulen Auswertung von Ausdrücken in Haskell.

4 Für ein echtes Projekt wirst du natürlich ein produktionsgerechtes Logging-System wie Logback oder Log4J 2 verwenden.

5 https://github.com/ReactiveX/RxSwing

Get Reaktive Programmierung mit RxJava now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.