Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package monix.reactive.internal.operators

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

import monix.eval.Task
import monix.execution.Ack.{Continue, Stop}
Expand Down Expand Up @@ -81,16 +80,17 @@ private[reactive] final class MapParallelOrderedObservable[A, B](
// elements at the same time.
private[this] val queue = new ConcurrentLinkedQueue[CancelableFuture[B]]
// This lock makes sure that only one thread at the time sends processed elements downstream
// Note that we only use `tryLock()` - we never wait for acquiring lock
private[this] val sendDownstreamLock = new ReentrantLock()
private[this] val sendDownstreamSemaphore = AsyncSemaphore(1)

private def shouldStop: Boolean = isDone || lastAck == Stop

private def sendDownstreamOrdered(): Unit = {
// Only one thread should poll queue for completed tasks
// We can ignore it if there is one doing the work
if (sendDownstreamLock.tryLock()) {
val permit = sendDownstreamSemaphore.acquire()
composite += permit
def doSend(): Unit =
try {
composite -= permit
// Keep checking the head of a queue since we have to signal elements in order
while (!shouldStop && !queue.isEmpty && queue.peek().isCompleted) {
val head = queue.poll()
Expand Down Expand Up @@ -118,8 +118,24 @@ private[reactive] final class MapParallelOrderedObservable[A, B](
}
}
} finally {
sendDownstreamLock.unlock()
sendDownstreamSemaphore.release()
}
permit.value match {
case Some(Success(_)) =>
doSend()
case Some(Failure(error)) =>
lastAck = Stop
composite -= permit
self.onError(error)
case None =>
permit.onComplete {
case Success(_) =>
doSend()
case Failure(error) =>
lastAck = Stop
composite -= permit
self.onError(error)
}
}
}

Expand Down