Fix zip observable to upstream (v2)#2040
Conversation
…ions in order to avoid reading stale shared state in multithreaded environment
There was a problem hiding this comment.
Pull request overview
Aligns Zip{2..6}Observable synchronization behavior with upstream by removing internal lock.synchronized from signalOnComplete / signalOnError and requiring synchronization at call-sites, avoiding re-entrant locking in completion paths.
Changes:
- Wraps
signalOnComplete(false)withlock.synchronizedin theack.onCompletecallback to avoid re-entrant synchronization insidesignalOnComplete. - Refactors
signalOnErrorandsignalOnCompleteto requirelocksynchronization at the call-site (call-sites updated accordingly). - Adds/updates inline documentation clarifying the required locking contract (in most ZipN variants).
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip2Observable.scala | Moves completion/error signaling to require external synchronization; updates async-ack completion path accordingly. |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip3Observable.scala | Shifts signalOnError/signalOnComplete synchronization responsibility to call-sites and updates subscribers. |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip4Observable.scala | Same synchronization-contract refactor and subscriber call-site updates. |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip5Observable.scala | Same synchronization-contract refactor and subscriber call-site updates. |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip6Observable.scala | Same synchronization-contract refactor and subscriber call-site updates. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val ack = out.onNext(c) | ||
| if (completeWithNext) { | ||
| ack.onComplete(_ => signalOnComplete(false)) | ||
| ack.onComplete(_ => lock.synchronized(signalOnComplete(false))) | ||
| } |
There was a problem hiding this comment.
This change adjusts the completion path to avoid re-entrant synchronization and relies on a specific ordering between downstream ack completion and the completeWithNext/signalOnComplete logic. There are Zip operator suites in monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/Zip{2..6}Suite.scala, but none appear to exercise the completeWithNext/async-ack completion scenario; adding a regression test covering “one source completes after emitting while downstream onNext ack is async” would help prevent reintroducing the non-terminating behavior this PR is addressing.
…uilders/Zip5Observable.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Continuation of this PR: #2029
Original: https://github.com/AVSystem/monix/pull/1]
Changes:
onCompleteby avoidinglock.synchronizeinsignalOnCompletesignalOnErrorto requirelock.synchronizeat the call-site, matchingsignalOnCompleteandsignalOnNext(all require synchronization at the call-site now) … this makes the need for synchronization clearer