October 2018
Intermediate to advanced
556 pages
15h 18m
English
If the default subscribe(...) overloads are not flexible enough, we can implement our own Subscriber. One option is to implement the Subscriber interface from the Reactive Streams specification directly and subscribe that instance to the stream, as follows:
Subscriber<String> subscriber = new Subscriber<String>() {
    volatile Subscription subscription;                      // (1)

    public void onSubscribe(Subscription s) {                // (2)
        subscription = s;                                    // (2.1)
        log.info("initial request for 1 element");           //
        subscription.request(1);                             // (2.2)
    }

    public void onNext(String s) {                           // (3)
        log.info("onNext: {}", s);                           //
        log.info("requesting 1 more element");               //
        subscription.request(1);                             // (3.1)
    }

    public void onComplete() {
        log.info("onComplete");
    }