
Add intersperse operator #368

Merged
alexandru merged 4 commits into monix:master from omainegra:master
Jun 2, 2017

Conversation

@omainegra
Contributor

@omainegra omainegra commented Jun 1, 2017

Proposal for the Observable.intersperse operator, including unit tests for it. This PR resolves #367.

@codecov

codecov Bot commented Jun 1, 2017

Codecov Report

Merging #368 into master will decrease coverage by 0.1%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #368      +/-   ##
==========================================
- Coverage   86.47%   86.37%   -0.11%     
==========================================
  Files         313      314       +1     
  Lines        8406     8417      +11     
  Branches     1683     1681       -2     
==========================================
+ Hits         7269     7270       +1     
- Misses       1137     1147      +10

Member

@alexandru alexandru left a comment

Thanks @omainegra, this looks pretty good, but it needs a couple of small changes. Don't get discouraged by the verbose comments; they are meant to pass on knowledge.

import scala.concurrent.Future

/**
* Created by omainegra on 5/29/17.
Member

As a general rule, I have refrained from adding names to source files; you can add yourself to the AUTHORS file instead.

Contributor Author

Sorry about this, it's just the IntelliJ template. Fixing it.

* Created by omainegra on 5/29/17.
*/
private[reactive] final class IntersperseObservable[+A](source: Observable[A],
separator: A) extends Observable[A]{ self =>
Member

I'm obsessed with indentation, and I'd prefer this to be consistent with the other files. Personally, I don't like things dangling so far to the right, although I can understand the preference.

out.onNext(elem)
}
else {
out.onNext(separator).flatMap {
Member

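Calling flatMap on a Future can be pretty inefficient, because it forces an asynchronous boundary: the continuation is always submitted to the ExecutionContext, even when the Future is already complete.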

This is why Monix introduces a couple of special Future extension methods that avoid async boundaries when the Future returned by onNext is already complete. Monix is efficient because it tries to collapse these async boundaries whenever the downstream subscriber returns a plain Continue or Stop that's immediately available.

Don't get me wrong, this code is correct, but it can be more efficient if you use syncFlatMap instead:

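out.onNext(separator).syncFlatMap {
  case Continue => out.onNext(elem) // send elem only once the separator was accepted
  case ack => ack                   // Stop is propagated as-is
}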

As a note, in user code this is actually dangerous because it is stack-unsafe when used in loops, but here we know what we are doing and it's fine.
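A minimal sketch of the idea, using only the Scala standard library rather than Monix: the hypothetical helper syncFlatMap inspects Future.value and, when the Future is already completed, applies the function on the current thread; only a pending Future goes through the regular flatMap. CountingEC is an illustrative ExecutionContext (also an assumption, not part of any library) that runs tasks immediately but counts them, so the extra scheduling step of flatMap becomes visible.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object SyncFlatMapSketch {
  /** Runs every task immediately, but counts it so async boundaries are visible. */
  final class CountingEC extends ExecutionContext {
    val scheduled = new AtomicInteger(0)
    def execute(task: Runnable): Unit = { scheduled.incrementAndGet(); task.run() }
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  /** Hypothetical stand-in for a syncFlatMap-style helper (not the Monix API). */
  def syncFlatMap[A, B](fa: Future[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[B] =
    fa.value match {
      // Already completed: call f right here, no task is scheduled.
      case Some(Success(a)) =>
        try f(a) catch { case NonFatal(e) => Future.failed(e) }
      // Already failed: propagate the error without scheduling anything.
      case Some(Failure(e)) => Future.failed(e)
      // Not completed yet: fall back to the ordinary, asynchronous flatMap.
      case None => fa.flatMap(f)
    }
}
```

Running a regular flatMap on an already completed Future through CountingEC still increments the counter, while the fast path does not; that skipped scheduling step is the saving the review refers to.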

}

def onError(ex: Throwable) = {
if (!isDone.getAndSet(true)){
Member

Just as with firstTime, you're only modifying isDone in onComplete and onError. Calls to onNext, onComplete and onError are already ordered (synchronized), so this can be a private[this] var.

The only concurrency problem you can run into is that the producer doesn't have to back-pressure on the final Future[Ack] returned by the last onNext, so depending on what you do in those flatMaps you could end up with a race. But you don't need anything special here.

}
}
def onComplete() = {
downstreamAck.syncOnContinue {
Member

In this case you don't need to back-pressure on downstreamAck, because this is legal (as an optimization of the protocol):

out.onNext(e)
out.onComplete()

But this is NOT legal of course:

out.onNext(e)
out.onNext(e)
out.onComplete()

There are operators that keep that last ack around and use it in onComplete, but only because they have more work to do.

I suspect that if you have to stream a final end element, then you do need that ack to back-pressure on, but you don't need it here.
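The two sequences above can be checked with a small model of the contract. The sketch below is hypothetical and deliberately simplified (plain Scala Promises, a two-valued Ack, no Monix types): CheckingSubscriber hands out acks that stay pending until ackLast() is called, records any onNext that arrives while the previous ack is still pending, and accepts onComplete at any time.

```scala
import scala.concurrent.{Future, Promise}

object ProtocolSketch {
  sealed trait Ack
  case object Continue extends Ack

  /** Hypothetical slow subscriber that records protocol violations. */
  final class CheckingSubscriber[A] {
    private[this] var lastAck: Option[Promise[Ack]] = None
    var events: Vector[String] = Vector.empty
    var violations: Vector[String] = Vector.empty

    def onNext(elem: A): Future[Ack] = {
      // Illegal: a new onNext while the previous ack is still pending.
      if (lastAck.exists(p => !p.isCompleted))
        violations :+= s"onNext($elem) sent before the previous ack completed"
      events :+= s"next:$elem"
      val p = Promise[Ack]()
      lastAck = Some(p)
      p.future
    }

    // Legal: onComplete may follow the last onNext without waiting for its ack.
    def onComplete(): Unit = events :+= "complete"

    // Simulates the downstream finally acknowledging the last element.
    def ackLast(): Unit = lastAck.foreach(_.trySuccess(Continue))
  }
}
```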

* @param separator is the separator
*/
def intersperse[B >: A](separator: B): Self[B] =
self.transform(self => new IntersperseObservable(self, separator))
Member

If we introduce this, we also need the overload with start and end tokens, and I think it can be handled by the same implementation: in that Observable source you can make them optional, or something along those lines.
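To pin down the intended output, here is a sketch of the semantics on plain Lists, assuming the start and end tokens become Options in a shared implementation, as suggested. The object and method names are illustrative only; they mirror the two intersperse overloads but are not the Observable API. For an empty input this sketch still emits start and end, which is an assumption rather than something the review settles.

```scala
object IntersperseSemantics {
  /** Shared implementation: start and end are optional, the separator is not. */
  def intersperseAll[A](xs: List[A], start: Option[A], sep: A, end: Option[A]): List[A] = {
    val middle = xs match {
      case Nil          => Nil
      case head :: tail => head :: tail.flatMap(x => List(sep, x)) // sep before every later element
    }
    start.toList ++ middle ++ end.toList
  }

  /** Separator only. */
  def intersperse[A](xs: List[A], sep: A): List[A] =
    intersperseAll(xs, None, sep, None)

  /** Start, separator and end tokens. */
  def intersperse[A](xs: List[A], start: A, sep: A, end: A): List[A] =
    intersperseAll(xs, Some(start), sep, Some(end))
}
```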


import scala.concurrent.duration._

object IntersperseSuite extends BaseOperatorSuite {
Member

👍 it's good that you introduced the test as well

separator: A) extends Observable[A]{ self =>

override def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
val firstTime = AtomicBoolean(true)
Member

You are modifying firstTime in onNext only, so you don't need to synchronize it, because the onNext calls are guaranteed to be ordered (the same guarantee you get with actors processing messages).

Atomics come with overhead and this can be a simple private[this] var in that Subscriber class.
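Here is a simplified, synchronous sketch of that advice. The names are hypothetical, and acks are returned directly instead of as Future[Ack], so this is not the Monix Subscriber type. firstTime is a private[this] var, which is safe only because the caller never invokes onNext concurrently.

```scala
object FirstTimeSketch {
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  /** Simplified, synchronous subscriber contract (hypothetical, not Monix's). */
  trait Sub[-A] {
    def onNext(elem: A): Ack
    def onComplete(): Unit
  }

  final class IntersperseSub[A](out: Sub[A], separator: A) extends Sub[A] {
    // A plain field is enough: the protocol never calls onNext concurrently.
    private[this] var firstTime = true

    def onNext(elem: A): Ack =
      if (firstTime) {
        firstTime = false
        out.onNext(elem)
      } else {
        out.onNext(separator) match {
          case Continue => out.onNext(elem)
          case Stop     => Stop
        }
      }

    def onComplete(): Unit = out.onComplete()
  }

  /** Collects everything it receives, always answering Continue. */
  final class Collect[A] extends Sub[A] {
    val buffer = scala.collection.mutable.ListBuffer.empty[A]
    var completed = false
    def onNext(elem: A): Ack = { buffer += elem; Continue }
    def onComplete(): Unit = completed = true
  }
}
```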

Contributor Author

I didn't know about this, good to know.

Member

@alexandru alexandru left a comment

Looking good, just one final touch before the merge :-)

downstreamAck.syncFlatMap {
case Continue if end.isDefined => out.onNext(end.get)
case ack => ack
}.syncOnContinue {
Member

@alexandru alexandru Jun 1, 2017

Looking good, dude. One final touch: there's no need to back-pressure with this syncOnContinue here, because you can do:

out.onNext(end.get)
out.onComplete()

This optimisation (not waiting on the final onNext before sending onComplete) is good from a networking point of view, and it's also why Observable.tailRecM can work well.

I would also prefer that you don't wait on downstreamAck when end.isDefined == false. It's totally fine to have multiple if branches or case statements.

Contributor Author

OK then :-). I'm still learning about all these things. You have been a great help.

Member

It's totally fine, @omainegra, no rush. I breathe this protocol and know the optimisation opportunities because I came up with it.

And your first try was correct and pretty good.

Contributor Author

@omainegra omainegra Jun 1, 2017

I need some help here, please. If I don't back-pressure before onComplete, one unit test fails:

[info] monix.reactive.internal.operators.IntersperseSuite
[info] - should emit exactly the requested elements
[info] - should work for synchronous observers
[info] - should work for asynchronous observers *** FAILED ***
[info]   received 24310 != expected 24530 (BaseOperatorSuite.scala:157)
[info]     minitest.api.Asserts$class.assertEquals(Asserts.scala:67)
[info]     monix.reactive.internal.operators.BaseOperatorSuite.assertEquals(BaseOperatorSuite.scala:30)
[info]     monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:157)
[info]     monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:125)
[info]     minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:54)
[info]     minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:52)
[info]     minitest.api.TestSpec.apply(TestSpec.scala:28)

Member

Sure, but please paste your latest onComplete.

Member

It does look like a contract problem; I don't rule out the possibility that I'm wrong :-)

Contributor Author

@omainegra omainegra Jun 1, 2017

This is the new onComplete. The version with back-pressure is commented out in the else branch. As written (without back-pressure), the test fails.

def onComplete() = {
  if (end.isDefined){
    downstreamAck.syncOnComplete {
      case Success(_) =>
        out.onNext(end.get)
        signalOnComplete()
      case _ =>
    }
  } else {
    signalOnComplete()
    // downstreamAck.syncOnComplete {
    //   case Success(_) => signalOnComplete()
    //   case _ =>
    // }
  }
}

private[this] def signalOnComplete(): Unit = {
  if (!isDone) {
    isDone = true
    out.onComplete()
  }
}

Member

OK, I think I know what's going on. In onNext you can end up pushing two events instead of one, so if the subscriber is delaying on the separator, elem is still queued up to be sent, and you can end up triggering onNext(end) and onComplete before that happens. See, I was wrong :-)

By the way, you need to check that you receive a Continue; you don't need to do anything in case of Stop or Failure(ex), so a single syncOnContinue there is totally fine:

downstreamAck.syncOnContinue {
  // only runs on Continue; Stop or Failure(ex) need no action
  if (end.nonEmpty) out.onNext(end.get)
  out.onComplete()
}

Or something like that.
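The failure and the fix can be reproduced with a Future-based model. In this sketch all names are hypothetical, and a same-thread ExecutionContext plus plain foreach stand in for Monix's syncOnContinue. SlowSink keeps each ack pending until release() is called. onComplete waits on the last downstreamAck before emitting end and completing, while onCompleteNaive does not, so its completion overtakes the element still queued behind the separator.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object OnCompleteSketch {
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  // Runs callbacks on the calling thread so the example is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(task: Runnable): Unit = task.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  trait Sub[-A] {
    def onNext(elem: A): Future[Ack]
    def onComplete(): Unit
  }

  /** Hypothetical slow downstream: each ack stays pending until release(). */
  final class SlowSink[A] extends Sub[A] {
    var events = Vector.empty[String]
    private[this] var pending: Option[Promise[Ack]] = None

    def onNext(elem: A): Future[Ack] = {
      events :+= s"next:$elem"
      val p = Promise[Ack]()
      pending = Some(p)
      p.future
    }
    def onComplete(): Unit = events :+= "complete"
    def release(): Unit = pending.foreach(_.trySuccess(Continue))
  }

  final class IntersperseSub[A](out: Sub[A], separator: A, end: Option[A]) extends Sub[A] {
    private[this] var firstTime = true
    private[this] var downstreamAck: Future[Ack] = Future.successful(Continue)

    def onNext(elem: A): Future[Ack] = {
      downstreamAck =
        if (firstTime) { firstTime = false; out.onNext(elem) }
        else out.onNext(separator).flatMap {
          case Continue => out.onNext(elem) // elem is still queued behind the separator
          case Stop     => Future.successful[Ack](Stop)
        }
      downstreamAck
    }

    // Fixed version: wait for the last ack, since elem may still be queued.
    def onComplete(): Unit =
      downstreamAck.foreach {
        case Continue =>
          end.foreach(out.onNext) // onNext(end) then onComplete() needs no extra wait
          out.onComplete()
        case Stop => () // downstream stopped: send nothing more
      }

    // Naive version (illustration only): completes without back-pressure.
    def onCompleteNaive(): Unit = {
      end.foreach(out.onNext)
      out.onComplete()
    }
  }
}
```

With the naive variant the sink sees next:] and complete while b has not been delivered yet, which matches the diagnosis above; the fixed variant completes only once the chain started by the last onNext has been acknowledged.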

Contributor Author

Yes, it's as you say. One last question: is the isDone logic required? I just removed it and all tests passed.

Member

Yeah, you probably don't need it. An isDone check in onComplete is usually useful when something you do in onNext can trigger a completion, and you want to make sure you don't complete the stream a second time.

It might be OK to remove it.
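For contrast, here is a case where the guard does earn its keep: an operator whose onNext can complete the stream by itself. The sketch is a hypothetical take(n)-like subscriber with synchronous acks (not Monix's take). isDone ensures that a later onComplete from upstream does not complete the downstream a second time, and it can be a plain var for the same ordering reason discussed earlier.

```scala
object IsDoneSketch {
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  /** Simplified, synchronous subscriber contract (hypothetical). */
  trait Sub[-A] {
    def onNext(elem: A): Ack
    def onComplete(): Unit
  }

  /** Hypothetical take(n)-like operator: onNext itself may complete the stream. */
  final class TakeSub[A](out: Sub[A], n: Int) extends Sub[A] {
    private[this] var taken = 0
    private[this] var isDone = false // plain var: calls are already ordered

    def onNext(elem: A): Ack =
      if (isDone) Stop
      else {
        taken += 1
        out.onNext(elem) match {
          case Stop => isDone = true; Stop // downstream cancelled: no onComplete owed
          case Continue if taken >= n => signalOnComplete(); Stop
          case Continue => Continue
        }
      }

    // Upstream may still call onComplete after we completed; ignore it then.
    def onComplete(): Unit = signalOnComplete()

    private[this] def signalOnComplete(): Unit =
      if (!isDone) { isDone = true; out.onComplete() }
  }

  /** Records what it receives, always answering Continue. */
  final class Collect[A] extends Sub[A] {
    var events = Vector.empty[String]
    def onNext(elem: A): Ack = { events :+= s"next:$elem"; Continue }
    def onComplete(): Unit = events :+= "complete"
  }
}
```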

start.map(out.onNext).getOrElse(Continue).syncFlatMap {
case Continue => out.onNext(elem)
case ack => ack
}
Member

👍 looking good

* @param separator is the separator
* @param end the last element emitted
*/
def intersperse[B >: A](start: B, separator: B, end: B): Self[B] =
Member

👍

@alexandru alexandru merged commit 94a556b into monix:master Jun 2, 2017
@alexandru alexandru added this to the 3.0.0 milestone Jan 21, 2018

Development

Successfully merging this pull request may close these issues.

New operator Observable.intersperse

2 participants