Add intersperse operator #368
Codecov Report
@@ Coverage Diff @@
## master #368 +/- ##
==========================================
- Coverage 86.47% 86.37% -0.11%
==========================================
Files 313 314 +1
Lines 8406 8417 +11
Branches 1683 1681 -2
==========================================
+ Hits 7269 7270 +1
- Misses 1137 1147 +10
alexandru
left a comment
Thanks @omainegra, looks pretty good, but needs a couple of small changes - don't get discouraged by the verbose comments, this is for the purpose of passing on knowledge.
import scala.concurrent.Future

/**
 * Created by omainegra on 5/29/17.
As a general rule, I abstained from adding names in source files - you can add yourself to the AUTHORS file.
Sorry about this, just IntelliJ template, fixing it
 * Created by omainegra on 5/29/17.
 */
private[reactive] final class IntersperseObservable[+A](source: Observable[A],
                                                        separator: A) extends Observable[A]{ self =>
I'm obsessed with indentation and I'd prefer for this to be consistent with the other files. Personally I don't like things dangling so far on the right, although I can understand the preference.
    out.onNext(elem)
  }
  else {
    out.onNext(separator).flatMap {
Doing a flatMap on a Future can be pretty inefficient since it forces an asynchronous boundary.
This is why Monix introduces a couple of special Future extension methods for avoiding async boundaries if the Future returned by onNext is already complete. Monix is efficient because it tries to collapse these async boundaries if the downstream subscriber returns a plain Continue or Stop that's immediately available.
Don't get me wrong, this code is correct, but it can be more efficient if you use syncFlatMap instead:
out.onNext(separator).syncFlatMap {
  ???
}
As a note, for user code this is actually dangerous because it is stack-unsafe (in loops), but we know what we are doing here and it's fine.
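A rough sketch of the idea behind syncFlatMap, using only the Scala standard library. The helper name `syncFlatMapSketch` is hypothetical and this is not Monix's implementation; it only illustrates how the async boundary can be skipped when the Future is already complete.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object SyncFlatMapSketch {
  // Hypothetical helper (an assumption, not the Monix API): if `fa` is
  // already complete, run `f` on the current thread; otherwise fall back
  // to the regular flatMap, which schedules `f` on the ExecutionContext.
  def syncFlatMapSketch[A, B](fa: Future[A])(f: A => Future[B])(
    implicit ec: ExecutionContext): Future[B] =
    fa.value match {
      case Some(Success(a)) =>
        // Already complete: no async boundary, but also no trampolining,
        // which is why calling this in a loop can overflow the stack.
        try f(a) catch { case NonFatal(e) => Future.failed(e) }
      case Some(Failure(e)) =>
        Future.failed(e)
      case None =>
        fa.flatMap(f)
    }
}
```

When the downstream returns an already-completed ack, the continuation runs immediately on the calling thread instead of being rescheduled.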
}

def onError(ex: Throwable) = {
  if (!isDone.getAndSet(true)){
Just as with firstTime, you're only modifying isDone in onComplete and onError. The calls to onNext, onComplete and onError are ordered (synchronized) already so this can be a private[this] var.
The only concurrency problem you can have is that the producer doesn't need to back-pressure for that final Future[Ack] returned by the last onNext, so depending on what you do in those flatMaps, you can end up with a concurrency problem, but you don't need anything special here.
  }
}
def onComplete() = {
  downstreamAck.syncOnContinue {
In this case you don't need to back-pressure on downstreamAck, because this is legal (as an optimization of the protocol):
out.onNext(e)
out.onComplete()
But this is NOT legal of course:
out.onNext(e)
out.onNext(e)
out.onComplete()
There are operators that keep that last ack around and use it in onComplete, but that's only because they have to do more stuff.
I suspect that if you have to stream a final end, then you do need to have that ack to back-pressure on, but you don't need that here.
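To illustrate the rule above, here is a minimal sketch of a producer that respects it. The `Ack`, `Continue`, `Stop` and `Observer` types are simplified local stand-ins defined in the block (assumptions, not the Monix types), and `feed` is a hypothetical name.

```scala
import scala.concurrent.{ExecutionContext, Future}

object BackPressureSketch {
  // Simplified stand-ins for the acknowledgement protocol (assumptions).
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  trait Observer[-A] {
    def onNext(elem: A): Future[Ack]
    def onComplete(): Unit
  }

  // Pushes `items` downstream, waiting for each ack before the next onNext,
  // but NOT waiting for the ack of the final onNext before onComplete.
  def feed[A](items: List[A], out: Observer[A])(implicit ec: ExecutionContext): Unit =
    items match {
      case Nil =>
        out.onComplete()
      case last :: Nil =>
        out.onNext(last) // ack deliberately ignored: legal before onComplete
        out.onComplete()
      case head :: tail =>
        // Back-pressure: the next onNext happens only after a Continue.
        // A failed ack ends the stream silently in this sketch.
        out.onNext(head).foreach {
          case Continue => feed(tail, out)
          case Stop => ()
        }
    }
}
```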
 * @param separator is the separator
 */
def intersperse[B >: A](separator: B): Self[B] =
  self.transform(self => new IntersperseObservable(self, separator))
If we introduce this, we also need the version with the start and the end tokens and I think it can be handled by the same implementation - in that Observable source you can make them optional or something.
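As a sketch of the intended semantics, here is a pure-list version where the start and end tokens are optional, so one implementation covers both variants. The function is hypothetical, and emitting nothing for an empty input is an assumption of this sketch; the thread does not specify that case.

```scala
object IntersperseSemantics {
  // Pure-list model of the operator (illustrative only, not the Observable
  // implementation). Assumption: an empty input yields an empty output,
  // even when start/end tokens are defined.
  def intersperse[A](xs: List[A], start: Option[A], separator: A, end: Option[A]): List[A] =
    if (xs.isEmpty) Nil
    else {
      // head, then (separator, element) for every remaining element
      val body = xs.head :: xs.tail.flatMap(x => List(separator, x))
      start.toList ::: body ::: end.toList
    }
}
```

With `start = None` and `end = None` this reduces to the separator-only variant from the diff above.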
import scala.concurrent.duration._

object IntersperseSuite extends BaseOperatorSuite {
👍 it's good that you introduced the test as well
                                                        separator: A) extends Observable[A]{ self =>

  override def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
    val firstTime = AtomicBoolean(true)
You are modifying firstTime in onNext only, so you don't need to synchronize it, because the onNext calls are guaranteed to be ordered (the same guarantee you get with actors processing messages).
Atomics come with overhead and this can be a simple private[this] var in that Subscriber class.
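A minimal sketch of that suggestion, using a simplified synchronous observer trait defined in the block (an assumption, not the Monix Subscriber). Because onNext calls arrive one after another, the flag is only ever touched by one call at a time and a plain var is enough.

```scala
object PlainVarSketch {
  // Simplified synchronous observer, a stand-in for illustration only.
  trait SimpleObserver[-A] {
    def onNext(elem: A): Unit
    def onComplete(): Unit
  }

  final class SeparatorObserver[A](separator: A, out: SimpleObserver[A])
    extends SimpleObserver[A] {

    // Plain field instead of an AtomicBoolean: it is read and written
    // only inside onNext, and onNext calls are ordered by the contract.
    private[this] var firstTime = true

    def onNext(elem: A): Unit = {
      if (firstTime) firstTime = false
      else out.onNext(separator)
      out.onNext(elem)
    }

    def onComplete(): Unit = out.onComplete()
  }
}
```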
I didn't know about this, good to know
…icense in source files.
alexandru
left a comment
Looking good, just one final touch before the merge :-)
downstreamAck.syncFlatMap {
  case Continue if end.isDefined => out.onNext(end.get)
  case ack => ack
}.syncOnContinue {
Looking good dude. One final touch ... there's no need for back-pressure with this syncOnContinue here, because you can do ...
out.onNext(end.get)
out.onComplete()
This optimisation (not waiting on the final onNext for sending onComplete) is good from a networking POV, but it's also why Observable.tailRecM can work well.
I would also prefer that you not wait on downstreamAck when end.isDefined == false. It's totally fine if we have multiple if branches or case statements.
Ok then :-). I'm still learning about all these things. You have been a great help
It's totally fine @omainegra, no rush, I breathe this protocol and know the optimisation opportunities because I came up with it.
And your first try was correct and pretty good.
I need assistance here please. If I don't do back-pressure before onComplete, one unit test fails.
[info] monix.reactive.internal.operators.IntersperseSuite
[info] - should emit exactly the requested elements
[info] - should work for synchronous observers
[info] - should work for asynchronous observers *** FAILED ***
[info] received 24310 != expected 24530 (BaseOperatorSuite.scala:157)
[info] minitest.api.Asserts$class.assertEquals(Asserts.scala:67)
[info] monix.reactive.internal.operators.BaseOperatorSuite.assertEquals(BaseOperatorSuite.scala:30)
[info] monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:157)
[info] monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:125)
[info] minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:54)
[info] minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:52)
[info] minitest.api.TestSpec.apply(TestSpec.scala:28)
Sure, but please copy/paste your newest onComplete
It does look like a contract problem, I don't dismiss the possibility of me being wrong :-)
This is the new onComplete. The back-pressured code is commented out in the else branch. As it is, it's not working.
def onComplete() = {
  if (end.isDefined){
    downstreamAck.syncOnComplete {
      case Success(_) =>
        out.onNext(end.get)
        signalOnComplete()
      case _ =>
    }
  } else {
    signalOnComplete()
    // downstreamAck.syncOnComplete {
    //   case Success(_) => signalOnComplete()
    //   case _ =>
    // }
  }
}
private[this] def signalOnComplete(): Unit = {
  if (!isDone) {
    isDone = true
    out.onComplete()
  }
}
OK, I think I know what's going on. In onNext you can end up pushing 2 events instead of one, so if the subscriber is delaying on sep, you still have the elem queued up for sending it, so you can end up triggering onNext(end) and onComplete before that happens. See, I was wrong :-)
Btw, you need to check that you receive a Continue and don't need to do anything in case of Stop or Failure(ex), so doing just one syncOnContinue there is totally fine.
ack.syncOnContinue {
  if (end.nonEmpty) out.onNext(end.get)
  out.onComplete()
}
Or smth.
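Putting the explanation together, here is a hedged sketch of the whole subscriber, built on simplified local `Ack` and `Observer` types (assumptions, not the Monix ones) and the standard `flatMap`/`foreach` rather than the sync variants. It shows why onComplete has to wait on the last ack: a single upstream onNext can fan out into two downstream events.

```scala
import scala.concurrent.{ExecutionContext, Future}

object IntersperseSketch {
  // Simplified stand-ins for the acknowledgement protocol (assumptions).
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  trait Observer[-A] {
    def onNext(elem: A): Future[Ack]
    def onComplete(): Unit
  }

  final class IntersperseObserver[A](separator: A, end: Option[A], out: Observer[A])(
    implicit ec: ExecutionContext) extends Observer[A] {

    private[this] var firstTime = true
    // Ack of the last event pushed downstream (separator + element chain).
    private[this] var downstreamAck: Future[Ack] = Future.successful(Continue)

    def onNext(elem: A): Future[Ack] = {
      downstreamAck =
        if (firstTime) {
          firstTime = false
          out.onNext(elem)
        } else {
          // Two downstream events for one upstream event: the element is
          // still queued while the downstream is busy with the separator.
          out.onNext(separator).flatMap {
            case Continue => out.onNext(elem)
            case Stop => Future.successful(Stop)
          }
        }
      downstreamAck
    }

    def onComplete(): Unit =
      // Wait for the queued element to be acknowledged; do nothing on
      // Stop or on a failed ack.
      downstreamAck.foreach {
        case Continue =>
          end.foreach(e => out.onNext(e))
          out.onComplete()
        case Stop => ()
      }
  }
}
```

Without the wait in onComplete, the end token and the completion signal could overtake the element that is still pending behind a slow separator ack, which matches the lost elements seen in the failing asynchronous test.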
Yes, it's as you say. One last question: is the isDone logic required? I just removed it and all tests passed OK.
Yeah, you probably don't need it - isDone in onComplete is usually useful when you're doing stuff in onNext that triggers a completion and then you want to ensure that you're not going to be completing that stream a second time.
It might be OK to remove it.
start.map(out.onNext).getOrElse(Continue).syncFlatMap {
  case Continue => out.onNext(elem)
  case ack => ack
}

 * @param separator is the separator
 * @param end the last element emitted
 */
def intersperse[B >: A](start: B, separator: B, end: B): Self[B] =
Proposal for the operator Observable.intersperse. It also includes unit tests for it. This PR resolves #367