Fix #1045: ensure early cancellation of Observable#mapParallel on error#1064
Conversation
|
Thanks @TapanVaishnav Could you add a test for it? Something that if val failedTask = Task.raiseError(ex).delayExecution(1.second)
val otherTask = Task.sleep(2.second).doOnCancel(incrementCounter)
Observable(0, 1, 2, 3, 4, 5, 6).mapParallelOrdered(n)(if (i == 0) failedTask else otherTask)and then we can do |
Got it, will add a test. Thanks |
|
@Avasil Hi, I've added test for MapParallelOrderedSuite and was planning to add one for mapParallelUnorderedSuite but because the map is unordered(in the latter case) we can't ensure the value of the counter. For example in the above case, because the map is unordered test might pass/fail, is that right? |
|
I feel like it should still work because it will preserve the order of the input (so Did you experience non-deterministic behavior here? |
Thought not sure if is this the input order or the order in which Tasks are getting completed in their respective threads. |
|
Also, I tried running the #1064 (comment) test and out of three cases, it passed in one and failed in others. So it seems like the input order isn't preserved. (which I believe, should be the case) |
|
In your example the tasks are "instant" so I can see the output happening in this way, I think it would be a little bit more consistent with a small delay AFAIK
I wonder if we have a race condition, i.e. Do you see any tasks successfully completing or just more cancels? |
I see, will try both, thanks for your input. 👍 |
|
Hi @Avasil, I changed my observable to this Am I executing my tasks in the right manner?? |
This is correct, note that the first 4 tasks are always from 0-3 range. |
I see but if that is the case then the below code should produce the same output(=3) as Also, will check for the race condition(which you mentioned earlier) and will update the comment. |
|
Ok, so the output is not the same on each run but at most only the 5th Task is getting called i.e. and will check more on the code. |
|
@Avasil I think it was as you mentioned about the race condition, to cancel the composite instead of releasing the semaphore and it's working as it should have. |
| case Failure(ex) => | ||
| lastAck = Stop | ||
| composite -= head.cancelable | ||
| composite.cancel() // cancel the whole downstream on error |
There was a problem hiding this comment.
I think it will be better to composite.cancel() in process method so as soon as the task is completed with failure
There was a problem hiding this comment.
@TapanVaishnav Sorry for nitpicking but this line should be redundant now :D
There was a problem hiding this comment.
Ahhh, because process calls sendDownstreamOrdered which then calls onNext and which then calls the process again. And if we stop the stream in the process itself then need not worry about sendDownstreamOrdered method.
There was a problem hiding this comment.
@Avasil I have removed the above line and believe that we needn't to add composite -= head.cancelable again, right?
There was a problem hiding this comment.
Yes, I think there are some redundant operators there but it's OK, I have to sit one day and refactor entire operator, it's a bit of a mess :D
|
Thank you @TapanVaishnav |
|
@Avasil Thank you for your review. I've added |
|
All the tests are passing on my local machine and I didn't even touch the tests which are failing on CI(and don't have any dependency on MapParallel(Un)orderedSuite), |
|
|
||
| case Failure(error) => | ||
| lastAck = Stop | ||
| composite.cancel() |
There was a problem hiding this comment.
the order should be different, no gain in composite -= future.cancelable after it is canceled
There was a problem hiding this comment.
the order should be different,
Do we even need this line now composite -= future.cancelable , as we can just cancel the composite using composite.cancel()?
There was a problem hiding this comment.
I'm not 100% so I'd prefer to stay with status quo
Looks like a random error, I restarted CI |
Avasil
left a comment
There was a problem hiding this comment.
Thank you for PR and sorry for nitpicking :D
|
@Avasil hey, no worries. |
…n error (monix#1064) * add early cancellation of Observable#mapParallel on error * MapParallelOrderedSuite: add test for early cancellation * Refactor: format code * MapParallelUnordered: add test for early cancellation * Refactor: format code * Feat: cancel composite as soon as error is found * Feat: create cancelComposite task * remove redundant line * fix composite cancelation order
closes: #1045