Kapitel 4. Anwendung der reaktiven Programmierung auf bestehende Anwendungen
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Die Einführung einer neuen Bibliothek, Technologie oder eines neuen Paradigmas in eine Anwendung, sei es auf der grünen Wiese oder in einer Legacy-Codebasis, muss eine sorgfältige Entscheidung sein. RxJava ist da keine Ausnahme. In diesem Kapitel gehen wir einige Muster und Architekturen durch, die in gewöhnlichen Java-Anwendungen zu finden sind, und sehen, wie Rx helfen kann. Dieser Prozess ist nicht einfach und erfordert einen erheblichen Bewusstseinswandel, deshalb werden wir vorsichtig vom imperativen zum funktionalen und reaktiven Stil übergehen. Viele Bibliotheken in Java-Projekten fügen heutzutage einfach nur Ballast hinzu, ohne eine Gegenleistung zu erbringen. Du wirst jedoch sehen, wie RxJava nicht nur traditionelle Projekte vereinfacht, sondern auch welche Vorteile es für Legacy-Plattformen bringt.
Ich bin mir ziemlich sicher, dass du bereits von RxJava begeistert bist. Die eingebauten Operatoren und die Einfachheit machen Rx zu einem erstaunlich mächtigen Werkzeug für die Umwandlung von Ereignisströmen. Wenn du jedoch morgen zurück in dein Büro gehst, wirst du feststellen, dass es keine Streams, keine Echtzeit-Ereignisse von der Börse gibt. Du kannst kaum Ereignisse in deinen Anwendungen finden; es ist nur ein Mischmasch aus Webanfragen, Datenbanken und externen APIs. Du bist so erpicht darauf, dieses neue RxJava-Ding irgendwo jenseits von Hello world auszuprobieren. Doch es scheint, als gäbe es im wirklichen Leben einfach keine Anwendungsfälle, die den Einsatz von Rx rechtfertigen. Dabei kann RxJava ein bedeutender Schritt nach vorne sein, wenn es um architektonische Konsistenz und Robustheit geht. Du musst dich nicht von vorne bis hinten auf einen reaktiven Stil festlegen - das ist zu riskant und erfordert am Anfang zu viel Arbeit. Aber Rx kann auf jeder Schicht eingeführt werden, ohne dass eine Anwendung als Ganzes kaputt geht.
Wir führen dich durch einige gängige Anwendungsmuster und zeigen dir, wie du sie mit RxJava auf nicht-invasive Weise erweitern kannst. Der Schwerpunkt liegt dabei auf Datenbankabfragen, Caching, Fehlerbehandlung und periodischen Aufgaben. Je mehr RxJava du an verschiedenen Stellen deines Stacks einbaust, desto konsistenter wird deine Architektur.
Von Sammlungen zu beobachtbaren Objekten
Wenn deine Plattform nicht gerade in JVM-Frameworks wie Play, Akka Actors oder vielleicht Vert.x entwickelt wurde, befindest du dich wahrscheinlich auf einem Stack mit einem Servlet-Container auf der einen und JDBC oder Webservices auf der anderen Seite. Dazwischen gibt es eine unterschiedliche Anzahl von Schichten, die Geschäftslogik implementieren, die wir nicht alle auf einmal refaktorisieren werden; stattdessen wollen wir mit einem einfachen Beispiel beginnen. Die folgende Klasse stellt ein triviales Repository dar, das von einer Datenbank abstrahiert:
class
PersonDao
{
List
<
Person
>
listPeople
()
{
return
query
(
"SELECT * FROM PEOPLE"
);
}
private
List
<
Person
>
query
(
String
sql
)
{
//...
}
}
Abgesehen von den Implementierungsdetails, was hat das mit Rx zu tun? Bisher haben wir über asynchrone Ereignisse gesprochen, die von vorgelagerten Systemen gepusht werden oder bestenfalls, wenn jemand ein Abonnement abschließt. Was hat dieses banale Dao
damit zu tun?Observable
ist nicht nur eine Pipe, die Ereignisse nach unten schiebt. Du kannst Observable<T>
als eine Datenstruktur betrachten, die mit Iterable<T>
identisch ist. Beide enthalten Elemente des Typs T
, bieten aber eine völlig andere Schnittstelle. Es sollte also nicht überraschen, dass du das eine einfach durch das andere ersetzen kannst:
Observable
<
Person
>
listPeople
()
{
final
List
<
Person
>
people
=
query
(
"SELECT * FROM PEOPLE"
);
return
Observable
.
from
(
people
);
}
An diesem Punkt haben wir die bestehende API verändert. Je nachdem, wie groß dein System ist, kann eine solche Inkompatibilität ein großes Problem darstellen. Deshalb ist es wichtig, RxJava so schnell wie möglich in deine API zu integrieren. Da wir mit einer bestehenden Anwendung arbeiten, kann das natürlich nicht der Fall sein.
BlockingObservable: Der Ausstieg aus der reaktiven Welt
Wenn du RxJava mit bestehendem, blockierendem und imperativem Code kombinierst, musst du Observable
vielleicht in eine einfache Sammlung umwandeln. Diese Umwandlung ist ziemlich unangenehm, denn sie erfordert das Blockieren von Observable
und das Warten auf dessen Fertigstellung. Solange Observable
nicht fertiggestellt ist, können wir keine Sammlung erstellen.BlockingObservable
ist ein spezieller Typ, der die Arbeit mit Observable
in einer nicht-reaktiven Umgebung erleichtert.BlockingObservable
sollte deine letzte Wahl sein, wenn du mit RxJava arbeitest, aber es ist unvermeidlich, wenn du blockierenden und nicht-blockierenden Code kombinierst.
In Kapitel 3 haben wir die Methode listPeople()
so umgestaltet, dass sie Observable<People>
statt List
zurückgibt.Observable
ist kein Iterable
in irgendeinem Sinne, also lässt sich unser Code nicht mehr kompilieren. Wir wollen lieber in kleinen Schritten vorgehen, als massiv umzustrukturieren, also halten wir den Umfang der Änderungen so gering wie möglich. Der Client-Code könnte so aussehen:
List
<
Person
>
people
=
pesonDao
.
listPeople
();
String
json
=
marshal
(
people
);
Wir können uns vorstellen, dass die Methode marshal()
Daten aus der Sammlung people
abruft und sie in JSON serialisiert. Das ist jetzt nicht mehr der Fall, wir können nicht einfach Daten von Observable
abrufen, wenn wir wollen.Observable
ist dafür zuständig, Daten zu produzieren(zu pushen) und die Abonnenten zu benachrichtigen, wenn es welche gibt. Diese radikale Änderung kann mit BlockingObservable
leicht umgangen werden.
Diese praktische Klasse ist völlig unabhängig von Observable
und kann über die Methode Observable.toBlocking()
erreicht werden. Die blockierende Variante von Observable
hat oberflächlich betrachtet ähnliche Methoden wie single()
oder subscribe()
. BlockingObservable
ist jedoch in blockierenden Umgebungen, die von Natur aus nicht auf die asynchrone Natur von Observable
vorbereitet sind, viel praktischer.
Die Operatoren von BlockingObservable
blockieren (warten) in der Regel, bis die zugrunde liegende Observable
abgeschlossen ist. Dies widerspricht dem Hauptkonzept von Observable
s, dass alles asynchron, träge und im laufenden Betrieb verarbeitet wird. Observable.forEach()
empfängt zum Beispiel asynchron Ereignisse von Observable
, sobald sie eintreffen, während BlockingObservable.forEach()
blockiert, bis alle Ereignisse verarbeitet und der Stream abgeschlossen ist. Auch Ausnahmen werden nicht mehr als Werte (Ereignisse) weitergegeben, sondern im aufrufenden Thread wieder verworfen.
In unserem Fall wollen wir Observable<Person>
zurück in List<Person>
umwandeln, um den Umfang des Refactorings zu begrenzen:
Observable
<
Person
>
peopleStream
=
personDao
.
listPeople
();
Observable
<
List
<
Person
>>
peopleList
=
peopleStream
.
toList
();
BlockingObservable
<
List
<
Person
>>
peopleBlocking
=
peopleList
.
toBlocking
();
List
<
Person
>
people
=
peopleBlocking
.
single
();
Ich habe absichtlich alle Zwischentypen explizit gelassen, um zu erklären, was passiert. Nach dem Refactoring zu Rx gibt unsere APIObservable<Person> peopleStream
zurück. Dieser Stream kann potenziell vollständig reaktiv, asynchron und ereignisgesteuert sein, was überhaupt nicht zu dem passt, was wir brauchen: ein statisches List
. In einem ersten Schritt verwandeln wir Observable<Person>
in Observable<List<Person>>
. Dieser faule Operator puffert allePerson
Ereignisse und hält sie im Speicher, bis das onCompleted()
Ereignis eintrifft. Zu diesem Zeitpunkt wird ein einzelnes Ereignis vom Typ List<Person>
ausgesendet, das alle gesehenen Ereignisse auf einmal enthält, wie im folgenden Marmordiagramm dargestellt:
Der resultierende Stream wird sofort nach der Ausgabe eines einzigen List
Elements abgeschlossen. Auch dieser Operator ist asynchron; er wartet nicht darauf, dass alle Ereignisse eintreffen, sondern puffert stattdessen faul alle Werte. Das umständlich aussehende Observable<List<Person>> peopleList
wird dann inBlockingObservable<List<Person>> peopleBlocking
umgewandelt. BlockingObservable
ist nur dann eine gute Idee, wenn du eine blockierende, statische Ansicht deines ansonsten asynchronen Observable
bereitstellen musst. Während Observable.from(List<T>)
die normale Pull-basierte Sammlung in Observable
umwandelt, macht toBlocking()
genau das Gegenteil. Du fragst dich vielleicht, warum wir zwei Abstraktionen für blockierende und nicht blockierende Operatoren brauchen. Die Autoren von RxJava haben herausgefunden, dass die explizite Angabe der synchronen oder asynchronen Natur des zugrunde liegenden Operators zu wichtig ist, um sie JavaDoc zu überlassen. Mit zwei voneinander unabhängigen Typen ist sichergestellt, dass du immer mit der richtigen Datenstruktur arbeitest. Außerdem ist BlockingObservable
dein letztes Mittel; normalerweise solltest du Observable
so lange wie möglich zusammensetzen und verketten. Für diese Übung wollen wir jedoch gleich von Observable
wegkommen. Der letzte Operator single()
lässt die Observablen ganz weg und extrahiert ein, und nur ein einziges Element, das wir vonBlockingObservable<T>
erwarten. Ein ähnlicher Operator, first()
, gibt einen Wert von T
zurück und verwirft alles, was er übrig hat. single()
Der Operator Observable
hingegen stellt sicher, dass es keine weiteren ausstehenden Ereignisse im zugrunde liegenden gibt, bevor er beendet wird. Das bedeutet, dass single()
blockiert und auf den Rückruf von onCompleted()
wartet. Hier ist derselbe Codeschnipsel wie zuvor, diesmal mit allen Operatoren verkettet:
List
<
Person
>
people
=
personDao
.
listPeople
()
.
toList
()
.
toBlocking
()
.
single
();
Du denkst vielleicht, dass wir uns die Mühe gemacht haben, Observable
ohne ersichtlichen Grund ein- und auszuwickeln. Erinnere dich daran, dass dies nur der erste Schritt war. Die nächste Umwandlung wird etwas Faulheit einführen. Unser Code, so wie er jetzt steht, führt immer query("...")
aus und wickelt es mit Observable
ein.
Wie du inzwischen weißt, sind Observable
(vor allem kalte) per Definition faul. Solange sich niemand anmeldet, stellen sie nur einen Stream dar, der nie die Chance hatte, Werte zu senden. Die meiste Zeit kannst du Methoden aufrufen, die Observable
zurückgeben, und solange du dich nicht anmeldest, wird keine Arbeit geleistet.
Observable
ist wie Future
, weil er einen Wert für die Zukunft verspricht. Aber solange du ihn nicht anforderst, wird ein kalter Observable
nicht einmal anfangen, Werte zu senden. So gesehen ist Observable
eher mit java.util.function.Supplier<T>
vergleichbar, das bei Bedarf Werte vom Typ T
erzeugt.
Hot Observable
s sind anders, weil sie Werte ausgeben, egal ob du zuhörst oder nicht, aber die betrachten wir jetzt nicht. Die bloße Existenz von Observable
deutet nicht auf einen Hintergrundauftrag oder irgendeinen Nebeneffekt hin, im Gegensatz zu Future
, das fast immer auf einen gleichzeitig laufenden Vorgang hindeutet.
Umarmung der Faulheit
Wie machen wir also unsere Observable
faul? Die einfachste Methode ist, einen eager Observable
mit defer()
zu umhüllen:
public
Observable
<
Person
>
listPeople
()
{
return
Observable
.
defer
(()
->
Observable
.
from
(
query
(
"SELECT * FROM PEOPLE"
)));
}
Observable.defer()
nimmt einen Lambda-Ausdruck (eine Factory), der Observable
erzeugen kann. Observable
ist eifrig, also wollen wir seine Erzeugung aufschieben. defer()
wartet bis zum letztmöglichen Moment, um Observable
tatsächlich zu erzeugen, d.h. bis sich jemand tatsächlich anmeldet. Das hat einige interessante Auswirkungen. DaObservable
"lazy" ist, hat der Aufruf von listPeople()
keine Nebeneffekte und fast keine Leistungseinbußen. Es wird noch keine Datenbank abgefragt. Du kannst Observable<Person>
wie ein Versprechen behandeln, ohne dass eine Hintergrundverarbeitung stattfindet. Beachte, dass es im Moment kein asynchrones Verhalten gibt, sondern nur eine faule Auswertung. Das ist vergleichbar mit der Art und Weise, wie Werte in der Programmiersprache Haskell nur dann ausgewertet werden, wenn sie unbedingt benötigt werden.
Wenn du noch nie in funktionalen Sprachen programmiert hast, bist du vielleicht verwirrt, warum Faulheit so wichtig und bahnbrechend ist. Es stellt sich heraus, dass ein solches Verhalten sehr nützlich ist und die Qualität und Freiheit deiner Implementierung erheblich verbessern kann. Du musst dich zum Beispiel nicht mehr darum kümmern, welche Ressourcen wann und in welcher Reihenfolge abgerufen werden. RxJava lädt sie nur, wenn sie unbedingt benötigt werden.
Nimm als Beispiel diesen trivialen Ausweichmechanismus, den wir alle schon so oft gesehen haben:
void
bestBookFor
(
Person
person
)
{
Book
book
;
try
{
book
=
recommend
(
person
);
}
catch
(
Exception
e
)
{
book
=
bestSeller
();
}
display
(
book
.
getTitle
());
}
void
display
(
String
title
)
{
//...
}
Du denkst wahrscheinlich, dass an einem solchen Konstrukt nichts auszusetzen ist. In diesem Beispiel versuchen wir, das beste Buch für eine bestimmte Person zu empfehlen, aber wenn es nicht klappt, werden wir gnädig und zeigen den Bestseller an. Die Annahme ist, dass das Abrufen eines Bestsellers schneller ist und im Cache gespeichert werden kann. Aber was wäre, wenn du die Fehlerbehandlung deklarativ hinzufügen könntest, damit try
-catch
Blöcke nicht die eigentliche Logik verdecken?
void
bestBookFor
(
Person
person
)
{
Observable
<
Book
>
recommended
=
recommend
(
person
);
Observable
<
Book
>
bestSeller
=
bestSeller
();
Observable
<
Book
>
book
=
recommended
.
onErrorResumeNext
(
bestSeller
);
Observable
<
String
>
title
=
book
.
map
(
Book:
:
getTitle
);
title
.
subscribe
(
this
::
display
);
}
Da wir RxJava bisher nur erforschen, habe ich all diese Zwischenwerte und Typen belassen. Im echten Leben würde bestBookFor()
eher so aussehen:
void
bestBookFor
(
Person
person
)
{
recommend
(
person
)
.
onErrorResumeNext
(
bestSeller
())
.
map
(
Book:
:
getTitle
)
.
subscribe
(
this
::
display
);
}
Dieser Code ist schön übersichtlich und lesbar. Finde zuerst eine Empfehlung für person
. Im Falle eines Fehlers (onErrorResumeNext
), fahre mit einem Bestseller fort. Egal, welcher Erfolg eintrat, map
gibt einen Wert zurück, indem es den Titel extrahiert und ihn dann anzeigt. onErrorResumeNext()
ist ein mächtiger Operator, der Ausnahmen, die im Vorfeld passieren, abfängt, sie schluckt und eine bereitgestellte Sicherung Observable
abonniert. Auf diese Weise implementiert Rx eine try
-catch
Klausel. Wir werden später in diesem Buch viel mehr Zeit auf die Fehlerbehandlung verwenden (siehe "Deklarative try-catch-Ersetzung"). In der Zwischenzeit können wir bestSeller()
aufrufen, ohne uns Sorgen machen zu müssen, dass der Abruf des Bestsellers auch dann erfolgt, wenn eine echte Empfehlung gut gelaufen ist.
Observables zusammenstellen
SELECT * FROM PEOPLE
ist nicht wirklich eine moderne SQL-Abfrage. Erstens solltest du nicht blind alle Spalten abrufen, aber noch schädlicher ist es, alle Zeilen abzurufen. Unsere alte API ist nicht in der Lage, Ergebnisse zu paginieren und nur eine Teilmenge einer Tabelle anzuzeigen. In einer traditionellen Unternehmensanwendung könnte das so aussehen:
List
<
Person
>
listPeople
(
int
page
)
{
return
query
(
"SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?"
,
PAGE_SIZE
,
page
*
PAGE_SIZE
);
}
Dies ist kein SQL-Buch, also lassen wir die Implementierungsdetails beiseite. Der Autor dieser API war gnadenlos: Wir haben nicht die Freiheit, einen beliebigen Bereich von Datensätzen zu wählen, sondern können nur mit 0-basierten Seitenzahlen arbeiten. In RxJava können wir jedoch aus Faulheit das Lesen einer ganzen Datenbank ab einer bestimmten Seite simulieren:
import
static
rx
.
Observable
.
defer
;
import
static
rx
.
Observable
.
from
;
Observable
<
Person
>
allPeople
(
int
initialPage
)
{
return
defer
(()
->
from
(
listPeople
(
initialPage
)))
.
concatWith
(
defer
(()
->
allPeople
(
initialPage
+
1
)));
}
Dieses Codeschnipsel lädt die erste Seite der Datenbankeinträge, z.B. 10 Einträge, nach und nach. Wenn sich niemand anmeldet, wird auch diese erste Abfrage nicht aufgerufen. Wenn es einen Abonnenten gibt, der nur ein paar anfängliche Elemente konsumiert (z. B. allPeople(0).take(3)
), meldet sich RxJava automatisch von unserem Stream ab und es werden keine weiteren Abfragen ausgeführt. Was passiert also, wenn wir z. B. 11 Elemente abfragen, der erste Aufruf von listPeople()
aber nur 10 zurückgegeben hat? Nun, RxJava findet heraus, dass die ursprüngliche Observable
erschöpft ist, aber der Konsument immer noch hungrig ist. Zum Glück gibt es den OperatorconcatWith()
, der im Grunde genommen besagt: Wenn Observable
auf der linken Seite abgeschlossen ist, abonniere Observable
auf der rechten Seite, anstatt eine Fertigstellungsmeldung an die Abonnenten weiterzuleiten, und fahre fort, als ob nichts passiert wäre, wie im folgenden Marmordiagramm dargestellt:
Mit anderen Worten: concatWith()
kann zwei Observable
s miteinander verbinden, so dass, wenn das erste beendet ist, das zweite übernimmt. Ina.concatWith(b).subscribe(...)
empfängt der Abonnent zuerst alle Ereignisse von a
, gefolgt von allen Ereignissen von b
. In diesem Fall empfängt der Abonnent zuerst 10 Elemente, gefolgt von weiteren 10. Aber sieh genau hin, es gibt eine vermeintlich unendliche Rekursion in unserem Code!allPeople(initialPage)
ruft allPeople(initialPage + 1)
ohne eine Stopp-Bedingung auf. Das ist in den meisten Sprachen ein Rezept für StackOverflowError
, aber nicht hier. Auch hier ist der Aufruf vonallPeople()
immer faul, daher ist diese Rekursion in dem Moment beendet, in dem du aufhörst zuzuhören (dich abmeldest). Technisch gesehen kann concatWith()
hier immer noch StackOverflowError
erzeugen. Warte bis "Die angeforderte Datenmenge berücksichtigen", dann wirst du lernen, wie du mit der schwankenden Nachfrage nach eingehenden Daten umgehen kannst.
Die Technik, Daten Stück für Stück zu laden, ist sehr nützlich, weil sie es dir ermöglicht, dich auf die Geschäftslogik zu konzentrieren und nicht auf das Low-Level-Plumbing. Wir sehen bereits einige Vorteile der Anwendung von RxJava, selbst in kleinem Maßstab. Die Entwicklung einer API mit Rx im Hinterkopf hat keinen Einfluss auf die gesamte Architektur, denn wir können jederzeit auf BlockingObservable
und Java Collections zurückgreifen. Aber es ist besser, eine breite Palette von Möglichkeiten zu haben, die wir bei Bedarf weiter einschränken können.
Lazy Paging und Verkettung
Es gibt noch mehr Möglichkeiten, Lazy Paging mit RxJava zu implementieren. Wenn du darüber nachdenkst, ist die einfachste Art, ausgelagerte Daten zu laden, alles zu laden und dann das zu nehmen, was wir brauchen. Es klingt albern, aber dank der Faulheit ist es machbar. Zuerst generieren wir alle möglichen Seitenzahlen und fordern dann das Laden jeder einzelnen Seite an:
Observable
<
List
<
Person
>>
allPages
=
Observable
.
range
(
0
,
Integer
.
MAX_VALUE
)
.
map
(
this
::
listPeople
)
.
takeWhile
(
list
->
!
list
.
isEmpty
());
Wäre dies nicht RxJava, würde der vorangegangene Code eine enorme Menge an Zeit und Speicher beanspruchen, da er im Grunde die gesamte Datenbank in den Speicher laden würde. Aber da Observable
faul ist, ist noch keine Abfrage an die Datenbank erfolgt. Außerdem bedeutet eine leere Seite, dass auch alle weiteren Seiten leer sind (wir haben das Ende der Tabelle erreicht). Daher verwenden wir takeWhile()
statt filter()
. Um allPages
zu Observable<Person>
zu verflachen, können wir concatMap()
verwenden (siehe "Ordnung erhalten mit concatMap()"):
Observable
<
Person
>
people
=
allPages
.
concatMap
(
Observable:
:
from
);
concatMap()
erfordert eine Umwandlung von List<Person>
nach Observable<Person>
, die für jede Seite ausgeführt wird. Alternativ können wir concatMapIterable()
ausprobieren, was dasselbe tut, aber die Umwandlung sollte für jeden vorgelagerten Wert (der zufällig bereits Iterable<Person>
ist) einen Iterable<Person>
zurückgeben:
Observable
<
Person
>
people
=
allPages
.
concatMapIterable
(
page
->
page
);
Unabhängig davon, welchen Ansatz du wählst, sind alle Transformationen auf dem Person
Objekt träge. Solange du die Anzahl der zu verarbeitenden Datensätze begrenzst (z.B. mit people.take(15)
), wird das Observable<Person>
listPeople()
so spät wie möglich aufrufen.
Unbedingte Gleichzeitigkeit
Ich sehe nicht oft explizite Gleichzeitigkeit in Unternehmensanwendungen. Meistens wird eine einzelne Anfrage von einem einzigen Thread bearbeitet. Derselbe Thread macht Folgendes:
-
Akzeptiert TCP/IP-Verbindung
-
Parst HTTP-Anfrage
-
Ruft einen Controller oder ein Servlet auf
-
Blockiert beim Aufruf der Datenbank
-
Verarbeitet Ergebnisse
-
Kodiert die Antwort (z. B. in JSON)
-
Sendet rohe Bytes zurück an den Kunden
Dieses Schichtenmodell wirkt sich auf die Latenz aus, wenn das Backend mehrere unabhängige Anfragen stellt, z. B. an die Datenbank. Sie werden sequentiell ausgeführt, obwohl man sie leicht parallelisieren könnte. Außerdem wird die Skalierbarkeit beeinträchtigt. In Tomcat gibt es zum Beispiel standardmäßig 200 Threads in den Executors, die für die Bearbeitung von Anfragen zuständig sind. Das bedeutet, dass wir nicht mehr als 200 gleichzeitige Verbindungen verarbeiten können. Bei einem plötzlichen, aber kurzen Ansturm werden die eingehenden Verbindungen in eine Warteschlange gestellt und der Server antwortet mit einer höheren Latenzzeit. Diese Situation kann jedoch nicht ewig andauern und Tomcat wird irgendwann anfangen, den eingehenden Datenverkehr abzuweisen. Wir werden einen großen Teil des nächsten Kapitels (siehe "Nonblocking HTTP Server mit Netty und RxNetty") der Frage widmen, wie wir mit diesem eher peinlichen Mangel umgehen. Bleiben wir vorerst bei der traditionellen Architektur: Die Ausführung aller Schritte der Anfragebearbeitung in einem einzigen Thread hat einige Vorteile, zum Beispiel eine bessere Cache-Lokalität und einen minimalen Synchronisations-Overhead.1 Da bei klassischen Anwendungen die Gesamtlatenz die Summe der Latenzen der einzelnen Schichten ist, kann sich eine einzige fehlerhafte Komponente negativ auf die Gesamtlatenz auswirken.2 Außerdem gibt es manchmal viele Schritte, die voneinander unabhängig sind und gleichzeitig ausgeführt werden können. Wir rufen zum Beispiel mehrere externe APIs auf oder führen mehrere unabhängige SQL-Abfragen aus.
Das JDK bietet eine recht gute Unterstützung für Gleichzeitigkeit, vor allem seit Java 5 mit ExecutorService
und Java 8 mit CompletableFuture
. Trotzdem wird sie nicht so häufig genutzt, wie es möglich wäre. Betrachten wir zum Beispiel das folgende Programm ohne jegliche Gleichzeitigkeit:
Flight
lookupFlight
(
String
flightNo
)
{
//...
}
Passenger
findPassenger
(
long
id
)
{
//...
}
Ticket
bookTicket
(
Flight
flight
,
Passenger
passenger
)
{
//...
}
SmtpResponse
sendEmail
(
Ticket
ticket
)
{
//...
}
Und auf der Kundenseite:
Flight
flight
=
lookupFlight
(
"LOT 783"
);
Passenger
passenger
=
findPassenger
(
42
);
Ticket
ticket
=
bookTicket
(
flight
,
passenger
);
sendEmail
(
ticket
);
Auch hier handelt es sich um einen ganz typischen, klassischen Blocking-Code, wie er in vielen Anwendungen zu finden ist. Aber wenn du dir die Latenzzeit genau ansiehst, hat der vorhergehende Codeschnipsel vier Schritte; die ersten beiden sind jedoch unabhängig voneinander. Nur der dritte Schritt (bookTicket()
) benötigt die Ergebnisse von lookupFlight()
undfindPassenger()
. Es gibt eine offensichtliche Möglichkeit, die Vorteile der Gleichzeitigkeit zu nutzen. Doch nur sehr wenige Entwickler werden diesen Weg tatsächlich gehen, weil er umständliche Thread-Pools, Future
s und Rückrufe erfordert. Was wäre aber, wenn die API bereits Rx-kompatibel wäre? Erinnere dich daran, dass du blockierenden Legacy-Code einfach in Observable
verpacken kannst, so wie wir es am Anfang dieses Kapitels getan haben:
Observable
<
Flight
>
rxLookupFlight
(
String
flightNo
)
{
return
Observable
.
defer
(()
->
Observable
.
just
(
lookupFlight
(
flightNo
)));
}
Observable
<
Passenger
>
rxFindPassenger
(
long
id
)
{
return
Observable
.
defer
(()
->
Observable
.
just
(
findPassenger
(
id
)));
}
Semantisch gesehen tun die Methoden von rx-
genau dasselbe und auf dieselbe Art und Weise; das heißt, sie sind standardmäßig blockierend. Abgesehen von einer ausführlicheren API aus Sicht des Clients haben wir noch nichts gewonnen:
Observable
<
Flight
>
flight
=
rxLookupFlight
(
"LOT 783"
);
Observable
<
Passenger
>
passenger
=
rxFindPassenger
(
42
);
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
(
f
,
p
)
->
bookTicket
(
f
,
p
));
ticket
.
subscribe
(
this
::
sendEmail
);
Sowohl herkömmliche Blockierprogramme als auch das Programm mit Observable
funktionieren genau gleich. Es ist zwar standardmäßig langsamer, aber die Reihenfolge der Vorgänge ist im Wesentlichen dieselbe. Zuerst erstellen wir Observable<Flight>
, das, wie du bereits weißt, standardmäßig nichts tut. Wenn jemand nicht ausdrücklich nach einer Flight
fragt, ist diese Observable
nur ein fauler Platzhalter. Wir haben bereits gelernt, dass dies eine wertvolle Eigenschaft von kalten Observable
s ist. Das Gleiche gilt für Observable<Passenger>
; wir haben zwei Platzhalter vom TypFlight
und Passenger
, allerdings wurden noch keine Seiteneffekte ausgeführt. Keine Datenbankabfrage und kein Web-Service-Aufruf. Wenn wir uns entscheiden, die Verarbeitung hier zu beenden, wurde keine überflüssige Arbeit geleistet.
Um mit bookTicket()
fortzufahren, brauchen wir konkrete Instanzen von Flight
und Passenger
. Es ist verlockend, diese beiden Observable
s einfach zu blockieren, indem wir den toBlocking()
Operator verwenden. Wir möchten jedoch das Blockieren so weit wie möglich vermeiden, um den Ressourcenverbrauch (vor allem den Speicher) zu reduzieren und eine größere Gleichzeitigkeit zu ermöglichen. Eine andere schlechte Lösung ist es,.subscribe()
auf die flight
und passenger
Observable
s anzuwenden und irgendwie zu warten, bis beide Callbacks beendet sind. Das ist ziemlich einfach, wennObservable
blockiert, aber wenn Callbacks asynchron erscheinen und du einen globalen Zustand synchronisieren musst, der auf beide wartet, wird das schnell zu einem Albtraum. Auch ein verschachteltes subscribe()
ist nicht idiomatisch, und normalerweise willst du ein einziges Abonnement für einen Nachrichtenfluss (Anwendungsfall). Der einzige Grund, warum Callbacks in JavaScript einigermaßen anständig funktionieren, ist, dass es nur einen Thread gibt. Die idiomatische Art, mehrere Observable
s gleichzeitig zu abonnieren, ist zip
und zipWith
. Du könntest zip
als eine Möglichkeit sehen, zwei unabhängige Datenströme paarweise zu verbinden. Aber viel häufiger wird zip
einfach verwendet, um zwei Einzel-Elemente Observable
s zu verbinden.ob1.zip(ob2).subscribe(...)
bedeutet im Wesentlichen, dass ein Ereignis empfangen wird, wenn sowohl ob1
als auch ob2
fertig sind (ein eigenes Ereignis ausgeben). Wenn du also zip
siehst, ist es wahrscheinlicher, dass jemand einfach einen Join-Schrittfür zwei oder mehr Observable
s macht, deren Ausführungspfade sich verzweigt haben.zip
ist eine Möglichkeit, asynchron auf zwei oder mehr Werte zu warten, egal, welcher zuletzt erscheint.
Kommen wir also zurück zu flight.zipWith(passenger, this::bookTicket)
(eine kürzere Syntax mit Methodenreferenz anstelle eines expliziten Lambdas, wie im Codebeispiel). Der Grund, warum ich alle Typinformationen behalte, anstatt Ausdrücke fließend zu verbinden, ist, dass ich möchte, dass du auf die Rückgabetypen achtest. flight.zipWith(passenger, ...)
ruft nicht einfach einen Callback auf, wenn sowohl flight
als auch passenger
fertig sind; es gibt einen neuenObservable
zurück, den du sofort als faulen Platzhalter für Daten erkennen solltest. Erstaunlicherweise wurde zu diesem Zeitpunkt auch noch keine Berechnung gestartet. Wir haben einfach ein paar Datenstrukturen zusammengeschustert, aber kein Verhalten ausgelöst. Solange niemand Observable<Ticket>
abonniert hat, wird RxJava keinen Backend-Code ausführen. Das passiert schließlich in der letzten Anweisung: ticket.subscribe()
fragt explizit nach Ticket
.
Wo kann man sich anmelden?
Achte darauf, wo du subscribe()
im Domaincode siehst. Oft besteht deine Geschäftslogik nur darin, Observable
zusammenzustellen und an eine Art Framework oder Gerüstschicht zurückzugeben. Das eigentliche Abonnement findet hinter den Kulissen in einem Webframework oder in einem Glue-Code statt. Es ist keine schlechte Praxis, subscribe()
selbst aufzurufen, aber versuche, es so weit wie möglich nach außen zu verlagern.
Um den Ablauf der Ausführung zu verstehen, ist es sinnvoll, von unten nach oben zu schauen. Wir haben ticket
abonniert, also muss RxJava sowohl flight
als auch passenger
transparent abonnieren. An diesem Punkt beginnt die eigentliche Logik. Da beide Observable
s kalt sind und noch keine Gleichzeitigkeit besteht, ruft die erste Subskription von flight
die blockierende Methode lookupFlight()
direkt im aufrufenden Thread auf. Wenn lookupFlight()
fertig ist, kann RxJava passenger
abonnieren. Allerdings hat es bereits eine Flight
Instanz von der synchronen flight
erhalten. rxFindPassenger()
ruft findPassenger()
blockierend auf und erhält eine Passenger
Instanz. An dieser Stelle fließen die Daten wieder zurück. Die Instanzen von Flight
und Passenger
werden mithilfe des bereitgestellten Lambdas (bookTicket
) kombiniert und an ticket.subscribe()
weitergegeben.
Das hört sich nach einer Menge Arbeit an, wenn man bedenkt, dass er sich im Wesentlichen genauso verhält und funktioniert wie unser blockierender Code am Anfang. Aber jetzt können wir deklarativ Gleichzeitigkeit anwenden, ohne die Logik zu ändern. Wenn unsere Geschäftsmethoden Future<Flight>
(oderCompletableFuture<Flight>
, das spielt keine Rolle) zurückgeben würden, wären zwei Entscheidungen für uns getroffen worden:
-
Der zugrunde liegende Aufruf von
lookupFlight()
hat bereits begonnen und es gibt keinen Platz für Faulheit. Wir blockieren diese Methode nicht, aber die Arbeit hat bereits begonnen. -
Wir haben keinerlei Kontrolle über die Gleichzeitigkeit. Es ist die Methodenimplementierung, die entscheidet, ob eine
Future
Aufgabe in einem Thread-Pool aufgerufen wird, ein neuer Thread pro Anfrage und so weiter.
RxJava gibt den Nutzern mehr Kontrolle. Nur weil Observable<Flight>
nicht mit Blick auf die Gleichzeitigkeit implementiert wurde, heißt das nicht, dass wir sie nicht später anwenden können. In der realen Welt ist Observable
in der Regel bereits asynchron, aber in seltenen Fällen muss man die Gleichzeitigkeit zu einer bestehenden Observable
hinzufügen. Die Verbraucher unserer API, nicht die Implementierer, können den Threading-Mechanismus im Falle der synchronen Observable
frei wählen. All dies wird durch die Verwendung des subscribeOn()
Operators erreicht:
Observable
<
Flight
>
flight
=
rxLookupFlight
(
"LOT 783"
).
subscribeOn
(
Schedulers
.
io
());
Observable
<
Passenger
>
passenger
=
rxFindPassenger
(
42
).
subscribeOn
(
Schedulers
.
io
());
An jedem beliebigen Punkt vor dem Abonnieren können wir den subscribeOn()
Operator einfügen und eine sogenannte Scheduler
Instanz bereitstellen. In diesem Fall habe ich die Schedulers.io()
Factory-Methode verwendet, aber wir können genauso gut eine benutzerdefinierte ExecutorService
verwenden und sie schnell mit Scheduler
umhüllen. Wenn die Subskription erfolgt, wird der anObservable.create()
übergebene Lambda-Ausdruck in dem bereitgestellten Scheduler
und nicht im Client-Thread ausgeführt. Das ist noch nicht notwendig, aber wir werden Zeitplannungsprogramme im Abschnitt "Was ist ein Zeitplannungsprogramm?" genauer untersuchen. In der Zwischenzeit behandelst du Scheduler
wie einen Thread-Pool.
Wie verändert Scheduler
das Laufzeitverhalten unseres Programms? Erinnere dich, dass der zip()
Operator zwei oder mehrObservable
s abonniert und auf Paare (oder Tupel) wartet. Wenn die Subskription asynchron erfolgt, können alle vorgelagerten Observable
s ihren zugrunde liegenden blockierenden Code gleichzeitig aufrufen. Wenn du dein Programm jetzt ausführst, werden lookupFlight()
und findPassenger()
sofort und gleichzeitig mit der Ausführung beginnen, wenn ticket.subscribe()
aufgerufen wird. Dann wirdbookTicket()
ausgeführt, sobald der langsamere der vorgenannten Observable
s einen Wert ausgibt.
Apropos Langsamkeit: Du kannst auch deklarativ eine Zeitüberschreitung festlegen, wenn ein bestimmtes Observable
in der angegebenen Zeit keinen Wert ausgibt:
rxLookupFlight
(
"LOT 783"
)
.
subscribeOn
(
Schedulers
.
io
())
.
timeout
(
100
,
TimeUnit
.
MILLISECONDS
)
Wie immer werden Fehler nicht willkürlich verworfen, sondern nachgelagert weitergegeben. Wenn also die Methode lookupFlight()
länger als 100 Millisekunden dauert, wird am Ende TimeoutException
anstelle eines ausgegebenen Wertes an alle Abonnenten gesendet. Der Operator timeout()
wird in "Timing Out When Events Do Not Occurrence" ausführlich erklärt .
Am Ende haben wir zwei Methoden, die ohne großen Aufwand gleichzeitig laufen, vorausgesetzt, deine API ist bereits Rx-gesteuert. Aber wir haben ein wenig geschummelt, da bookTicket()
immer noch Ticket
zurückgibt, was definitiv bedeutet, dass sie blockiert. Selbst wenn die Ticketbuchung extrem schnell war, lohnt es sich, sie als solche zu deklarieren, um die Weiterentwicklung der API zu erleichtern. Die Weiterentwicklung könnte bedeuten, dass du Nebenläufigkeit hinzufügst oder sie in vollständig nichtblockierenden Umgebungen verwendest (siehe Kapitel 5).
Erinnere dich daran, dass es so einfach ist, eine nicht blockierende API in eine blockierende umzuwandeln, wie den Aufruf von toBlocking()
. Das Gegenteil ist oft schwierig und erfordert viele zusätzliche Ressourcen. Außerdem ist es sehr schwierig, die Entwicklung von Methoden wie rxBookTicket()
vorherzusagen. Wenn sie jemals das Netzwerk oder das Dateisystem berühren, ganz zu schweigen von der Datenbank, lohnt es sich, sie mit einem Observable
zu versehen, der die mögliche Latenz auf der Typebene angibt:
Observable
<
Ticket
>
rxBookTicket
(
Flight
flight
,
Passenger
passenger
)
{
//...
}
Aber jetzt gibt zipWith()
ein unangenehmes Observable<Observable<Ticket>>
zurück und der Code lässt sich nicht mehr kompilieren. Eine gute Faustregel ist, dass immer dann, wenn du einen doppelt eingeschlossenen Typ siehst (zum Beispiel Optional<Optional<...>>
), irgendwo ein flatMap()
-Aufruf fehlt. Das ist auch hier der Fall. zipWith()
nimmt ein Paar (oder allgemeiner ein Tupel) von Ereignissen, wendet eine Funktion an, die diese Ereignisse als Argumente verwendet, und stellt das Ergebnis unverändert in das nachgeschaltete Observable
ein. Deshalb haben wir zuerst Observable<Ticket>
gesehen, aber jetzt ist es Observable<Observable<Ticket>>
, wobeiObservable<Ticket>
das Ergebnis der von uns gelieferten Funktion ist. Es gibt zwei Möglichkeiten, dieses Problem zu lösen. Eine Möglichkeit besteht darin, ein Zwischenpaar zu verwenden, das vonzipWith
zurückgegeben wird:
import
org.apache.commons.lang3.tuple.Pair
;
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
(
Flight
f
,
Passenger
p
)
->
Pair
.
of
(
f
,
p
))
.
flatMap
(
pair
->
rxBookTicket
(
pair
.
getLeft
(),
pair
.
getRight
()));
Wenn die Verwendung einer expliziten Pair
aus der Bibliothek eines Drittanbieters den Codefluss nicht genug verwirren würde, würde eine Methodenreferenz tatsächlich funktionieren: Pair::of
Aber auch hier haben wir uns entschieden, dass sichtbare Typinformationen wertvoller sind, als ein paar Tastenanschläge zu sparen. Schließlich lesen wir viel mehr Zeit mit dem Lesen von Code, als wir ihn schreiben. Eine Alternative zu einem Zwischenpaar ist die Anwendung einer flatMap
mit einer Identitätsfunktion:
Observable
<
Ticket
>
ticket
=
flight
.
zipWith
(
passenger
,
this
::
rxBookTicket
)
.
flatMap
(
obs
->
obs
);
Dieser obs -> obs
Lambda-Ausdruck tut scheinbar nichts, zumindest wenn er ein map()
Operator wäre. Aber erinnere dich daran, dass flatMap()
eine Funktion auf jeden Wert innerhalb von Observable
anwendet, so dass diese Funktion in unserem FallObservable<Ticket>
als Argument benötigt. Später wird das Ergebnis nicht direkt in den resultierenden Stream eingefügt, wie bei map()
. Stattdessen wird der Rückgabewert (vom Typ Observable<T>
) "geglättet", was zu einem Observable<T>
und nicht zu einem Observable<Observable<T>>
führt. Wenn es um Zeitplannungsprogramme geht, wird der Operator flatMap()
noch mächtiger. Man könnte meinen, dass flatMap()
nur ein syntaktischer Trick ist, um ein verschachteltes Observable<Observable<...>>
Problem zu vermeiden, aber er ist viel grundlegender als das.
Observable.subscribeOn() Anwendungsfälle
Es ist verlockend zu denken, dass subscribeOn()
das richtige Werkzeug für die Gleichzeitigkeit in RxJava ist. Dieser Operator funktioniert, aber du solltest die Verwendung von subscribeOn()
(und dem noch zu beschreibenden observeOn()
) nicht oft sehen. Im wirklichen Leben kommen Observable
s aus asynchronen Quellen, so dass ein benutzerdefiniertes Zeitplanungsprogramm überhaupt nicht benötigt wird.
Wir verwenden subscribeOn()
in diesem Kapitel, um explizit zu zeigen, wie man bestehende Anwendungen aufrüsten kann, um reaktive Prinzipien selektiv zu nutzen. Aber in der Praxis sind Scheduler
s und subscribeOn()
Waffen der letzten Instanz, also nichts, was man häufig sieht.
flatMap() als asynchroner Verkettungsoperator
In unserer Beispielanwendung müssen wir nun eine Liste von Ticket
per E-Mail versenden. Dabei müssen wir Folgendes beachten:
-
Die Liste kann ziemlich lang sein.
-
Das Versenden einer E-Mail kann einige Millisekunden oder sogar Sekunden dauern.
-
Die Anwendung muss im Falle von Fehlschlägen weiterlaufen, aber am Ende melden, welche Tickets fehlgeschlagen sind.
Die letzte Anforderung schließttickets.forEach(this::sendEmail)
schnell aus, weil es eifrig eine Ausnahme wirft und nicht mit Tickets fortfährt, die noch nicht geliefert wurden. Ausnahmen sind eigentlich eine böse Hintertür zum Typsystem und genau wie Rückrufe sind sie nicht sehr freundlich, wenn man sie auf eine robustere Art und Weise verwalten will. Deshalb modelliert RxJava sie explizit als spezielle Benachrichtigungen, aber hab Geduld, wir werden dazu kommen. Angesichts der Anforderung an die Fehlerbehandlung sieht unser Code mehr oder weniger so aus:
List
<
Ticket
>
failures
=
new
ArrayList
<>();
for
(
Ticket
ticket:
tickets
)
{
try
{
sendEmail
(
ticket
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
);
failures
.
add
(
ticket
);
}
}
Die ersten beiden Anforderungen oder Richtlinien werden jedoch nicht berücksichtigt. Es gibt keinen Grund, warum wir E-Mails von einem Thread aus nacheinander versenden. Traditionell könnten wir dafür eine ExecutorService pool
verwenden, indem wir jede E-Mail als separate Aufgabe übermitteln:
List
<
Pair
<
Ticket
,
Future
<
SmtpResponse
>>>
tasks
=
tickets
.
stream
()
.
map
(
ticket
->
Pair
.
of
(
ticket
,
sendEmailAsync
(
ticket
)))
.
collect
(
toList
());
List
<
Ticket
>
failures
=
tasks
.
stream
()
.
flatMap
(
pair
->
{
try
{
Future
<
SmtpResponse
>
future
=
pair
.
getRight
();
future
.
get
(
1
,
TimeUnit
.
SECONDS
);
return
Stream
.
empty
();
}
catch
(
Exception
e
)
{
Ticket
ticket
=
pair
.
getLeft
();
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
);
return
Stream
.
of
(
ticket
);
}
})
.
collect
(
toList
());
//------------------------------------
private
Future
<
SmtpResponse
>
sendEmailAsync
(
Ticket
ticket
)
{
return
pool
.
submit
(()
->
sendEmail
(
ticket
));
}
Das ist eine ganze Menge Code, mit dem alle Java-Programmierer vertraut sein sollten. Dennoch scheint er zu langatmig und ungewollt komplex zu sein. Zunächst durchlaufen wir tickets
und übermitteln sie an einen Thread-Pool. Um genau zu sein, rufen wir die Hilfsmethode sendEmailAsync()
auf, die den in Callable<SmtpResponse>
verpacktensendEmail()
-Aufruf an einen Threadpool
übergibt. Noch genauere Instanzen von Callable
werden zunächst in eine unbegrenzte (standardmäßig) Warteschlange vor einem Thread-Pool gestellt. Das Fehlen von Mechanismen, die eine zu schnelle Übergabe von Aufgaben verlangsamen, wenn sie nicht rechtzeitig bearbeitet werden können, führte zu reaktiven Streams und Backpressure-Aufwand (siehe "Backpressure").
Da wir später im Falle eines Fehlers eine Ticket
Instanz benötigen, müssen wir nachverfolgen, welche Future
für welcheTicket
verantwortlich war, wiederum in einem Pair
. In echtem Produktionscode solltest du einen aussagekräftigeren und dedizierten Container wie ein TicketAsyncTask
Wertobjekt in Betracht ziehen. Wir sammeln alle diese Paare und fahren mit der nächsten Iteration fort. Zu diesem Zeitpunkt führt der Thread-Pool bereits mehrere sendEmail()
Aufrufe gleichzeitig aus, und das ist genau das, was wir bezwecken wollten. Die zweite Schleife durchläuft alle Future
s und versucht, sie zu dereferenzieren, indem sie blockiert (get()
) und auf den Abschluss wartet. Wenn get()
erfolgreich zurückkehrt, überspringen wir eine solche Ticket
. Wenn es jedoch eine Ausnahme gibt, geben wir die Ticket
Instanz zurück, die mit dieser Aufgabe verbunden war - wir wissen, dass sie fehlgeschlagen ist und wollen das später melden. Stream.flatMap()
erlaubt es uns, null oder ein Element (oder eigentlich eine beliebige Zahl) zurückzugeben, im Gegensatz zu Stream.map()
, das immer eins verlangt.
Du fragst dich vielleicht, warum wir zwei Schleifen brauchen und nicht nur eine wie hier:
//WARNING: code is sequential despite utilizing thread pool
List
<
Ticket
>
failures
=
tickets
.
stream
()
.
map
(
ticket
->
Pair
.
of
(
ticket
,
sendEmailAsync
(
ticket
)))
.
flatMap
(
pair
->
{
//...
})
.
collect
(
toList
());
Dies ist ein interessanter Fehler, der wirklich schwer zu finden ist, wenn du nicht verstehst, wie Stream
s in Java 8 funktionieren. Da Streams - genau wie Observable
s - träge sind, werten sie die zugrundeliegende Sammlung nur ein Element nach dem anderen aus und auch nur dann, wenn eine Terminaloperation angefordert wurde (z. B.collect(toList())
). Das bedeutet, dass eine map()
Operation, mit der Hintergrundaufgaben gestartet werden, nicht sofort auf allen Tickets ausgeführt wird, sondern eines nach dem anderen, abwechselnd mit einer flatMap()
Operation. Außerdem starten wir wirklich eine Future
, blockieren das Warten darauf, starten eine zweite Future
, blockieren das Warten darauf, und so weiter. Eine Zwischensammlung wird benötigt, um die Auswertung zu erzwingen, nicht wegen der Übersichtlichkeit oder Lesbarkeit. Schließlich ist der TypList<Pair<Ticket, Future<SmtpResponse>>>
kaum lesbarer.
Das ist viel Arbeit und die Fehlerwahrscheinlichkeit ist hoch. Kein Wunder also, dass Entwicklerinnen und Entwickler davor zurückschrecken, nebenläufigen Code im Alltag anzuwenden. Das wenig bekannte ExecutorCompletionService
aus dem JDK wird manchmal verwendet, wenn es einen Pool von asynchronen Aufgaben gibt und wir sie verarbeiten wollen, sobald sie abgeschlossen sind.
Außerdem bringt Java 8 CompletableFuture
(siehe "CompletableFuture und Streams"), das vollständig reaktiv und nicht blockierend ist. Aber wie kann RxJava hier helfen? Nehmen wir zunächst an, dass eine API zum Versenden einer E-Mail bereits auf die Verwendung von RxJava umgerüstet wurde:
import
static
rx
.
Observable
.
fromCallable
;
Observable
<
SmtpResponse
>
rxSendEmail
(
Ticket
ticket
)
{
//unusual synchronous Observable
return
fromCallable
(()
->
sendEmail
())
}
Es geht nicht um Gleichzeitigkeit, sondern nur darum, sendEmail()
in ein Observable
zu packen. Dies ist eine seltene Observable
; normalerweise würdest du subscribeOn()
in der Implementierung verwenden, damit Observable
standardmäßig asynchron ist. An diesem Punkt können wir wie zuvor über alle tickets
iterieren:
List
<
Ticket
>
failures
=
Observable
.
from
(
tickets
)
.
flatMap
(
ticket
->
rxSendEmail
(
ticket
)
.
flatMap
(
response
->
Observable
.<
Ticket
>
empty
())
.
doOnError
(
e
->
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
))
.
onErrorReturn
(
err
->
ticket
))
.
toList
()
.
toBlocking
()
.
single
();
Observable.ignoreElements()
Es ist leicht zu erkennen, dass die innere flatMap()
in unserem Beispiel response
ignoriert und einen leeren Stream zurückgibt. In solchen Fällen ist flatMap()
ein Overkill; der ignoreElements()
-Operator ist viel effizienter.ignoreElements()
ignoriert einfach alle ausgegebenen Werte und leitet onCompleted()
oder onError()
Benachrichtigungen weiter. Da wir die eigentliche Antwort ignorieren und nur Fehler behandeln, funktioniert ignoreElements()
hier hervorragend.
Alles, was uns interessiert, liegt innerhalb der äußeren flatMap()
. Wäre es nurflatMap(this::rxSendEmail)
, würde der Code funktionieren; allerdings würde jeder Fehler, der von rxSendEmail
ausgegeben wird, den gesamten Stream beenden. Wir wollen aber alle ausgegebenen Fehler "auffangen" und für den späteren Gebrauch sammeln. Wir verwenden einen ähnlichen Trick wie bei Stream.flatMap()
: Wenn response
erfolgreich ausgegeben wurde, wandeln wir es in ein leeres Observable
um. Das bedeutet im Grunde, dass wir erfolgreiche Tickets verwerfen. Bei Misserfolgen geben wir jedoch ein ticket
zurück, das eine Ausnahme ausgelöst hat. Ein zusätzlicherdoOnError()
Callback ermöglicht es uns, die Ausnahmen zu protokollieren - natürlich können wir die Protokollierung auch zum onErrorReturn()
Operator hinzufügen, aber ich fand diese Trennung der Anliegen funktionaler.
Um mit früheren Implementierungen kompatibel zu bleiben, verwandeln wir Observable
in Observable<List<Ticket>>
, BlockingObservable<List<Ticket>>
, toBlocking()
und schließlich List<Ticket>
(single()
). Interessanterweise bleibt auch BlockingObservable
faul. Ein toBlocking()
Operator allein erzwingt keine Auswertung, indem er den zugrunde liegenden Stream abonniert, und er blockiert nicht einmal. Die Subskription und damit die Iteration und das Senden von E-Mails wird aufgeschoben, bis single()
aufgerufen wird.
Wenn wir die äußere flatMap()
durch concatMap()
ersetzen (siehe "Ways of Combining Streams: concat(), merge(), and switchOnNext()" und "Preserving Order Using concatMap()"), werden wir auf einen ähnlichen Fehler stoßen, wie er bei Stream
von JDK erwähnt wurde.
Im Gegensatz zu flatMap()
(oder merge
), das alle inneren Streams sofort abonniert, abonniert concatMap
(oder concat
) einen inneren Observable
nach dem anderen. Und solange niemand Observable
abonniert hat, wird auch nicht gearbeitet.
Bislang wurde eine einfache for
Schleife mit try
-catch
durch eine weniger lesbare und komplexere Observable
ersetzt. Um unseren sequenziellen Code in eine Multithreading-Berechnung zu verwandeln, müssen wir jedoch nur einen zusätzlichen Operator hinzufügen:
Observable
.
from
(
tickets
)
.
flatMap
(
ticket
->
rxSendEmail
(
ticket
)
.
ignoreElements
()
.
doOnError
(
e
->
log
.
warn
(
"Failed to send {}"
,
ticket
,
e
))
.
onErrorReturn
(
err
->
ticket
)
.
subscribeOn
(
Schedulers
.
io
()))
Sie ist so unauffällig, dass du sie vielleicht gar nicht bemerkst. Ein zusätzlichersubscribeOn()
Operator bewirkt, dass jede einzelne rxSendMail()
auf einem bestimmten Scheduler
(io()
, in diesem Fall) ausgeführt wird. Das ist eine der Stärken von RxJava: Es macht sich keine Gedanken über Threading, sondern setzt auf synchrone Ausführung, erlaubt aber nahtloses und fast transparentes Multithreading. Das bedeutet natürlich nicht, dass du Zeitplanungsprogramme an beliebigen Stellen einbauen kannst. Aber zumindest ist die API weniger langatmig und auf höherem Niveau. Wir werden Zeitplannungsprogramme später in "Multithreading in RxJava" noch genauer untersuchen . Erinnere dich vorerst daran, dass Observable
standardmäßig synchron ist; wir können das jedoch leicht ändern und Gleichzeitigkeit an Stellen anwenden, an denen sie am wenigsten erwartet wird. Das ist vor allem bei bestehenden Legacy-Anwendungen nützlich, die du ohne großen Aufwand optimieren kannst.
Wenn du Observable
von Grund auf neu implementierst, ist es idiomatischer, sie standardmäßig asynchron zu machen. Das bedeutet, dass du subscribeOn()
direkt innerhalb von rxSendEmail()
platzierst und nicht extern. Andernfalls riskierst du, bereits asynchrone Streams mit einer weiteren Schicht von Zeitplannungsprogrammen zu umhüllen.
Wenn der Producer hinter Observable
bereits asynchron ist, ist das natürlich noch besser, weil dein Stream nicht an einen bestimmten Thread gebunden ist. Außerdem solltest du das Abonnieren eines Observable
so spät wie möglich verschieben, typischerweise in die Nähe des Web-Frameworks unserer Außenwelt. Das ändert deine Denkweise erheblich. Deine gesamte Geschäftslogik ist faul, bis jemand die Ergebnisse tatsächlich sehen will.3
Ersetzen von Rückrufen durch Streams
Herkömmliche APIs sind die meiste Zeit blockierend, das heißt, sie zwingen dich, synchron auf die Ergebnisse zu warten. Dieser Ansatz funktioniert relativ gut, zumindest bevor du von RxJava gehört hast. Aber eine blockierende API ist besonders problematisch, wenn Daten vom API-Produzenten an die Konsumenten weitergegeben werden müssen - das ist ein Bereich, in dem RxJava wirklich glänzt. Es gibt zahlreiche Beispiele für solche Fälle, und die API-Designer verfolgen verschiedene Ansätze. In der Regel müssen wir eine Art Callback bereitstellen, den die API aufruft, oft als Event-Listener bezeichnet. Eines der häufigsten Szenarien dieser Art ist derJava Message Service (JMS). Um JMS zu nutzen, muss normalerweise eine Klasse implementiert werden, die den Anwendungsserver oder Container über jede eingehende Nachricht informiert. Wir können solche Listener relativ einfach durch einen komponierbaren Observable
ersetzen, der viel robuster und vielseitiger ist. Der traditionelle Listener sieht ähnlich aus wie diese Klasse, die hier die JMS-Unterstützung imSpring-Framework nutzt, aber unsere Lösung ist technologieunabhängig:
@Component
class
JmsConsumer
{
@JmsListener
(
destination
=
"orders"
)
public
void
newOrder
(
Message
message
)
{
//...
}
}
Wenn eine JMS message
empfangen wird, muss die Klasse JmsConsumer
entscheiden, was damit geschehen soll. Normalerweise wird eine Geschäftslogik innerhalb eines Nachrichtenkonsumenten aufgerufen. Wenn eine neue Komponente über solche Nachrichten benachrichtigt werden möchte, muss sie JmsConsumer
entsprechend ändern. Stell dir vor, dass Observable<Message>
von jedem abonniert werden kann. Außerdem steht ein ganzes Universum von RxJava-Operatoren zur Verfügung, mit denen man Nachrichten zuordnen, filtern und kombinieren kann. Der einfachste Weg, um von einer Push- und Callback-basierten API zu Observable
zu wechseln, ist die Verwendung vonSubject
s. Jedes Mal, wenn eine neue JMS-Nachricht zugestellt wird, pushen wir diese Nachricht an eine PublishSubject
, die von außen wie ein gewöhnlicher HotObservable
aussieht:
private
final
PublishSubject
<
Message
>
subject
=
PublishSubject
.
create
();
@JmsListener
(
destination
=
"orders"
,
concurrency
=
"1"
)
public
void
newOrder
(
Message
msg
)
{
subject
.
onNext
(
msg
);
}
Observable
<
Message
>
observe
()
{
return
subject
;
}
Vergiss nicht, dass Observable<Message>
"heiß" ist: Es beginnt mit der Ausgabe von JMS-Nachrichten, sobald sie konsumiert werden. Wenn in diesem Moment niemand abonniert ist, gehen die Nachrichten einfach verloren. ReplaySubject
ist eine Alternative, aber da es alle Ereignisse seit dem Start der Anwendung zwischenspeichert, ist es nicht für lang laufende Prozesse geeignet. Wenn du einen Abonnenten hast, der unbedingt alle Nachrichten erhalten muss, musst du sicherstellen, dass er die Nachrichten abonniert, bevor der JMS Message Listener initialisiert wird. Außerdem hat unser Message Listener einen concurrency="1"
Parameter, um sicherzustellen, dass Subject
nicht von mehreren Threads aus aufgerufen wird. Als Alternative kannst du Subject.toSerialized()
verwenden.
Nebenbei bemerkt: Subject
ist zwar einfacher zu starten, aber nach einer Weile bekanntermaßen problematisch. In diesem speziellen Fall können wir Subject
einfach durch das idiomatischere RxJava Observable
ersetzen, das create()
direkt verwendet:
public
Observable
<
Message
>
observe
(
ConnectionFactory
connectionFactory
,
Topic
topic
)
{
return
Observable
.
create
(
subscriber
->
{
try
{
subscribeThrowing
(
subscriber
,
connectionFactory
,
topic
);
}
catch
(
JMSException
e
)
{
subscriber
.
onError
(
e
);
}
});
}
private
void
subscribeThrowing
(
Subscriber
<?
super
Message
>
subscriber
,
ConnectionFactory
connectionFactory
,
Topic
orders
)
throws
JMSException
{
Connection
connection
=
connectionFactory
.
createConnection
();
Session
session
=
connection
.
createSession
(
true
,
AUTO_ACKNOWLEDGE
);
MessageConsumer
consumer
=
session
.
createConsumer
(
orders
);
consumer
.
setMessageListener
(
subscriber:
:
onNext
);
subscriber
.
add
(
onUnsubscribe
(
connection
));
connection
.
start
();
}
private
Subscription
onUnsubscribe
(
Connection
connection
)
{
return
Subscriptions
.
create
(()
->
{
try
{
connection
.
close
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Can't close"
,
e
);
}
});
}
Die JMS-API bietet zwei Möglichkeiten, Nachrichten von einem Broker zu empfangen: synchron über die blockierende Methode receive()
und nicht blockierend über MessageListener
. Die nicht blockierende API ist aus vielen Gründen vorteilhaft: Sie benötigt zum Beispiel weniger Ressourcen wie Threads und Stack-Speicher. Außerdem passt sie hervorragend zum Rx-Programmierstil. Anstatt eine Instanz von MessageListener
zu erstellen und unseren Abonnenten von dort aus aufzurufen, können wir diese knappe Syntax mit Methodenreferenz verwenden:
consumer
.
setMessageListener
(
subscriber:
:
onNext
)
Außerdem müssen wir uns um die Bereinigung von Ressourcen und die richtige Fehlerbehandlung kümmern. Diese winzige Transformationsschicht ermöglicht es uns, JMS-Nachrichten einfach zu konsumieren, ohne uns um API-Interna zu kümmern. Hier ein Beispiel mit dem beliebten ActiveMQ Messaging Broker, der lokal läuft:
import
org.apache.activemq.ActiveMQConnectionFactory
;
import
org.apache.activemq.command.ActiveMQTopic
;
ConnectionFactory
connectionFactory
=
new
ActiveMQConnectionFactory
(
"tcp://localhost:61616"
);
Observable
<
String
>
txtMessages
=
observe
(
connectionFactory
,
new
ActiveMQTopic
(
"orders"
))
.
cast
(
TextMessage
.
class
)
.
flatMap
(
m
->
{
try
{
return
Observable
.
just
(
m
.
getText
());
}
catch
(
JMSException
e
)
{
return
Observable
.
error
(
e
);
}
});
JMS hat genau wie JDBC den Ruf, stark geprüfte JMSException
zu verwenden, selbst beim Aufruf von getText()
auf einem TextMessage
. Um Fehler richtig zu behandeln (siehe "Fehlerbehandlung" für weitere Details), verwenden wir flatMap()
und wrap exceptions.
Ab diesem Punkt kannst du JMS-Nachrichten wie jeden anderen asynchronen und nicht blockierenden Stream behandeln. Übrigens haben wir den cast()
Operator verwendet, der Upstream-Ereignisse optimistisch auf einen bestimmten Typ castet und ansonsten mit onError()
fehlschlägt.cast()
ist im Grunde ein spezialisierter map()
Operator, der sich wie map(x -> (TextMessage)x)
verhält.
Regelmäßige Abfrage nach Änderungen
Die schlechteste blockierende API, mit der du arbeiten kannst, erfordert eine Abfrage nach Änderungen. Sie bietet keinen Mechanismus, um Änderungen direkt an dich weiterzugeben, auch nicht mit Callbacks oder durch unendliches Blockieren. Der einzige Mechanismus, den diese API bietet, ist die Abfrage des aktuellen Zustands, und es liegt an dir, herauszufinden, ob er sich vom vorherigen Zustand unterscheidet oder nicht. RxJava hat ein paar wirklich mächtige Operatoren, die du anwenden kannst, um eine gegebene API auf Rx-Stil umzurüsten. Der erste Fall, den ich dir vorstellen möchte, ist eine einfache Methode, die einen einzelnen Wert liefert, der den Zustand repräsentiert, zum Beispiellong getOrderBookLength()
. Um Änderungen zu verfolgen, müssen wir diese Methode häufig genug aufrufen und Unterschiede erfassen. Das kannst du in RxJava mit einer sehr einfachen Operator-Komposition erreichen:
Observable
.
interval
(
10
,
TimeUnit
.
MILLISECONDS
)
.
map
(
x
->
getOrderBookLength
())
.
distinctUntilChanged
()
Zunächst erzeugen wir alle 10 Millisekunden einen synthetischen long
Wert, der als tickender Zähler dient. Für jeden solchen Wert (d.h. alle 10 Millisekunden) rufen wir getOrderBookLength()
auf. Die oben genannte Methode ändert sich jedoch nicht so oft, und wir wollen unsere Abonnenten nicht mit vielen irrelevanten Zustandsänderungen überfluten. Zum Glück können wir einfach distinctUntilChanged()
sagen und RxJava überspringt transparent die von getOrderBookLength()
zurückgegebenen long
Werte, die sich seit dem letzten Aufruf nicht geändert haben, wie das folgende Marmordiagramm zeigt:
Wir können dieses Muster sogar noch weiter anwenden. Stell dir vor, du beobachtest, ob sich das Dateisystem oder die Datenbanktabelle ändert. Der einzige Mechanismus, der dir zur Verfügung steht, ist die Erstellung eines aktuellen Schnappschusses der Dateien oder Datenbankeinträge. Du baust eine API auf, die die Kunden über jedes neue Element informiert. Natürlich kannst du auch java.nio.file.WatchService
oder Datenbank-Trigger verwenden, aber nimm dieses Beispiel als Lehrbeispiel. Auch dieses Mal beginnen wir damit, regelmäßig einen Schnappschuss des aktuellen Zustands zu machen:
Observable
<
Item
>
observeNewItems
()
{
return
Observable
.
interval
(
1
,
TimeUnit
.
SECONDS
)
.
flatMapIterable
(
x
->
query
())
.
distinct
();
}
List
<
Item
>
query
()
{
//take snapshot of file system directory
//or database table
}
Der Operator distinct()
speichert alle Elemente, die ihn durchlaufen haben (siehe auch "Duplikate mit distinct() und distinctUntilChanged() ausschließen"). Wenn derselbe Eintrag zum zweiten Mal auftaucht, wird er einfach ignoriert. Deshalb können wir die gleiche Liste von Item
s jede Sekunde pushen. Beim ersten Mal werden sie an alle Abonnenten weitergereicht. Wenn jedoch genau dieselbe Liste eine Sekunde später erscheint, wurden alle Elemente bereits gesehen und werden daher verworfen. Wenn die von query()
zurückgegebene Liste zu einem bestimmten Zeitpunkt ein zusätzlichesItem
enthält, lässt distinct()
es durchgehen, verwirft es aber beim nächsten Mal. Dieses einfache Muster ermöglicht es uns, eine Reihe von Thread.sleep()
Aufrufen und manuellem Caching durch periodisches Polling zu ersetzen.Es ist in vielen Bereichen anwendbar, z. B. beim FTP-Polling (File Transfer Protocol), Web Scraping usw.
Multithreading in RxJava
Es gibt APIs von Drittanbietern, die blockieren und wir können einfach nichts dagegen tun. Vielleicht haben wir keinen Quellcode, und das Umschreiben wäre zu riskant. In diesem Fall müssen wir lernen, mit blockierendem Code umzugehen, anstatt ihn zu bekämpfen.
Eines der Markenzeichen von RxJava ist die deklarative Gleichzeitigkeit, im Gegensatz zur imperativen Gleichzeitigkeit. Das manuelle Erstellen und Verwalten von Threads gehört der Vergangenheit an (vgl. "Thread Pool of Connections"). Die meisten von uns verwenden bereits verwaltete Thread Pools (z. B. mit ExecutorService
). Aber RxJava geht noch einen Schritt weiter: Observable
kann genau wie CompletableFuture
in Java 8 nonblocking sein (vgl. "CompletableFuture und Streams"), aber im Gegensatz zu dem anderen ist es auch lazy. Solange du nicht abonnierst, führt ein braves Observable
keine Aktion aus. Aber die Leistung von Observable
geht noch darüber hinaus.
Eine asynchrone Observable
ist diejenige, die deine Subscriber
s Callback-Methoden (wie onNext()
) von einem anderen Thread aus aufruft. Erinnerst du dich an "Mastering Observable.create()", in dem wir untersucht haben, wann subscribe()
blockiert und wartet, bis alle Benachrichtigungen ankommen? Im wirklichen Leben kommen die meisten Observable
s aus Quellen, die von Natur aus asynchron sind. Kapitel 5 ist ganz solchen Observable
s gewidmet. Aber auch unser einfaches JMS-Beispiel aus "Replacing Callbacks with Streams", das eine eingebaute, nicht blockierende API aus der JMS-Spezifikation verwendet (MessageListener
interface). Dies wird vom Typsystem nicht erzwungen oder vorgeschlagen, aber viele Observable
s sind von Anfang an asynchron, und du solltest davon ausgehen. Eine blockierende subscribe()
Methode kommt sehr selten vor, wenn ein Lambda innerhalb von Observable.create()
nicht durch einen asynchronen Prozess oder Stream unterstützt wird. Standardmäßig (mit create()
) geschieht jedoch alles im Client-Thread (demjenigen, der abonniert wurde). Wenn du onNext()
direkt in deinem create()
Callback aufrufst, ist kein Multithreading und keine Nebenläufigkeit im Spiel.
Wenn wir auf eine solche ungewöhnliche Observable
stoßen, können wir deklarativ die so genannte Scheduler
auswählen, die für die Ausgabe von Werten verwendet wird. Im Fall von CompletableFuture
haben wir keine Kontrolle über die zugrunde liegenden Threads, die API hat die Entscheidung getroffen und im schlimmsten Fall ist es unmöglich, sie außer Kraft zu setzen. RxJava trifft solche Entscheidungen selten allein und wählt den sicheren Standard: Client-Thread und kein Multithreading. Für die Zwecke dieses Kapitels werden wir eine ganz einfache Logging-"Bibliothek" verwenden,"4 die eine Nachricht zusammen mit dem aktuellen Thread und der Anzahl der Millisekunden seit dem Start des Programms unter System.currentTimeMillis()
ausgibt:
void
log
(
Object
label
)
{
System
.
out
.
println
(
System
.
currentTimeMillis
()
-
start
+
"\t| "
+
Thread
.
currentThread
().
getName
()
+
"\t| "
+
label
);
}
Was ist ein Zeitplannungsprogramm?
RxJava ist unabhängig von Gleichzeitigkeit und führt von sich aus keine Gleichzeitigkeit ein. Einige Abstraktionen für den Umgang mit Threads sind jedoch für den Endbenutzer sichtbar. Außerdem können bestimmte Operatoren ohne Gleichzeitigkeit nicht richtig funktionieren; siehe "Andere Verwendungen für Zeitplanungsprogramme" für einige von ihnen. Glücklicherweise ist die Klasse Scheduler
, die einzige, die du beachten musst, ziemlich einfach. Im Prinzip funktioniert sie ähnlich wie ScheduledExecutorService
von java.util.concurrent
- sie führt beliebige Codeblöcke aus, möglicherweise in der Zukunft. Um den Rx-Vertrag zu erfüllen, bietet sie jedoch einige feinkörnigere Abstraktionen, die du im erweiterten Abschnitt "Übersicht über die Details der Zeitplannungsprogramm-Implementierung" genauer kennenlernen kannst .
Zeitplanungsprogramme werden zusammen mit den Operatoren subscribeOn()
und observeOn()
sowie bei der Erstellung bestimmter Typen von Observable
verwendet. Ein Zeitplanungsprogramm erstellt nur Instanzen von Worker
s, die für die Planung und Ausführung von Code zuständig sind. Wenn RxJava einen Code planen muss, bittet es zunächst Scheduler
, ein Worker
bereitzustellen, und verwendet dieses, um die nachfolgenden Aufgaben zu planen. Du wirst später Beispiele für diese API finden, aber mach dich zunächst mit den verfügbaren eingebauten Zeitplanungsprogrammen vertraut:
Schedulers.newThread()
-
Dieses Zeitplannungsprogramm startet einfach jedes Mal einen neuen Thread, wenn er über
subscribeOn()
oderobserveOn()
angefordert wird.newThread()
ist fast nie eine gute Wahl, nicht nur wegen der Latenzzeit beim Starten eines Threads, sondern auch, weil dieser Thread nicht wiederverwendet wird. Im Vorfeld muss Stack-Speicherplatz zugewiesen werden (in der Regel etwa ein Megabyte, was durch den-Xss
Parameter der JVM gesteuert wird) und das Betriebssystem muss einen neuen nativen Thread starten. Wenn dieWorker
fertig ist, wird der Thread einfach beendet. Dieses Zeitplannungsprogramm ist nur dann sinnvoll, wenn die Aufgaben grobkörnig sind: Es braucht viel Zeit, um sie zu erledigen, aber es gibt nur sehr wenige davon, so dass es unwahrscheinlich ist, dass Threads überhaupt wiederverwendet werden. Siehe auch: "Thread per Connection". In der Praxis ist es fast immer die bessere Wahl,Schedulers.io()
zu folgen. Schedulers.io()
-
Dieses Zeitplannungsprogramm ähnelt
newThread()
, aber bereits gestartete Threads werden recycelt und können möglicherweise zukünftige Anfragen bearbeiten. Diese Implementierung funktioniert ähnlich wieThreadPoolExecutor
vonjava.util.concurrent
mit einem unbegrenzten Pool von Threads. Jedes Mal, wenn eine neueWorker
angefordert wird, wird entweder ein neuer Thread gestartet (und später für einige Zeit inaktiv gehalten) oder der inaktive Thread wird wiederverwendet.Der Name
io()
ist nicht zufällig gewählt. Du solltest dieses Zeitplannungsprogramm für I/O-gebundene Aufgaben verwenden, die nur sehr wenig CPU-Ressourcen benötigen. Allerdings benötigen sie oft viel Zeit, weil sie auf das Netzwerk oder die Festplatte warten. Daher ist es eine gute Idee, einen relativ großen Pool von Threads zu haben. Sei jedoch vorsichtig mit unbegrenzten Ressourcen jeglicher Art - im Falle von langsamen oder nicht reagierenden externen Abhängigkeiten wie Webservices könnte das Zeitplannungsprogrammio()
eine enorme Anzahl von Threads starten, was dazu führen könnte, dass auch deine eigene Anwendung nicht mehr reagiert. Im Abschnitt " Umgang mit Ausfällen mit Hystrix" erfährst du mehr darüber, wie du dieses Problem lösen kannst. Schedulers.computation()
-
Du solltest ein Zeitplannungsprogramm verwenden, wenn Aufgaben ausschließlich CPU-gebunden sind, d. h. sie benötigen Rechenleistung und haben keinen blockierenden Code (Lesen von der Festplatte, Netzwerk, Schlafen, Warten auf eine Sperre usw.). Da jede Aufgabe, die mit diesem Zeitplannungsprogramm ausgeführt wird, einen CPU-Kern voll ausnutzen soll, würde es nicht viel bringen, mehr solcher Aufgaben parallel auszuführen, als Kerne zur Verfügung stehen. Deshalb begrenzt das Zeitplannungsprogramm
computation()
die Anzahl der parallel ausgeführten Threads standardmäßig auf den Wert vonavailableProcessors()
, der in der DienstprogrammklasseRuntime.getRuntime()
zu finden ist.Wenn du aus irgendeinem Grund eine andere Anzahl von Threads als die Standardanzahl benötigst, kannst du jederzeit die Systemeigenschaft
rx.scheduler.max-computation-threads
verwenden. Indem du weniger Threads nimmst, stellst du sicher, dass immer ein oder mehrere CPU-Kerne im Leerlauf sind und dass dercomputation()
Thread-Pool deinen Server auch bei hoher Last nicht sättigt. Es ist nicht möglich, mehr Rechenthreads als Kerne zu haben.computation()
Zeitplannungsprogramm verwendet eine unbegrenzte Warteschlange vor jedem Thread, d.h. wenn die Aufgabe geplant ist, aber alle Kerne belegt sind, werden sie in die Warteschlange gestellt. Bei Lastspitzen hält dieses Zeitplannungsprogramm die Anzahl der Threads begrenzt. Die Warteschlange vor den einzelnen Threads wird jedoch immer größer.Zum Glück sorgen eingebaute Operatoren, insbesondere
observeOn()
, die wir in "Deklarative Gleichzeitigkeit mit observeOn()" kennenlernen werden, dafür, dass dieseScheduler
nicht überlastet wird. Schedulers.from(Executor executor)
-
Scheduler
Da die intern komplexer sind als dieExecutor
vonjava.util.concurrent
, wurde eine eigene Abstraktion benötigt. Da sie aber konzeptionell sehr ähnlich sind, überrascht es nicht, dass es einen Wrapper gibt, derExecutor
mit derfrom()
Factory-Methode inScheduler
verwandeln kann:
import
com.google.common.util.concurrent.ThreadFactoryBuilder
;
import
rx.Scheduler
;
import
rx.schedulers.Schedulers
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.ThreadPoolExecutor
;
//...
ThreadFactory
threadFactory
=
new
ThreadFactoryBuilder
()
.
setNameFormat
(
"MyPool-%d"
)
.
build
();
Executor
executor
=
new
ThreadPoolExecutor
(
10
,
//corePoolSize
10
,
//maximumPoolSize
0L
,
TimeUnit
.
MILLISECONDS
,
//keepAliveTime, unit
new
LinkedBlockingQueue
<>(
1000
),
//workQueue
threadFactory
);
Scheduler
scheduler
=
Schedulers
.
from
(
executor
);
Ich verwende absichtlich diese ausführliche Syntax für die Erstellung von ExecutorService
und nicht die einfachere Version:
import
java.util.concurrent.Executors
;
//...
ExecutorService
executor
=
Executors
.
newFixedThreadPool
(
10
);
So verlockend die Werksklasse Executors
auch sein mag, so sind doch einige Vorgaben fest programmiert, die in Unternehmensanwendungen unpraktisch oder sogar gefährlich sind. So verwendet sie zum Beispiel die unbegrenzte Anzahl von LinkedBlockingQueue
, die ins Unendliche wachsen kann, was zu OutOfMemoryError
führt, wenn eine große Anzahl von Aufgaben ansteht. Außerdem verwendet die Voreinstellung ThreadFactory
sinnlose Thread-Namen wie pool-5-thread-3
. Die richtige Benennung von Threads ist ein unschätzbares Werkzeug bei der Profilerstellung oder der Analyse von Thread-Dumps. ThreadFactory
von Grund auf neu zu implementieren ist etwas umständlich, daher haben wir den ThreadFactoryBuilder von Guava verwendet. Wenn du dich für das Tuning und die richtige Nutzung von Thread Pools interessierst, findest du weitere Informationen unter "Thread Pool von Verbindungen" und "Verwaltung von Ausfällen mit Hystrix". Die Erstellung von Zeitplannungsprogrammen aus Executor
, die wir bewusst konfiguriert haben, ist für Projekte mit hoher Last empfehlenswert. Da RxJava jedoch keine Kontrolle über unabhängig erstellte Threads in einem Executor
hat, kann es keine Threads pinnen (d.h. versuchen, die Arbeit derselben Aufgabe auf demselben Thread zu halten, um die Cache-Lokalität zu verbessern). Dieses Scheduler
sorgt lediglich dafür, dass ein einzelnes Scheduler.Worker
(siehe "Übersicht über die Details der Zeitplannungsprogramm-Implementierung") die Ereignisse sequentiell abarbeitet.
Schedulers.immediate()
-
Schedulers.immediate()
ist ein spezielles Zeitplannungsprogramm, das eine Aufgabe im Client-Thread nicht asynchron, sondern blockierend aufruft. Die Verwendung dieses Zeitplannungsprogramms ist sinnlos, es sei denn, ein Teil deiner API erfordert die Bereitstellung eines Zeitplannungsprogramms, während du mit dem Standardverhalten vonObservable
völlig zufrieden bist, da es kein Threading erfordert. Tatsächlich hat das Abonnieren einesObservable
(mehr dazu in einer Sekunde) überimmediate()
Scheduler
in der Regel denselben Effekt wie das Nicht-Abonnieren mit einem bestimmten Zeitplannungsprogramm. Generell solltest du dieses Zeitplannungsprogramm vermeiden, da es den aufrufenden Thread blockiert und nur von begrenztem Nutzen ist. Schedulers.trampoline()
-
Das Zeitplannungsprogramm
trampoline()
istimmediate()
sehr ähnlich, da es die Aufgaben ebenfalls im selben Thread plant, also effektiv blockiert. Im Gegensatz zuimmediate()
wird die nächste Aufgabe jedoch erst dann ausgeführt, wenn alle zuvor geplanten Aufgaben abgeschlossen sind.immediate()
ruft eine bestimmte Aufgabe sofort auf, währendtrampoline()
wartet, bis die aktuelle Aufgabe abgeschlossen ist. Trampoline ist ein Muster in der funktionalen Programmierung, das es ermöglicht, Rekursionen zu implementieren, ohne den Aufrufstapel ins Unendliche wachsen zu lassen. Das lässt sich am besten anhand eines Beispiels erklären, das zunächstimmediate()
betrifft. Beachte übrigens, dass wir nicht direkt mit einerScheduler
Instanz interagieren, sondern zuerst eineWorker
erstellen. Das macht Sinn, wie du in der "Übersicht über die Details der Zeitplannungsprogramme" sehen wirst .Scheduler
scheduler
=
Schedulers
.
immediate
();
Scheduler
.
Worker
worker
=
scheduler
.
createWorker
();
log
(
"Main start"
);
worker
.
schedule
(()
->
{
log
(
" Outer start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Inner start"
);
sleepOneSecond
();
log
(
" Inner end"
);
});
log
(
" Outer end"
);
});
log
(
"Main end"
);
worker
.
unsubscribe
();
Die Ausgabe ist wie erwartet; du könntest
schedule()
tatsächlich durch einen einfachen Methodenaufruf ersetzen:1044 | main | Main start 1094 | main | Outer start 2097 | main | Inner start 3097 | main | Inner end 3100 | main | Outer end 3100 | main | Main end
Innerhalb des
Outer
Blocksschedule()
Inner
Blocks, der sofort aufgerufen wird und dieOuter
Aufgabe unterbricht. WennInner
fertig ist, geht die Kontrolle zurück anOuter
. Auch dies ist einfach eine verschlungene Art, eine Aufgabe indirekt überimmediate()
Scheduler
aufzurufen und zu blockieren. Aber was passiert, wenn wirSchedulers.immediate()
durchSchedulers.trampoline()
ersetzen? Die Ausgabe ist ganz anders:1030 | main | Main start 1096 | main | Outer start 2101 | main | Outer end 2101 | main | Inner start 3101 | main | Inner end 3101 | main | Main end
Siehst du, wie
Outer
es schafft, fertig zu werden, bevorInner
überhaupt startet? Das liegt daran, dass die AufgabeInner
in der Warteschlange vontrampoline()
Scheduler
stand, die bereits von der AufgabeOuter
belegt war. AlsOuter
fertig war, begann die erste Aufgabe aus der Warteschlange (Inner
). Wir können sogar noch weiter gehen, um sicherzustellen, dass du den Unterschied verstehst:log
(
"Main start"
);
worker
.
schedule
(()
->
{
log
(
" Outer start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Middle start"
);
sleepOneSecond
();
worker
.
schedule
(()
->
{
log
(
" Inner start"
);
sleepOneSecond
();
log
(
" Inner end"
);
});
log
(
" Middle end"
);
});
log
(
" Outer end"
);
});
log
(
"Main end"
);
Die
Worker
vonimmediate()
Scheduler
gibt folgendes aus:1029 | main | Main start 1091 | main | Outer start 2093 | main | Middle start 3095 | main | Inner start 4096 | main | Inner end 4099 | main | Middle end 4099 | main | Outer end 4099 | main | Main end
Gegen den
trampoline()
Arbeiter:1041 | main | Main start 1095 | main | Outer start 2099 | main | Outer end 2099 | main | Middle start 3101 | main | Middle end 3101 | main | Inner start 4102 | main | Inner end 4102 | main | Main end
Schedulers.test()
-
Dieses
Scheduler
wird nur zu Testzwecken verwendet und du wirst es nie im Produktionscode sehen. Sein Hauptvorteil ist die Möglichkeit, die Uhr beliebig vorzustellen und so zu simulieren, dass die Zeit vergeht.TestScheduler
wird in "Schedulers in Unit Testing" ausführlich beschrieben .Scheduler
s allein sind nicht sehr interessant. Wenn du wissen willst, wie sie intern funktionieren und wie du deine eigenen implementieren kannst, schau dir den nächsten Abschnitt an.
Übersicht über die Details des Zeitplannungsprogramms
Hinweis
Dieser Abschnitt ist völlig optional. Du kannst direkt zu "Declarative Subscription with subscribeOn()" springen, wenn du nicht an den Implementierungsdetails interessiert bist.
Scheduler
entkoppelt nicht nur die Aufgaben und ihre Ausführung (in der Regel, indem sie in einem anderen Thread ausgeführt werden), sondern abstrahiert auch die Uhr, wie wir in "Virtual Time" lernen werden . Die API von Scheduler
ist etwas einfacher als z. B. die von ScheduledExecutorService
:
abstract
class
Scheduler
{
abstract
Worker
createWorker
();
long
now
();
abstract
static
class
Worker
implements
Subscription
{
abstract
Subscription
schedule
(
Action0
action
);
abstract
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
);
long
now
();
}
}
Wenn RxJava eine Aufgabe planen will (vermutlich, aber nicht unbedingt im Hintergrund), muss es zunächst eine Instanz von Worker
anfordern. Die Worker
ermöglicht es, die Aufgabe ohne Verzögerung oder zu einem bestimmten Zeitpunkt zu planen. Sowohl Scheduler
als auch Worker
haben eine überschreibbare Zeitquelle (Methodenow()
), die sie verwenden, um zu bestimmen, wann eine bestimmte Aufgabe ausgeführt werden soll. Naiv betrachtet kannst du dir Scheduler
wie einen Thread-Pool und Worker
wie einen Thread innerhalb dieses Pools vorstellen.
Die Trennung zwischen Scheduler
und Worker
ist notwendig, um einige der Richtlinien des Rx-Vertrages einfach umzusetzen, nämlich die Methode von Subscriber
nacheinander und nicht gleichzeitig aufzurufen. Worker
Der Rx-Vertrag sieht genau das vor: Zwei Aufgaben, die auf demselben Worker
geplant sind, werden niemals gleichzeitig ausgeführt. Unabhängige Worker
s desselben Scheduler
können jedoch problemlos Aufgaben gleichzeitig ausführen.
Anstatt die API durchzugehen, wollen wir den Quellcode eines bestehenden Scheduler
analysieren, nämlich HandlerScheduler
, das im RxAndroid-Projekt zu finden ist. Dieses Scheduler
führt einfach alle geplanten Aufgaben in einem Android UI-Thread aus. Die Aktualisierung der Benutzeroberfläche ist nur von diesem Thread aus möglich (weitere Informationen findest du unter "Android-Entwicklung mit RxJava" ). Das ist ähnlich wie der Event Dispatch Thread (EDT) in Swing, wo die meisten Aktualisierungen von Fenstern und Komponenten in einem eigenen Thread (EDT) ausgeführt werden müssen. Es überrascht nicht, dass es auch das RxSwing5 Projekt für diese Aufgabe.
Der folgende Codeschnipsel ist eine abgespeckte und unvollständige Klasse aus RxAndroid, die nur zu Schulungszwecken dient:
package
rx
.
android
.
schedulers
;
import
android.os.Handler
;
import
android.os.Looper
;
import
rx.Scheduler
;
import
rx.Subscription
;
import
rx.functions.Action0
;
import
rx.internal.schedulers.ScheduledAction
;
import
rx.subscriptions.Subscriptions
;
import
java.util.concurrent.TimeUnit
;
public
final
class
SimplifiedHandlerScheduler
extends
Scheduler
{
@Override
public
Worker
createWorker
()
{
return
new
HandlerWorker
();
}
static
class
HandlerWorker
extends
Worker
{
private
final
Handler
handler
=
new
Handler
(
Looper
.
getMainLooper
());
@Override
public
void
unsubscribe
()
{
//Implementation coming soon...
}
@Override
public
boolean
isUnsubscribed
()
{
//Implementation coming soon...
return
false
;
}
@Override
public
Subscription
schedule
(
final
Action0
action
)
{
return
schedule
(
action
,
0
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
)
{
ScheduledAction
scheduledAction
=
new
ScheduledAction
(
action
);
handler
.
postDelayed
(
scheduledAction
,
unit
.
toMillis
(
delayTime
));
scheduledAction
.
add
(
Subscriptions
.
create
(()
->
handler
.
removeCallbacks
(
scheduledAction
)));
return
scheduledAction
;
}
}
}
Die Details der Android-API sind im Moment nicht wichtig. Was hier passiert, ist, dass jedes Mal, wenn wir etwas auf HandlerWorker
planen, der Codeblock an eine spezielle postDelayed()
Methode übergeben wird, die ihn in einem speziellen Android-Thread ausführt. Es gibt nur einen solchen Thread, so dass Ereignisse nicht nur innerhalb, sondern auch über Worker
hinaus serialisiert werden.
Bevor wir action
zur Ausführung übergeben, wickeln wir es mit ScheduledAction
ein, das sowohl Runnable
als auch Subscription
implementiert. RxJava ist faul, wann immer es geht - das gilt auch für das Planen von Aufgaben. Wenn du aus irgendeinem Grund beschließt, dass eine bestimmte action
doch nicht ausgeführt werden soll (das macht Sinn, wenn die Aktion in der Zukunft und nicht sofort geplant wurde), führe einfach unsubscribe()
auf dem von schedule()
zurückgegebenen Subscription
aus. Es liegt in der Verantwortung von Worker
, die Abmeldung ordnungsgemäß zu handhaben (zumindest so gut es geht).
Der Client-Code kann auch beschließen, unsubscribe()
von Worker
vollständig zu verlassen. Dadurch werden alle Aufgaben in der Warteschlange abbestellt und der Worker
freigegeben, so dass der zugrunde liegende Thread später wieder verwendet werden kann. Der folgende Codeschnipsel erweitert die SimplifiedHandlerScheduler
um den Worker
Abmeldefluss (nur die geänderten Methoden sind enthalten):
private
CompositeSubscription
compositeSubscription
=
new
CompositeSubscription
();
@Override
public
void
unsubscribe
()
{
compositeSubscription
.
unsubscribe
();
}
@Override
public
boolean
isUnsubscribed
()
{
return
compositeSubscription
.
isUnsubscribed
();
}
@Override
public
Subscription
schedule
(
Action0
action
,
long
delayTime
,
TimeUnit
unit
)
{
if
(
compositeSubscription
.
isUnsubscribed
())
{
return
Subscriptions
.
unsubscribed
();
}
final
ScheduledAction
scheduledAction
=
new
ScheduledAction
(
action
);
scheduledAction
.
addParent
(
compositeSubscription
);
compositeSubscription
.
add
(
scheduledAction
);
handler
.
postDelayed
(
scheduledAction
,
unit
.
toMillis
(
delayTime
));
scheduledAction
.
add
(
Subscriptions
.
create
(()
->
handler
.
removeCallbacks
(
scheduledAction
)));
return
scheduledAction
;
}
In "Steuerung von Listenern mit Hilfe von Subscription und Subscriber<T>" haben wir die Schnittstelle Subscription
erkundet, uns aber nie wirklich mit den Implementierungsdetails befasst. CompositeSubscription
ist eine von vielen verfügbaren Implementierungen, die selbst nur ein Container für untergeordnete Subscription
s ist (ein Composite-Designmuster ). Wenn du dich von CompositeSubscription
abmeldest, meldest du dich von allen Kindern ab. Du kannst auch die von CompositeSubscription
verwalteten Kinder hinzufügen und entfernen.
In unserem benutzerdefinierten Scheduler
wird CompositeSubscription
verwendet, um alle Subscription
s von den vorherigen schedule()
Aufrufen zu verfolgen (siehe compositeSubscription.add(scheduledAction)
). Andererseits muss das Kind ScheduledAction
über sein Elternteil Bescheid wissen (siehe: addParent()
), damit es sich selbst entfernen kann, wenn die Aktion abgeschlossen oder abgebrochen wurde. Andernfalls würde Worker
für immer veraltete Child Subscription
s anhäufen. Wenn der Kundencode beschließt, dass er eine Instanz von HandlerWorker
nicht mehr benötigt, meldet er sich von ihr ab. Die Abmeldung wird an alle ausstehenden untergeordneten Instanzen von Subscription
weitergegeben (falls vorhanden).
Das war eine sehr kurze Einführung in Scheduler
s in RxJava. Die Details ihrer Interna sind für die tägliche Arbeit nicht so nützlich; sie sind vielmehr so konzipiert, dass die Verwendung von RxJava intuitiver und vorhersehbarer wird. Trotzdem wollen wir uns kurz ansehen, wie Scheduler
s viele Gleichzeitigkeitsprobleme in Rx lösen.
Deklaratives Abonnement mit subscribeOn()
In "Mastering Observable.create()" haben wir gesehen, dass subscribe()
standardmäßig den Client-Thread verwendet. Um es noch einmal zusammenzufassen: Hier ist das einfachste Abonnement, das du dir ausdenken kannst, bei dem kein Threading im Spiel war:
Observable
<
String
>
simple
()
{
return
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onCompleted
();
});
}
//...
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
final
Observable
<
String
>
obs2
=
obs
.
map
(
x
->
x
)
.
filter
(
x
->
true
);
log
(
"Transformed"
);
obs2
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Beachte, wo die Logging-Anweisungen platziert sind, und sieh dir die Ausgabe genau an, insbesondere im Hinblick darauf, welcher Thread die Druckanweisung aufgerufen hat:
33 | main | Starting 120 | main | Created 128 | main | Transformed 133 | main | Subscribed 133 | main | Got A 133 | main | Got B 133 | main | Completed 134 | main | Exiting
Pass auf: Die Reihenfolge der Anweisungen ist absolut vorhersehbar. Erstens läuft jede Codezeile im vorangegangenen Codeschnipsel im Thread main
, es gibt keine Thread-Pools und keine asynchrone Ausgabe von Ereignissen. Zweitens ist die Reihenfolge der Ausführung auf den ersten Blick vielleicht nicht ganz klar.
Wenn das Programm startet, gibt es Starting
aus, was verständlich ist. Nachdem wir eine Instanz von Observable<String>
erstellt haben, sehen wir die Nachricht Created
. Beachte, dass Subscribed
erst später erscheint, wenn wir uns tatsächlich anmelden. Ohne den Aufruf von subscribe()
wird der Codeblock in Observable.create()
nie ausgeführt. Außerdem haben auch die Operatoren map()
und filter()
keine sichtbaren Nebeneffekte. Beachte, dass die Nachricht Transformed
noch vor Subscribed
ausgegeben wird.
Später erhalten wir alle ausgegebenen Ereignisse und die Fertigstellungsmeldung. Schließlich wird die Anweisung Exiting
gedruckt und das Programm kann zurückkehren. Das ist eine interessante Beobachtung -subscribe()
sollte einen Callback registrieren, wenn Ereignisse asynchron auftreten. Das ist die Annahme, die du standardmäßig machen solltest. In diesem Fall gibt es jedoch kein Threading und subscribe()
ist tatsächlich blockierend. Warum ist das so?
Es gibt eine inhärente, aber versteckte Verbindung zwischen subscribe()
und create()
. Jedes Mal, wenn du subscribe()
auf Observable
aufrufst, wird die Callback-Methode von OnSubscribe
aufgerufen (wobei der Lambda-Ausdruck, den du an create()
übergeben hast, verpackt wird). Sie erhält deine Subscriber
als Argument. Standardmäßig geschieht dies im selben Thread und ist blockierend, so dass alles, was du innerhalb von create()
tust, subscribe()
blockiert. Wenn deine Methode create()
ein paar Sekunden schläft, wird subscribe()
blockiert. Wenn zwischen Observable.create()
und deiner Subscriber
Methode (Lambda, das als Callback fungiert) Operatoren stehen, werden diese Operatoren im Namen des Threads aufgerufen, der subscribe()
aufgerufen hat. RxJava fügt standardmäßig keine Gleichzeitigkeitsfunktionen zwischen Observable
und Subscriber
ein. Der Grund dafür ist, dass Observable
in der Regel durch andere Gleichzeitigkeitsmechanismen wie Ereignisschleifen oder benutzerdefinierte Threads unterstützt wird, so dass Rx dir die volle Kontrolle überlässt und keine Konventionen vorschreibt.
Diese Beobachtung bereitet die Landschaft für den subscribeOn()
Operator vor. Indem du subscribeOn()
irgendwo zwischen Observable
und subscribe()
einfügst, wählst du deklarativ Scheduler
aus, wo die Callback-Methode OnSubscribe
aufgerufen wird. Egal, was du innerhalb von create()
tust, diese Arbeit wird auf eine unabhängige Scheduler
verlagert und dein subscribe()
Aufruf blockiert nicht mehr:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
35 | main | Starting 112 | main | Created 123 | main | Exiting 123 | Sched-A-0 | Subscribed 124 | Sched-A-0 | Got A 124 | Sched-A-0 | Got B 124 | Sched-A-0 | Completed
Siehst du, dass der Thread main
beendet wird, bevor Observable
überhaupt mit der Ausgabe von Werten beginnt? Technisch gesehen ist die Reihenfolge der Logmeldungen nicht mehr so vorhersehbar, weil zwei Threads gleichzeitig laufen: main
Der Thread Sched-A-0
, der sich angemeldet hat und aussteigen will, und der Thread , der Ereignisse sendet, sobald sich jemand angemeldet hat. Sowohl der schedulerA
als auch der Sched-A-0
Thread stammen aus den folgenden Zeitplannungsprogrammen, die wir zur Veranschaulichung erstellt haben:
import
static
java
.
util
.
concurrent
.
Executors
.
newFixedThreadPool
;
ExecutorService
poolA
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-A-%d"
));
Scheduler
schedulerA
=
Schedulers
.
from
(
poolA
);
ExecutorService
poolB
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-B-%d"
));
Scheduler
schedulerB
=
Schedulers
.
from
(
poolB
);
ExecutorService
poolC
=
newFixedThreadPool
(
10
,
threadFactory
(
"Sched-C-%d"
));
Scheduler
schedulerC
=
Schedulers
.
from
(
poolC
);
private
ThreadFactory
threadFactory
(
String
pattern
)
{
return
new
ThreadFactoryBuilder
()
.
setNameFormat
(
pattern
)
.
build
();
}
Diese Zeitplannungsprogramme werden in allen Beispielen verwendet, aber sie sind relativ leicht zu erinnern. Drei unabhängige Zeitplannungsprogramme, die jeweils 10 Threads von einem ExecutorService
verwalten. Um die Ausgabe übersichtlicher zu gestalten, hat jeder Thread-Pool ein eigenes Namensmuster.
Bevor wir beginnen, musst du verstehen, dass subscribeOn()
in ausgereiften Anwendungen nur sehr selten verwendet wird. Normalerweise kommen Observable
s aus Quellen, die von Natur aus asynchron sind (wie RxNetty, siehe "Nonblocking HTTP Server with Netty and RxNetty") oder von sich aus Scheduling anwenden (wie Hystrix, siehe "Managing Failures with Hystrix"). Du solltest subscribeOn()
nur in besonderen Fällen behandeln, wenn das zugrunde liegende Observable
bekanntlich synchron ist (create()
ist blockierend). Allerdings ist subscribeOn()
immer noch eine viel bessere Lösung als das handgefertigte Threading innerhalb von create()
:
//Don't do this
Observable
<
String
>
obs
=
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
Runnable
code
=
()
->
{
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onCompleted
();
};
new
Thread
(
code
,
"Async"
).
start
();
});
Der obige Code vermischt zwei Konzepte: die Produktion von Ereignissen und die Wahl der Gleichzeitigkeitsstrategie. Observable
sollte nur für die Produktionslogik zuständig sein, während nur der Client-Code eine vernünftige Entscheidung über die Gleichzeitigkeit treffen kann. Erinnere dich daran, dass Observable
faul, aber auch unveränderlich ist, in dem Sinne, dass subscribeOn()
nur nachgeschaltete Abonnenten betrifft. Wenn jemand genau dieselbe Observable
ohne subscribeOn()
dazwischen abonniert, wird standardmäßig keine Gleichzeitigkeit involviert sein.
Vergiss nicht, dass wir uns in diesem Kapitel auf bestehende Anwendungen konzentrieren und RxJava schrittweise einführen. Der subscribeOn()
Operator ist unter diesen Umständen sehr nützlich; sobald du jedoch reaktive Erweiterungen verstehst und sie in großem Umfang einsetzt, nimmt der Wert von subscribeOn()
ab. In vollständig reaktiven Software-Stacks, wie sie z. B. bei Netflix zu finden sind, wird subscribeOn()
fast nie verwendet, obwohl alle Observable
s asynchron sind.
Die meisten Observable
s stammen aus asynchronen Quellen und werden standardmäßig als asynchron behandelt. Daher wird subscribeOn()
nur sehr begrenzt verwendet, meist bei der Nachrüstung bestehender APIs oder Bibliotheken. In Kapitel 5 schreiben wir wirklich asynchrone Anwendungen ganz ohne explizite subscribeOn()
und Scheduler
s.
subscribeOn() Gleichzeitigkeit und Verhalten
Es gibt mehrere Nuancen, wie subscribeOn()
funktioniert. Zunächst einmal sollte sich der neugierige Leser fragen, was passiert, wenn zwei Aufrufe von subscribeOn()
zwischen Observable
und subscribe()
erscheinen. Die Antwort ist einfach: subscribeOn()
, das dem ursprünglichen Observable
am nächsten liegt , gewinnt. Das hat wichtige praktische Auswirkungen. Wenn du eine API entwirfst und intern subscribeOn()
verwendest, hat der Client-Code keine Möglichkeit, die Scheduler
deiner Wahl zu überschreiben. Das kann eine bewusste Designentscheidung sein; schließlich weiß der API-Designer vielleicht am besten, welche Scheduler
geeignet ist. Andererseits ist es immer eine gute Idee, eine überladene Version der besagten API bereitzustellen, die es erlaubt, die gewählte Scheduler
zu überschreiben.
Lass uns untersuchen, wie sich subscribeOn()
verhält:
log
(
"Starting"
);
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
//many other operators
.
subscribeOn
(
schedulerB
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Die Ausgabe zeigt nur die Threads von schedulerA
an:
17 | main | Starting 73 | main | Created 83 | main | Exiting 84 | Sched-A-0 | Subscribed 84 | Sched-A-0 | Got A 84 | Sched-A-0 | Got B 84 | Sched-A-0 | Completed
Interessanterweise wird das Abonnieren von schedulerB
nicht vollständig zugunsten von schedulerA
ignoriert.schedulerB
wird zwar immer noch für eine kurze Zeitspanne verwendet, aber es plant kaum neue Aktionen auf schedulerA
ein, das die ganze Arbeit erledigt. So werden mehrere subscribeOn()
nicht nur ignoriert, sondern verursachen auch einen geringen Overhead.
Apropos Operatoren: Wir haben gesagt, dass die Methode create()
, die verwendet wird, wenn es ein neues Subscriber
gibt, innerhalb des bereitgestellten Zeitplannungsprogramms (falls vorhanden) ausgeführt wird. Aber welcher Thread führt all diese Umwandlungen zwischen create()
und subscribe()
aus? Wir wissen bereits, dass alle Operatoren standardmäßig in demselben Thread (Zeitplannungsprogramm) ausgeführt werden, also keine Gleichzeitigkeit besteht:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
this
::
log
)
.
map
(
x
->
x
+
'1'
)
.
doOnNext
(
this
::
log
)
.
map
(
x
->
x
+
'2'
)
.
subscribeOn
(
schedulerA
)
.
doOnNext
(
this
::
log
)
.
subscribe
(
x
->
log
(
"Got "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Wir haben die Pipeline der Operatoren gelegentlich mit doOnNext()
bestreut, um zu sehen, welcher Thread an dieser Stelle die Kontrolle hat. Erinnere dich daran, dass die Position von subscribeOn()
nicht relevant ist, sie kann direkt nach Observable
oder kurz vor subscribe()
sein. Das Ergebnis ist nicht überraschend:
20 | main | Starting 104 | main | Created 123 | main | Exiting 124 | Sched-A-0 | Subscribed 124 | Sched-A-0 | A 124 | Sched-A-0 | A1 124 | Sched-A-0 | A12 124 | Sched-A-0 | Got A12 124 | Sched-A-0 | B 124 | Sched-A-0 | B1 124 | Sched-A-0 | B12 125 | Sched-A-0 | Got B12
Beobachte, wie create()
aufgerufen wird und die Ereignisse A
und B
erzeugt. Diese Ereignisse durchlaufen nacheinander den Thread des Zeitplannungsprogramms und erreichen schließlich Subscriber
. Viele RxJava-Neulinge glauben, dass die Verwendung eines Scheduler
mit einer großen Anzahl von Threads die gleichzeitige Verarbeitung von Ereignissen automatisch aufspaltet und am Ende alle Ergebnisse irgendwie zusammenführt. Das ist aber nicht der Fall. RxJava erstellt eine einzige Worker
Instanz (siehe: "Übersicht über die Details der Zeitplannungsprogramm-Implementierung") für die gesamte Pipeline, vor allem um eine sequenzielle Verarbeitung der Ereignisse zu gewährleisten.
Das bedeutet, dass, wenn einer deiner Operatoren besonders langsam ist - z. B. map()
, der Daten von der Festplatte liest, um vorbeiziehende Ereignisse umzuwandeln - dieser kostspielige Vorgang im selben Thread aufgerufen wird. Ein einziger fehlerhafter Operator kann die gesamte Pipeline verlangsamen, von der Produktion bis zum Verbrauch. Das ist in RxJava nicht üblich. Operatoren sollten nicht blockierend, schnell und so rein wie möglich sein.
Auch hier kommt flatMap()
zur Rettung. Anstatt innerhalb von map()
zu blockieren, können wir flatMap()
aufrufen und alle Ergebnisse asynchron sammeln. Daher sind flatMap()
und merge()
die Operatoren, wenn wir echte Parallelität erreichen wollen. Aber auch mit flatMap()
ist es nicht offensichtlich. Stell dir einen Lebensmittelladen vor (nennen wir ihn "RxGroceries"), der eine API für den Einkauf von Waren anbietet:
class
RxGroceries
{
Observable
<
BigDecimal
>
purchase
(
String
productName
,
int
quantity
)
{
return
Observable
.
fromCallable
(()
->
doPurchase
(
productName
,
quantity
));
}
BigDecimal
doPurchase
(
String
productName
,
int
quantity
)
{
log
(
"Purchasing "
+
quantity
+
" "
+
productName
);
//real logic here
log
(
"Done "
+
quantity
+
" "
+
productName
);
return
priceForProduct
;
}
}
Natürlich ist die Implementierung von doPurchase()
hier irrelevant, stell dir einfach vor, dass sie einige Zeit und Ressourcen in Anspruch nimmt. Wir simulieren die Geschäftslogik, indem wir einen künstlichen Schlaf von einer Sekunde hinzufügen, der etwas höher ist, wenn quantity
größer ist. Blockierende Observable
s wie die, die von purchase()
zurückgegeben wird, sind in einer realen Anwendung ungewöhnlich, aber wir wollen es zu Lehrzwecken so belassen. Beim Kauf mehrerer Waren möchten wir so viel wie möglich parallelisieren und am Ende den Gesamtpreis für alle Waren berechnen. Der erste Versuch ist erfolglos:
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
subscribeOn
(
schedulerA
)
//BROKEN!!!
.
map
(
prod
->
rxGroceries
.
doPurchase
(
prod
,
1
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
Das Ergebnis ist korrekt, es ist ein Observable
mit nur einem einzigen Wert: dem Gesamtpreis, der mit reduce()
berechnet wurde. Für jedes Produkt rufen wir doPurchase()
mit quantity
einmal auf. Obwohl schedulerA
von einem Thread-Pool mit 10 Threads unterstützt wird, ist der Code vollständig sequentiell:
144 | Sched-A-0 | Purchasing 1 bread 1144 | Sched-A-0 | Done 1 bread 1146 | Sched-A-0 | Purchasing 1 butter 2146 | Sched-A-0 | Done 1 butter 2146 | Sched-A-0 | Purchasing 1 milk 3147 | Sched-A-0 | Done 1 milk 3147 | Sched-A-0 | Purchasing 1 tomato 4147 | Sched-A-0 | Done 1 tomato 4147 | Sched-A-0 | Purchasing 1 cheese 5148 | Sched-A-0 | Done 1 cheese
Beachte, wie jedes Produkt die nachfolgenden Produkte von der Verarbeitung abhält. Wenn der Kauf von Brot abgeschlossen ist, beginnt die Butter sofort, aber nicht früher. Seltsamerweise hilft auch das Ersetzen von map()
durch flatMap()
nicht, und das Ergebnis ist genau dasselbe:
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
subscribeOn
(
schedulerA
)
.
flatMap
(
prod
->
rxGroceries
.
purchase
(
prod
,
1
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
Der Code funktioniert nicht gleichzeitig, weil es nur einen einzigen Fluss von Ereignissen gibt, der von vornherein sequenziell ablaufen muss. Andernfalls müsste dein Subscriber
gleichzeitige Benachrichtigungen kennen (onNext()
, onComplete()
, etc.), also ist es ein fairer Kompromiss. Glücklicherweise ist die idiomatische Lösung sehr nahe dran. Die wichtigsten Observable
emittierenden Produkte können nicht parallelisiert werden. Wir erstellen jedoch für jedes Produkt ein neues, unabhängiges Observable
, wie es von purchase()
zurückgegeben wird. Da sie unabhängig sind, können wir jedes einzelne von ihnen sicher gleichzeitig einplanen:
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"milk"
,
"tomato"
,
"cheese"
)
.
flatMap
(
prod
->
rxGroceries
.
purchase
(
prod
,
1
)
.
subscribeOn
(
schedulerA
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
Kannst du erkennen, wo subscribeOn()
ist? Der Haupt-Thread Observable
macht eigentlich nichts, daher ist ein spezieller Thread-Pool unnötig. Allerdings wird jeder Substream, der innerhalb von flatMap()
erstellt wird, mit einem schedulerA
versorgt. Jedes Mal, wenn subscribeOn()
benutzt wird, bekommt Scheduler
die Chance, einen neuen Worker
und damit einen separaten Thread zu erstellen (was die Sache ein wenig vereinfacht):
113 | Sched-A-1 | Purchasing 1 butter 114 | Sched-A-0 | Purchasing 1 bread 125 | Sched-A-2 | Purchasing 1 milk 125 | Sched-A-3 | Purchasing 1 tomato 126 | Sched-A-4 | Purchasing 1 cheese 1126 | Sched-A-2 | Done 1 milk 1126 | Sched-A-0 | Done 1 bread 1126 | Sched-A-1 | Done 1 butter 1128 | Sched-A-3 | Done 1 tomato 1128 | Sched-A-4 | Done 1 cheese
Endlich haben wir echte Gleichzeitigkeit erreicht. Jeder Kaufvorgang beginnt nun zur gleichen Zeit und alle werden schließlich abgeschlossen. Der flatMap()
Operator ist sorgfältig konzipiert und implementiert, damit er alle Ereignisse aus allen unabhängigen Streams sammelt und sie nacheinander weiterleitet. Wie wir jedoch bereits in "Reihenfolge der Ereignisse nach flatMap()" gelernt haben , können wir uns nicht mehr auf die Reihenfolge der nachgelagerten Ereignisse verlassen - sie beginnen und enden nicht in der gleichen Reihenfolge, in der sie ausgesendet wurden (die ursprüngliche Reihenfolge begann bei Brot). Wenn die Ereignisse den reduce()
Operator erreichen, sind sie bereits sequenziell und verhalten sich gut.
Inzwischen solltest du dich langsam vom klassischen Thread
Modell lösen und verstehen, wie Schedulers
funktioniert. Aber wenn dir das schwerfällt, hier ist eine einfache Analogie:
-
Observable
ohneScheduler
funktioniert wie ein Single-Thread-Programm mit blockierenden Methodenaufrufen, die untereinander Daten austauschen. -
Observable
mit einem einzigensubscribeOn()
ist wie das Starten einer großen Aufgabe im HintergrundThread
. Das Programm in diesemThread
ist zwar immer noch sequentiell, aber zumindest läuft es im Hintergrund. -
Observable
Die Verwendung vonflatMap()
, bei der jeder interneObservable
subscribeOn()
hat, funktioniert wieForkJoinPool
vonjava.util.concurrent
, wobei jeder Teilstrom eine Gabelung der Ausführung ist undflatMap()
eine sichere Join-Phase ist.
Die vorstehenden Tipps gelten natürlich nur für blockierende Observable
s, die in realen Anwendungen nur selten vorkommen. Wenn deine zugrunde liegenden Observable
s bereits asynchron sind, ist das Erreichen von Gleichzeitigkeit eine Frage des Verständnisses, wie sie kombiniert werden und wann die Subskription erfolgt. merge()
auf zwei Streams wird zum Beispiel beide gleichzeitig abonnieren, während der concat()
Operator wartet, bis der erste Stream beendet ist, bevor er den zweiten abonniert.
Batching-Anfragen mit groupBy()
Hast du bemerkt, dass RxGroceries.purchase()
productName
und quantity
nimmt, obwohl die Menge immer eins war? Was wäre, wenn unsere Lebensmittelliste einige Produkte mehrfach enthielte, was auf eine größere Nachfrage hindeutet? Die erste naive Implementierung sendet einfach dieselbe Anfrage, z. B. nach einem Ei, mehrmals und fragt jedes Mal nach einem. Glücklicherweise können wir solche Anfragen deklarativ stapeln, indem wir groupBy()
verwenden - und das funktioniert auch mit deklarativer Gleichzeitigkeit:
import
org.apache.commons.lang3.tuple.Pair
;
Observable
<
BigDecimal
>
totalPrice
=
Observable
.
just
(
"bread"
,
"butter"
,
"egg"
,
"milk"
,
"tomato"
,
"cheese"
,
"tomato"
,
"egg"
,
"egg"
)
.
groupBy
(
prod
->
prod
)
.
flatMap
(
grouped
->
grouped
.
count
()
.
map
(
quantity
->
{
String
productName
=
grouped
.
getKey
();
return
Pair
.
of
(
productName
,
quantity
);
}))
.
flatMap
(
order
->
store
.
purchase
(
order
.
getKey
(),
order
.
getValue
())
.
subscribeOn
(
schedulerA
))
.
reduce
(
BigDecimal:
:
add
)
.
single
();
Dieser Code ist ziemlich komplex, also gehen wir ihn kurz durch, bevor wir die Ausgabe zeigen. Zuerst gruppieren wir die Produkte einfach nach ihrem Namen, also der Identitätsfunktion prod -> prod
. Im Gegenzug erhalten wir eine umständliche Observable<GroupedObservable<String, String>>
. Daran gibt es nichts auszusetzen. Als Nächstes erhält flatMap()
alle GroupedObservable<String, String>
, die alle Produkte mit demselben Namen repräsentieren. So gibt es z.B. ein ["egg", "egg", "egg"]
Observable
mit einem Schlüssel "egg"
. Wenn groupBy()
eine andere Schlüsselfunktion verwenden würde, wie prod.length()
, hätte dieselbe Sequenz einen Schlüssel 3
.
An diesem Punkt müssen wir innerhalb von flatMap()
ein Observable
vom Typ Pair<String, Integer>
konstruieren, das jedes einzelne Produkt und seine Menge repräsentiert. Sowohl count()
als auch map()
geben ein Observable
zurück, also passt alles perfekt zusammen. Als zweites erhält flatMap()
order
vom Typ Pair<String, Integer>
und tätigt einen Kauf, diesmal kann die Menge größer sein. Die Ausgabe sieht perfekt aus. Beachte, dass größere Bestellungen etwas langsamer sind, aber es ist immer noch viel schneller als mehrere wiederholte Anfragen:
164 | Sched-A-0 | Purchasing 1 bread 165 | Sched-A-1 | Purchasing 1 butter 166 | Sched-A-2 | Purchasing 3 egg 166 | Sched-A-3 | Purchasing 1 milk 166 | Sched-A-4 | Purchasing 2 tomato 166 | Sched-A-5 | Purchasing 1 cheese 1151 | Sched-A-0 | Done 1 bread 1178 | Sched-A-1 | Done 1 butter 1180 | Sched-A-5 | Done 1 cheese 1183 | Sched-A-3 | Done 1 milk 1253 | Sched-A-4 | Done 2 tomato 1354 | Sched-A-2 | Done 3 egg
Wenn du glaubst, dass dein System auf diese Weise vom Batching profitieren kann, schau dir "Batching and Collapsing Commands" an.
Deklarative Gleichzeitigkeit mit observeOn()
Ob du es glaubst oder nicht, die Gleichzeitigkeit in RxJava kann durch zwei Operatoren beschrieben werden: die bereits erwähnten subscribeOn()
und observeOn()
. Sie sehen sehr ähnlich aus und sind für Neulinge verwirrend, aber ihre Semantik ist eigentlich ziemlich klar und sinnvoll.
subscribeOn()
ermöglicht es, zu wählen, welches Scheduler
verwendet wird, um OnSubscribe
aufzurufen (Lambda-Ausdruck innerhalb von create()
). Daher wird jeglicher Code innerhalb von create()
in einen anderen Thread verschoben - zum Beispiel, um ein Blockieren des Hauptthreads zu vermeiden. Umgekehrt steuert observeOn()
, welches Scheduler
verwendet wird, um nachgelagerte Subscriber
s aufzurufen, die nach observeOn()
auftreten. Zum Beispiel erfolgt der Aufruf von create()
im io()
Scheduler
(über subscribeOn(io())
), um ein Blockieren der Benutzeroberfläche zu vermeiden.
Die Aktualisierung der Widgets der Benutzeroberfläche muss jedoch im UI-Thread erfolgen (sowohl Swing als auch Android haben diese Einschränkung), also verwenden wir observeOn()
zum Beispiel mit AndroidSchedulers.mainThread()
vor den Operatoren oder den Abonnenten, die die Benutzeroberfläche ändern. Auf diese Weise können wir ein Scheduler
verwenden, um create()
und alle Operatoren bis zum ersten observeOn()
zu behandeln, aber andere(s), um Transformationen anzuwenden. Das lässt sich am besten anhand eines Beispiels erklären:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
x
->
log
(
"Found 1: "
+
x
))
.
observeOn
(
schedulerA
)
.
doOnNext
(
x
->
log
(
"Found 2: "
+
x
))
.
subscribe
(
x
->
log
(
"Got 1: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
observeOn()
kommt irgendwo in der Pipeline-Kette vor, und im Gegensatz zu subscribeOn()
ist diesmal die Position von observeOn()
ziemlich wichtig. Unabhängig davon, welche Scheduler
Operatoren oberhalb von observeOn()
ausgeführt haben (falls es welche gab), verwenden alle darunter liegenden Operatoren das bereitgestellte Scheduler
. In diesem Beispiel gibt es keine subscribeOn()
, also wird die Standardeinstellung verwendet (keine Gleichzeitigkeit):
23 | main | Starting 136 | main | Created 163 | main | Subscribed 163 | main | Found 1: A 163 | main | Found 1: B 163 | main | Exiting 163 | Sched-A-0 | Found 2: A 164 | Sched-A-0 | Got 1: A 164 | Sched-A-0 | Found 2: B 164 | Sched-A-0 | Got 1: B 164 | Sched-A-0 | Completed
Alle Operatoren oberhalb von observeOn
werden im Client-Thread ausgeführt, was in RxJava der Standard ist. Aber unterhalb von observeOn()
werden die Operatoren innerhalb des mitgelieferten Scheduler
ausgeführt. Dies wird noch deutlicher, wenn sowohl subscribeOn()
als auch mehrere observeOn()
innerhalb der Pipeline auftreten:
log
(
"Starting"
);
final
Observable
<
String
>
obs
=
simple
();
log
(
"Created"
);
obs
.
doOnNext
(
x
->
log
(
"Found 1: "
+
x
))
.
observeOn
(
schedulerB
)
.
doOnNext
(
x
->
log
(
"Found 2: "
+
x
))
.
observeOn
(
schedulerC
)
.
doOnNext
(
x
->
log
(
"Found 3: "
+
x
))
.
subscribeOn
(
schedulerA
)
.
subscribe
(
x
->
log
(
"Got 1: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Kannst du die Ausgabe vorhersagen? Erinnere dich daran, dass alles, was unter observeOn()
steht, innerhalb des gelieferten Scheduler
ausgeführt wird, natürlich solange, bis ein anderes observeOn()
auftaucht. Außerdem kann subscribeOn()
überall zwischen Observable
und subscribe()
vorkommen, aber dieses Mal betrifft es nur die Operatoren bis zum ersten observeOn()
:
21 | main | Starting 98 | main | Created 108 | main | Exiting 129 | Sched-A-0 | Subscribed 129 | Sched-A-0 | Found 1: A 129 | Sched-A-0 | Found 1: B 130 | Sched-B-0 | Found 2: A 130 | Sched-B-0 | Found 2: B 130 | Sched-C-0 | Found 3: A 130 | Sched-C-0 | Got: A 130 | Sched-C-0 | Found 3: B 130 | Sched-C-0 | Got: B 130 | Sched-C-0 | Completed
Die Subskription erfolgt in schedulerA
, weil wir das in subscribeOn()
angegeben haben. Auch der "Found 1"
Operator wurde innerhalb des Scheduler
ausgeführt, weil er vor dem ersten observeOn()
steht. Später wird die Situation interessanter. observeOn()
schaltet den aktuellen Scheduler
auf schedulerB
um, und "Found 2"
verwendet stattdessen diesen. Der letzte observeOn(schedulerC)
wirkt sich sowohl auf den "Found 3"
Operator als auch auf Subscriber
aus. Erinnere dich daran, dass Subscriber
im Kontext des letzten Scheduler
funktioniert.
subscribeOn()
und observeOn()
funktionieren wirklich gut zusammen, wenn du Producer (Observable.create()
) und Consumer (Subscriber
) physisch entkoppeln willst. Standardmäßig gibt es keine solche Entkopplung, und RxJava verwendet einfach denselben Thread. subscribeOn()
reicht nicht aus, wir wählen einfach einen anderen Thread.observeOn()
ist besser, aber dann blockieren wir den Client-Thread im Falle von synchronen Observable
s.
Da die meisten Operatoren nicht blockierend sind und die in ihnen verwendeten Lambda-Ausdrücke in der Regel kurz und billig sind, gibt es in der Regel nur einen subscribeOn()
und observeOn()
in der Pipeline der Operatoren. subscribeOn()
kann zur besseren Lesbarkeit in der Nähe des ursprünglichen Observable
platziert werden, während observeOn()
in der Nähe von subscribe()
liegt, so dass nur Subscriber
diesen speziellen Scheduler
verwendet, während andere Operatoren auf den Scheduler
von subscribeOn()
zurückgreifen.
Hier ist ein fortgeschrittenes Programm, das die Vorteile dieser beiden Operatoren nutzt:
log
(
"Starting"
);
Observable
<
String
>
obs
=
Observable
.
create
(
subscriber
->
{
log
(
"Subscribed"
);
subscriber
.
onNext
(
"A"
);
subscriber
.
onNext
(
"B"
);
subscriber
.
onNext
(
"C"
);
subscriber
.
onNext
(
"D"
);
subscriber
.
onCompleted
();
});
log
(
"Created"
);
obs
.
subscribeOn
(
schedulerA
)
.
flatMap
(
record
->
store
(
record
).
subscribeOn
(
schedulerB
))
.
observeOn
(
schedulerC
)
.
subscribe
(
x
->
log
(
"Got: "
+
x
),
Throwable:
:
printStackTrace
,
()
->
log
(
"Completed"
)
);
log
(
"Exiting"
);
Dabei ist store()
eine einfache verschachtelte Operation:
Observable
<
UUID
>
store
(
String
s
)
{
return
Observable
.
create
(
subscriber
->
{
log
(
"Storing "
+
s
);
//hard work
subscriber
.
onNext
(
UUID
.
randomUUID
());
subscriber
.
onCompleted
();
});
}
Die Erzeugung von Ereignissen findet in schedulerA
statt, aber jedes Ereignis wird unabhängig voneinander mit schedulerB
verarbeitet, um die Gleichzeitigkeit zu verbessern, eine Technik, die wir in "subscribeOn() Concurrency and Behavior" kennengelernt haben . Die Subskription am Ende geschieht in einem weiteren schedulerC
. Wir sind uns ziemlich sicher, dass du jetzt verstehst, welcher Scheduler
/Thread welche Aktion ausführt, aber nur für den Fall der Fälle (leere Zeilen wurden zur Klarheit hinzugefügt):
26 | main | Starting 93 | main | Created 121 | main | Exiting 122 | Sched-A-0 | Subscribed 124 | Sched-B-0 | Storing A 124 | Sched-B-1 | Storing B 124 | Sched-B-2 | Storing C 124 | Sched-B-3 | Storing D 1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce 1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8 1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a 1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587 1137 | Sched-C-1 | Completed
observeOn()
ist besonders wichtig für Anwendungen mit einer Benutzeroberfläche, bei denen wir den UI-Ereignis-Thread nicht blockieren wollen. Unter Android (siehe "Android-Entwicklung mit RxJava") oder Swing müssen einige Aktionen wie die Aktualisierung der Benutzeroberfläche in einem bestimmten Thread ausgeführt werden. Wenn du aber zu viel in diesem Thread machst, reagiert deine Benutzeroberfläche nicht mehr.
In diesen Fällen setzt du observeOn()
in die Nähe von subscribe()
, damit der Code innerhalb der Subskription im Kontext eines bestimmten Scheduler
(z. B. UI-Thread) aufgerufen wird. Andere Transformationen, auch recht billige, sollten jedoch außerhalb des UI-Threads ausgeführt werden. Auf dem Server wird observeOn()
selten verwendet, weil die wahre Quelle der Gleichzeitigkeit in den meisten Observable
s eingebaut ist. Das führt zu einer interessanten Schlussfolgerung: RxJava steuert die Gleichzeitigkeit mit nur zwei Operatoren (subscribeOn()
und observeOn()
), aber je mehr du reaktive Erweiterungen verwendest, desto seltener wirst du diese im Produktionscode sehen.
Andere Verwendungszwecke für Zeitplanungsprogramme
Es gibt zahlreiche Operatoren, die standardmäßig einen Scheduler
verwenden. Normalerweise wird Schedulers.computation()
verwendet, wenn kein anderer Operator angegeben wird - in der JavaDoc wird das immer deutlich gemacht. Der delay()
Operator nimmt zum Beispiel Upstream-Ereignisse auf und schiebt sie nach einer bestimmten Zeit Downstream. Offensichtlich kann er den ursprünglichen Thread während dieses Zeitraums nicht halten, also muss er einen anderen Scheduler
verwenden:
Observable
.
just
(
'A'
,
'B'
)
.
delay
(
1
,
SECONDS
,
schedulerA
)
.
subscribe
(
this
::
log
);
Ohne die Angabe eines eigenen schedulerA
würden alle Operatoren unterhalb von delay()
den computation()
Scheduler
verwenden. Daran ist an sich nichts auszusetzen, aber wenn dein Subscriber
bei der E/A blockiert ist, würde es ein Worker
aus dem global geteilten computation()
Zeitplannungsprogramm verbrauchen, was möglicherweise das gesamte System beeinträchtigt. Andere wichtige Operatoren, die benutzerdefinierte Scheduler
unterstützen, sind: interval()
, range()
, timer()
, repeat()
, skip()
, take()
, timeout()
und einige andere, die noch eingeführt werden müssen. Wenn du diesen Operatoren kein Zeitplannungsprogramm zur Verfügung stellst, wird computation()
Scheduler
verwendet, was in den meisten Fällen eine sichere Voreinstellung ist.
Die Beherrschung von Zeitplannungsprogrammen ist wichtig, um skalierbaren und sicheren Code mit RxJava zu schreiben. Der Unterschied zwischen subscribeOn()
und observeOn()
ist besonders wichtig bei hoher Last, wo jede Aufgabe genau dann ausgeführt werden muss, wenn wir sie erwarten. In wirklich reaktiven Anwendungen, bei denen alle langlaufenden Operationen asynchron sind, werden nur sehr wenige Threads und damit Scheduler
s benötigt. Aber es gibt immer diese eine API oder Abhängigkeit, die blockierenden Code erfordert.
Nicht zuletzt müssen wir sicher sein, dass Scheduler
s, die nachgelagert eingesetzt werden, mit der Last mithalten können, die von Scheduler
s vorgelagert erzeugt wird. Aber diese Gefahr wird in Kapitel 6 ausführlich erklärt.
Zusammenfassung
In diesem Kapitel wurden verschiedene Muster in traditionellen Anwendungen beschrieben, die durch RxJava ersetzt werden können. Ich hoffe, du hast inzwischen verstanden, dass der Hochfrequenzhandel oder das Streaming von Posts aus sozialen Medien nicht die einzigen Anwendungsfälle für RxJava sind. Tatsächlich kann fast jede API nahtlos durch Observable
ersetzt werden. Selbst wenn du die Leistungsfähigkeit reaktiver Erweiterungen im Moment nicht brauchst oder willst, kannst du die Implementierung weiterentwickeln, ohne rückwärtskompatible Änderungen einzuführen. Außerdem ist es der Client, der letztendlich alle Möglichkeiten nutzt, die RxJava bietet, wie Faulheit, deklarative Gleichzeitigkeit oder asynchrone Verkettung. Noch besser: Durch die nahtlose Konvertierung vonObservable
zu BlockingObservable
können herkömmliche Clients deine API so nutzen, wie sie wollen, und du kannst jederzeit eine einfache Brückenschicht bereitstellen.
Du solltest ziemlich sicher mit RxJava umgehen können und die Vorteile der Anwendung auch in Altsystemen verstehen. Zweifellos ist die Arbeit mit reaktivenObservable
s anspruchsvoller und hat eine etwas steile Lernkurve. Aber die Vorteile und Wachstumsmöglichkeiten sind einfach nicht zu überschätzen. Stell dir vor, wir könnten ganze Anwendungen mit reaktiven Erweiterungen schreiben, und zwar von vorne bis hinten. Wie ein Projekt auf der grünen Wiese, bei dem wir die Kontrolle über jede API, jede Schnittstelle und jedes externe System haben. In Kapitel 5 wird erläutert, wie du eine solche Anwendung schreiben kannst und welche Auswirkungen das hat.
1 Tatsächlich versucht RxJava, über die Thread-Affinität im Ereignisschleifenmodell auf demselben Thread zu bleiben, um auch dies auszunutzen.
2 Siehe auch "Schottenmuster und Fail-Fast".
3 Vergleiche es mit der faulen Auswertung von Ausdrücken in Haskell.
4 Für ein echtes Projekt wirst du natürlich ein produktionsgerechtes Logging-System wie Logback oder Log4J 2 verwenden.
Get Reaktive Programmierung mit RxJava now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.