October 2018
By using TemperatureSensor, which exposes a stream of temperature values, we can subscribe each new SseEmitter to the Observable stream and forward the received onNext signals to SSE clients. To handle errors and properly close the HTTP connection, let's write the following SseEmitter extension:
class RxSeeEmitter extends SseEmitter {
   static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
   private final Subscriber<Temperature> subscriber;            // (1)

   RxSeeEmitter() {
      super(SSE_SESSION_TIMEOUT);                               // (2)

      this.subscriber = new Subscriber<Temperature>() {         // (3)
         @Override
         public void onNext(Temperature temperature) {
            try {
               RxSeeEmitter.this.send(temperature);             // (4)
            } catch (IOException e) {
               unsubscribe();                                   // (5)
            }
         }

         @Override
         ...
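The unsubscribe-on-failure idea at point (5) can be tried in isolation, without Spring or RxJava on the classpath. The following is only a minimal sketch that uses nothing beyond the JDK: TemperatureStream, Client, and connect are hypothetical stand-ins invented for this illustration and are not part of the RxJava or Spring API. It shows a listener that forwards each reading to a client and removes itself from the stream as soon as a send fails with an IOException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class UnsubscribeOnSendFailure {

    /** Hypothetical stand-in for a client connection such as an SSE emitter. */
    interface Client {
        void send(double temperature) throws IOException;
    }

    /** Hypothetical stand-in for a hot stream of temperature readings. */
    static class TemperatureStream {
        private final List<Consumer<Double>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener and returns a handle that removes it again. */
        Runnable subscribe(Consumer<Double> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Delivers one reading to every listener that is currently registered. */
        void publish(double value) {
            for (Consumer<Double> listener : listeners) {
                listener.accept(value);
            }
        }

        int listenerCount() {
            return listeners.size();
        }
    }

    /** Forwards each reading to the client and cancels the subscription once a send fails. */
    static void connect(TemperatureStream stream, Client client) {
        Runnable[] cancel = new Runnable[1];
        cancel[0] = stream.subscribe(temperature -> {
            try {
                client.send(temperature);
            } catch (IOException e) {
                cancel[0].run(); // same role as unsubscribe() at point (5)
            }
        });
    }

    public static void main(String[] args) {
        TemperatureStream stream = new TemperatureStream();
        List<Double> received = new ArrayList<>();
        int[] calls = {0};

        // This client accepts the first reading and fails on every later one.
        connect(stream, temperature -> {
            if (++calls[0] > 1) {
                throw new IOException("client disconnected");
            }
            received.add(temperature);
        });

        stream.publish(21.5); // delivered
        stream.publish(22.0); // send fails, so the listener removes itself
        stream.publish(22.5); // no listener is left to receive this

        System.out.println("received = " + received);
        System.out.println("listeners left = " + stream.listenerCount());
    }
}
```

The design point is the same as in the listing above: a failed write is treated as a signal that the client is gone, so the subscription is dropped instead of continuing to push data into a dead connection.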