Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/ParkedSignal.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020-2025 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

/**
 * The parking state of a single worker thread. `WorkStealingThreadPool` keeps one
 * `AtomicReference[ParkedSignal]` per worker and compares the stored value by reference
 * (`eq`) against the singletons defined in the companion object.
 *
 * The hierarchy is sealed, so the four case objects below are the only possible states.
 */
private sealed abstract class ParkedSignal extends Product with Serializable

private object ParkedSignal {
// The value every worker's signal is initialized with, and the value a notifier writes
// back once it has finished waking a worker.
case object Unparked extends ParkedSignal

// The two "parked" states. They differ in how a notifier wakes the worker: for
// `ParkedPolling` it calls `system.interrupt(worker, poller)`, for `ParkedSimple` it
// calls `LockSupport.unpark(worker)`.
// NOTE(review): presumably the worker itself publishes which of the two applies just
// before it parks; that code is not visible here, confirm in WorkerThread.
case object ParkedPolling extends ParkedSignal
case object ParkedSimple extends ParkedSignal

// Transitional state a notifier installs via compare-and-set (from one of the parked
// states) before performing the wake-up; it is replaced by `Unparked` afterwards.
case object Interrupting extends ParkedSignal
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.{
AtomicReference,
AtomicReferenceArray
}
import java.util.concurrent.locks.LockSupport

import WorkStealingThreadPool._

Expand Down Expand Up @@ -93,7 +94,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
new AtomicReferenceArray(threadCount)
private[unsafe] val localQueues: Array[LocalQueue] = new Array(threadCount)
private[unsafe] val sleepers: Array[TimerHeap] = new Array(threadCount)
private[unsafe] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount)
private[unsafe] val parkedSignals: Array[AtomicReference[ParkedSignal]] = new Array(
threadCount)
private[unsafe] val fiberBags: Array[WeakBag[Runnable]] = new Array(threadCount)
private[unsafe] val pollers: Array[P] =
new Array[AnyRef](threadCount).asInstanceOf[Array[P]]
Expand Down Expand Up @@ -158,7 +160,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
localQueues(i) = queue
val sleepersHeap = new TimerHeap()
sleepers(i) = sleepersHeap
val parkedSignal = new AtomicBoolean(false)
val parkedSignal = new AtomicReference[ParkedSignal](ParkedSignal.Unparked)
parkedSignals(i) = parkedSignal
val index = i
val fiberBag = new WeakBag[Runnable]()
Expand Down Expand Up @@ -248,7 +250,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](

if (isStackTracing) {
destWorker.active = fiber
parkedSignals(dest).lazySet(false)
parkedSignals(dest).lazySet(ParkedSignal.Unparked)
}

fiber
Expand Down Expand Up @@ -313,14 +315,14 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
val index = (from + i) % threadCount

val signal = parkedSignals(index)
if (signal.getAndSet(false)) {
// Update the state so that a thread can be unparked.
// Here we are updating the 16 most significant bits, which hold the
// number of active threads, as well as incrementing the number of
// searching worker threads (unparked worker threads are implicitly
// allowed to search for work in the local queues of other worker
// threads).
state.getAndAdd(DeltaSearching)
val st = signal.get()

val polling = st eq ParkedSignal.ParkedPolling
val simple = st eq ParkedSignal.ParkedSimple

if ((polling || simple) && signal.compareAndSet(st, ParkedSignal.Interrupting)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics here are different from the old implementation: getAndSet involves a loop, but there is no longer a loop here.

In the (admittedly unlikely) scenario that a worker thread transitions from ParkedPolling to ParkedSimple or vice versa while evaluating this condition, we would prematurely give up on attempting to wake up this thread. We might consider looping if the compareAndSet fails.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might not be worth it. So going from ParkedPolling to ParkedSimple would require that between the get() above and the CAS, the other thread would need to wake itself up (or be awakened by another notify), run through its whole state machine, pick up a fiber, execute that fiber through the whole machinery, run out of work, go through the whole "check all the things for work" (which includes doing the thing that we're trying to notify it to do here!) and then finally go back and park again, all in the space of just these few lines.

Aside from the fact that this race condition is almost inconceivable, it effectively just results in a loss of performance since we give up on the thread and go wake another one. The work item we're trying to notify on would actually get picked up (before the target thread resuspends), and we'll ultimately wake the same number of threads.

This compared to having an actual CAS loop which would require more complexity here and I think an extra conditional jump, despite the fact that it's effectively never going to be hit and wouldn't matter even if it did. So I think I'd rather just eat the CAS failure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside from the fact that this race condition is almost inconceivable, it effectively just results in a loss of performance since we give up on the thread and go wake another one.

Sure ... I am just worried about relying on the existence of another thread. For example, if there is exactly one worker thread in the runtime, could we end up in a deadlock?

I have thought through it a few times and I have not come up with a scenario yet that could deadlock, but I don't feel confident.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's only one worker then that worker is either awake already (in which case we can't have entered this conditional) or it's parked and we're notifying it (in which case all the normal rules apply).

Copy link
Member

@armanbilge armanbilge Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or it's parked and we're notifying it (in which case all the normal rules apply).

Right, so if all the normal rules are applying ...

Aside from the fact that this race condition is almost inconceivable, it effectively just results in a loss of performance since we give up on the thread and go wake another one.

What happens if we fail the CAS due to the race condition, we don't loop, and then we don't go to the next thread because there is no next thread? We've given up on our one-and-only thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your words precisely capture my feelings about this:

Now, I think you're probably right that nothing is lost here since the thread will both check the external queue and attempt to steal before going back to sleep, but it's hard for me to 100% convince myself that this is safe in all circumstances. This is very similar to the multi-consumer queue cases, which are genuinely insanely subtle and we've been bitten quite a bit with lost notifications. Speaking more generally, we tend to always bias notifications in favor of over-notifying rather than under-notifying, since the worst case in the former is a bit less performance, while the worst case in the latter is a deadlock.

doneSleeping()

// Fetch the latest references to the worker threads before doing the
// actual unparking. There is no danger of a race condition where the
// parked signal has been successfully marked as unparked but the
Expand All @@ -329,7 +331,14 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
// point it is already unparked and entering this code region is thus
// impossible.
val worker = workerThreads.get(index)
system.interrupt(worker, pollers(index))

if (polling) {
system.interrupt(worker, pollers(index))
} else {
LockSupport.unpark(worker)
}
signal.set(ParkedSignal.Unparked)

return true
}

Expand Down Expand Up @@ -453,6 +462,12 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
}

/**
 * Adds `DeltaSearching` to the pool's packed `state` counter and discards the previous
 * value. Called by the notifying code path immediately after it has claimed a parked
 * worker, before that worker is actually woken.
 */
private[unsafe] def doneSleeping(): Unit = {
// Update the state so that a thread can be unparked.
// Here we are updating the 16 most significant bits, which hold the
// number of active threads, as well as incrementing the number of
// searching worker threads (unparked worker threads are implicitly
// allowed to search for work in the local queues of other worker
// threads).
state.getAndAdd(DeltaSearching)
// Explicit unit so the value returned by `getAndAdd` is not the block's result.
()
}
Expand Down
Loading
Loading