Add Observable.whileBusyAggregateEvents#1320
Conversation
|
btw, build fails because of some changes in GitHub Actions. #1321 should fix it. |
This is useful for when a downstream consumer of events is slower than the upstream consumer and events can be aggregated.
ebeb9dd to
4e05515
Compare
|
Thanks @larsrh . Just rebased |
Avasil
left a comment
There was a problem hiding this comment.
Thank you, that's a nice operator!
Very well done, scala docs are great!
I've left some general comments.
I'm also wondering if we could relax synchronization a bit but I'll have to look a bit more carefully into it.
| if (downstreamIsDone) Stop | ||
| else { | ||
| if (!pendingAck) { | ||
| val downstreamAck = downstream.onNext(seed(elem)) |
There was a problem hiding this comment.
We usually assume that user-provided methods could throw an exception and we should protect against that. There is an example in StateActionObservable
There was a problem hiding this comment.
Addressed and enabled the tests to verify this.
| } | ||
| else { | ||
| pendingAck = true | ||
| downstreamAck.map { ack => |
There was a problem hiding this comment.
I think we should use onComplete instead of a map to handle Failure case
There was a problem hiding this comment.
Switched this to use onComplete (as well as the instance in emitAggregated
| if (!downstreamIsDone) { | ||
| lastAck = downstream.onNext(agg) | ||
|
|
||
| lastAck.map { _ => |
There was a problem hiding this comment.
It seems like we ignore Stop. We could also look into using some of Ack extensions, e.g. syncOnContinue to prevent redundant async boundaries
| } | ||
|
|
||
| private def emitAggregated(): Unit = { | ||
| upstreamSubscriber.synchronized { |
There was a problem hiding this comment.
We have some redundancy in synchronized calls to this method - I think we could remove synchronized here and make sure each caller synchronizes which is already done in some places like onComplete and lastAck.map in this method
There was a problem hiding this comment.
Have removed. It was redundant
| def onComplete(): Unit = | ||
| // Can't emit the aggregated element immediately as the previous `onNext` may not yet have been acked. | ||
| upstreamSubscriber.synchronized { | ||
| lastAck.map { |
There was a problem hiding this comment.
We could use lastAck.syncOnContinue
There was a problem hiding this comment.
This seemed like a good suggestion, but when I tried changing to make use of syncOnContinue some of my tests started to break (the last element which should have been emitted here wasn't received).
|
@Avasil Thanks for reviewing. I've addressed the feedback, but note I didn't have success using |
This is useful for when a downstream consumer of events is slower than the upstream consumer
and events can be aggregated.
Note the implementation of the operator is largely based on https://github.com/monix/monix/blob/5c24f19ea805d5223c9e38961703aa955d691fdd/monix-reactive/shared/src/main/scala/monix/reactive/internal/operators/BufferWithSelectorObservable.scala
Related issue: #1319
I'd appreciate any feedback!