-
Notifications
You must be signed in to change notification settings - Fork 42
Description
Copied from gitter... Thanks @alexandru
@theamytran for repeated HTTP requests, it’s a very simple job, you can simply do …
// Continuous
Observable.repeat(()).mapFuture(_ => createFuture())
// Request every 1 second
Observable.interval(1.second).mapFuture(_ => createFuture())
Observable.repeat(()).mapTask(_ => createTask())
// Request every 1 second
Observable.interval(1.second).mapTask(_ => createTask())
// There’s also good-old flatMap / concatMap
Observable.interval(1.second).concatMap(_ => Observable.fromTask(myTask))
Observable.interval(1.second).concatMap(_ => Observable.fromFuture(myFuture()))
Note there's intervalWithFixedDelay and intervalAtFixedRate. And these operators, along with these mapFuture, mapTask, concatMap, mapAsync are applying back-pressure, so you won’t have multiple requests running in parallel, unless you want it (see mapAsync(paralellism=4))
@theamytran @a-reisberg when publishing into a PublishSubject, note that their back-pressure contract needs to be respected and that back-pressure contract also implies doing synchronization, in case you want to call onNext from multiple sources / threads. If you don’t want to deal with back-pressure or with concurency, one needs to use a ConcurrentSubject.publish instead, which place a concurrent buffer in front.
@theamytran if you don’t have a buffer (e.g. ConcurrentSubject, Pipe.multicast or Observable.create) and want to push straight into a simple Observer / Subscriber / Subject, then the back-pressure must be upholded. See the docs on that: https://monix.io/docs/2x/reactive/observers.html#feeding-an-observer