
流控制和回压
|
183
乍看上去,它是没有问题的。最初的源在应用
debounce() 操作符之后有一个超时操作符。
出现超时的时候,样例会发布遇到的第一个条目,然后继续使用相同的源,这也是通过
debounce() 操作符实现的。但是,如果出现第一次超时,我们切换到了备用 Observable
上,而这个 Observable 是没有使用 timeout() 操作符的。有一种快速、混乱且短视的修正
方案,如下所示。
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream
.take(1)
.concatWith(
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream)))
以上代码再次忘记了在内层 timeout() 操作符中添加备用 Observable。足够了,你应该
已经注意到了递归模式。与其重复使用相同形式的 upstream
→
debounce
→ timeout()
→
upstream
→…,其实可以使用递归。
import static rx.Observable.defer;
Observable<Long> timedDebounce(Observable<Long> upstream) {
Observable<Long> onTimeout = upstream
.take(1)
.concatWith(defer(() ...