feat(streams,ringbuffer): concurrent stream operators (mapPar / mergeAll / flatMapPar) + primitive SPSC ring buffers #1451

Merged: jdegoes merged 1 commit into main from streams-concurrency, Jun 4, 2026
@jdegoes jdegoes commented Jun 4, 2026

Motivation

ZIO Blocks streams previously had no concurrent operators. Long-running streams that wanted to fan out per-element work (mapPar), drain multiple sub-streams concurrently (mergeAll, flatMapPar), or hand off batches between producer and consumer threads, had to fall back to either external concurrency frameworks or homegrown thread pools — losing the typed-error, synchronous, pull-based Reader[A] programming model that the rest of the library is built around.

This PR adds three concurrent stream operators, the ring-buffer machinery that backs them, and keeps the existing programming model: the consumer thread still receives Either[E, Z] — no effect system required, no IO/Task boilerplate.

What changed

New Stream operators (JVM-only; degrade to sequential on Scala.js)

| Operator | Semantics |
|---|---|
| `Stream#mapPar(n)(f)` | Apply `f` to each element on up to `n` worker threads. Output is unordered (arrival order). |
| `Stream.mergeAll(n)(streams)` | Drain up to `n` inner streams concurrently; interleave their elements as they arrive. |
| `Stream#flatMapPar(n)(f)` | Per element, produce a sub-stream via `f`; drain up to `n` sub-streams concurrently. |

All three use virtual threads on JDK 21+ (reflective probe at class init; falls back to plain Thread on JDK <21). Errors from any worker terminate all concurrent work and surface as Left(e) from the terminal operation. Resource safety is deterministic — closing the consumer reader, an error, or a scope finalizer shuts down workers and drains queues.
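
The contract above (up to n workers, arrival-order output, first error terminates the rest) can be illustrated with a small standalone sketch. This is not the PR's implementation: `MapParSketch.mapParUnordered` is a hypothetical helper built only on `java.util.concurrent`, it uses platform threads, and it collects results into a `List` instead of streaming them. Exceptions thrown by `f` (defects) are out of scope here.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.jdk.CollectionConverters._

object MapParSketch {
  // Hypothetical helper for illustration only; not part of the zio-blocks API.
  def mapParUnordered[E, A, B](n: Int)(input: Vector[A])(f: A => Either[E, B]): Either[E, List[B]] = {
    require(n >= 1, "n must be >= 1")
    val next    = new AtomicInteger(0)                      // index of the next unclaimed element
    val failure = new AtomicReference[Option[E]](None)      // first typed error, if any
    val out     = new ConcurrentLinkedQueue[B]()            // results in arrival order
    val done    = new CountDownLatch(n)

    val worker: Runnable = () => {
      try {
        var running = true
        // Every worker stops claiming elements as soon as any worker has failed.
        while (running && failure.get().isEmpty) {
          val i = next.getAndIncrement()
          if (i >= input.length) running = false
          else
            f(input(i)) match {
              case Right(b) => out.add(b); ()
              case Left(e)  => failure.compareAndSet(None, Some(e)); () // first error wins
            }
        }
      } finally done.countDown()
    }

    (1 to n).foreach(k => new Thread(worker, s"map-par-sketch-$k").start())
    done.await() // the calling (consumer) thread blocks until all workers have stopped

    failure.get() match {
      case Some(e) => Left(e)
      case None    => Right(out.asScala.toList)
    }
  }
}
```

Because output order depends on scheduling, callers comparing against a sequential `map` should compare as sets or sort first, which is how the law `mapPar(1)(f) ≡ map(f)` is phrased later in this description.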

Buffer sizing is exposed via Stream.bufferSize(n) { ... } (powers of two) and Pipeline.buffer(n).

Primitive specialization

Per-JvmType specialized readers are picked in PlatformSpecific.createMapParReader / createMergeReader:

  • Int/Long/Float/Double flows go through primitive-typed readers that use the new primitive SPSC ring buffers for the cross-thread handoff (zero boxing on the producer side).
  • Generic AnyRef flows fall back to ConcurrentMapParReader[A, B] / ConcurrentMergeReader[A].

Primitive SPSC ring buffers (ringbuffer project)

Four new classes in ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/:

| Type | Class | Slot encoding |
|---|---|---|
| `Int` | `IntSpscRingBuffer` | one `Long` per slot: high 32 bits hold a 0/1 occupancy tag, low 32 bits hold the `Int` payload. The whole 64-bit word is `0L` when empty, so any tagged value is non-zero regardless of payload bits. All `Int` values are representable. |
| `Long` | `LongSpscRingBuffer` | one `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. |
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. All `Float` values representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern marks empty. Any payload `NaN` is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |
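
The encodings in the table can be sketched as pure functions. The names in `SlotEncodingSketch` are hypothetical and are not taken from the ring buffer classes; the reserved `NaN` pattern `0xFFF8000000000001L` is the one quoted in the docs diff later in this thread.

```scala
object SlotEncodingSketch {
  // Int/Float: occupancy tag in the high 32 bits, payload in the low 32 bits.
  // An empty slot is the all-zero word, so a tagged word is never 0L.
  private val OccupiedTag: Long = 1L << 32

  def isEmptyWord(word: Long): Boolean = word == 0L

  def packInt(value: Int): Long  = OccupiedTag | (value.toLong & 0xFFFFFFFFL)
  def unpackInt(word: Long): Int = word.toInt // keeps the low 32 bits

  def packFloat(value: Float): Long =
    OccupiedTag | (java.lang.Float.floatToRawIntBits(value).toLong & 0xFFFFFFFFL)
  def unpackFloat(word: Long): Float = java.lang.Float.intBitsToFloat(word.toInt)

  // Double: raw bits are stored directly; one non-canonical NaN pattern means "empty".
  val EmptyDoubleBits: Long = 0xFFF8000000000001L

  def packDouble(value: Double): Long =
    if (value.isNaN) java.lang.Double.doubleToRawLongBits(Double.NaN) // canonicalize every NaN payload
    else java.lang.Double.doubleToRawLongBits(value)
  def unpackDouble(bits: Long): Double = java.lang.Double.longBitsToDouble(bits)
}
```

The tag bit is what lets `Int` and `Float` avoid a reserved payload value: `packInt(0)` is `1L << 32`, not `0L`. `Long` and `Double` have no spare bits in a 64-bit slot, which is why they reserve a value instead.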

All four use the same FastFlow algorithm as the generic SpscRingBuffer (single backing Array[Long], padded indices, look-ahead cache) and expose the same core operations (offer, take, peek, isEmpty, isFull, size, capacity). The generic drain/fill batch helpers are not provided on the primitive variants; callers loop peek/take directly.

JMH benchmarks in ringbuffer-benchmarks/PrimitiveSpscThroughputBenchmark cover SPSC throughput for all four variants.

Coordinator/worker queue primitives

The coordinator-to-worker channels used for multi-worker fan-out are built on three blocking queue primitives in streams/jvm/src/main/scala/zio/blocks/streams/queues/:

  • BlockingSpscQueue[A] / BlockingMpscQueue[A] / BlockingMpmcQueue[A] — array-backed, fixed-capacity, parked-thread blocking variants tuned for the specific concurrency shape each reader uses.

Correctness fixes uncovered during stabilization

Two real races were found and fixed before this PR:

  1. Specialized mapPar lost-element race. The Int/Long/Float/Double mapPar readers use a separate control queue from the input queue (the generic reader uses a single combined queue). A worker could observe DoneSentinel on the control queue before all prior input writes were visible and exit early, dropping the tail of the stream. Reproduced as a flake in ConcurrentLawsSpec (mapPar(1)(f) ≡ map(f) as sets).

    Fix: a stopSeen flag in workerLoop. Once DoneSentinel is observed, the loop defers exit until inputQueues(idx) is actually drained, ensuring writes that happened-before the stop signal are picked up.

  2. isClosed reported true before consumer observed EOS. The previous predicate workersRunning.get() == 0 && allOutputQueuesEmpty() could flip to true between the last producer write and the consumer's matching read, racing the consumer's loop termination check (reproduced in IsClosedRegressionSpec mapPar: readable() is consistent with isClosed during consumption).

    Fix: track an eofReturned flag set on the read that actually returns the sentinel to the consumer; isClosed becomes eofReturned || consumerClosed. The reader only signals "closed" once EOS has actually been delivered, not when it has merely become inevitable.
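
Both fixes follow the same idea: act on what has actually been observed, not on what has become inevitable. A minimal sketch of each, assuming plain `java.util.concurrent` queues in place of the PR's ring buffers; `workerLoop`, `QueueReader`, and the sentinel objects are hypothetical stand-ins, not the PR's classes.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

object StopSeenSketch {
  val DoneSentinel: AnyRef = new Object

  // Fix 1: after the stop signal is seen, keep draining `input` until it is empty.
  def workerLoop(
    control: ConcurrentLinkedQueue[AnyRef],
    input: ConcurrentLinkedQueue[Integer],
    emit: Int => Unit
  ): Unit = {
    var stopSeen = false
    var running  = true
    while (running) {
      if (!stopSeen && (control.poll() eq DoneSentinel)) stopSeen = true
      val next = input.poll()
      if (next != null) emit(next.intValue)
      else if (stopSeen) running = false // stop seen AND input drained: safe to exit
      else Thread.onSpinWait()           // nothing yet and no stop signal: keep polling
    }
  }
}

object IsClosedSketch {
  val EndOfStream: AnyRef = new Object

  // Fix 2: `isClosed` flips only when end-of-stream has been returned to the consumer.
  final class QueueReader[A <: AnyRef](queue: LinkedBlockingQueue[AnyRef]) {
    @volatile private var eofReturned    = false
    @volatile private var consumerClosed = false

    /** Returns the next element, or null once end-of-stream has been reached. */
    def read(): A =
      if (eofReturned || consumerClosed) null.asInstanceOf[A]
      else {
        val next = queue.take()
        if (next eq EndOfStream) { eofReturned = true; null.asInstanceOf[A] }
        else next.asInstanceOf[A]
      }

    def close(): Unit     = consumerClosed = true
    def isClosed: Boolean = eofReturned || consumerClosed
  }
}
```

In the second sketch, a reader whose queue holds only the end-of-stream sentinel still reports `isClosed == false` until `read()` has returned `null` once, which is the behaviour the regression spec checks.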

Tests

| Suite | Scala 3 | Scala 2.13.18 |
|---|---|---|
| streamsJVM/test | 1294 passed | 1286 passed |
| streamsJS/test | 1067 passed, 1 ignored | 1059 passed, 1 ignored |
| ringbufferJVM/test | 190 passed | 190 passed |

New JVM specs added in this PR (selected): ConcurrentMapParReaderSpec, ConcurrentMergeReaderSpec, ConcurrentStressSpec, ConcurrentLawsSpec, BufferSizeSpec, BufferStressSpec, BufferSpec, IsClosedRegressionSpec, MergeInnerErrorSpec, MapParSpec, MapParJvmTypeSpec, MergeAllSpec, FloatDoubleMapParSpec (lifts Float/Double mapPar coverage from 0% to ~87%), ReadNSpec/ReadNJvmSpec/ReadUpToNSpec, GenericFoldSpec, PlatformSpec, ReadBytesScopeAuditSpec, ReadBytesSoundnessSpec, BlockingMpmcQueueSpec, BlockingMpscQueueSpec, BlockingSpscQueueSpec, PrimitiveSpscRingBufferSpec.

Coverage (Scala 2.13.18 streamsJVM): 56.84% statement / 52.05% branch overall; specialized concurrent readers ~87–90% statement. Ringbuffer: 98.86% statement / 96.61% branch.

Docs

  • docs/reference/streams/concurrent-operators.md — new reference page covering semantics, error behaviour, buffer sizing, examples for mapPar/mergeAll/flatMapPar, and a feature comparison with fs2, Kyo, Ox, and Pekko.
  • docs/reference/ringbuffer/spsc.mdx — new "Primitive variants" section documenting the four primitive SpscRingBuffer classes, their slot encodings, the Long.MinValue / NaN-canonicalization reservation rules, and the peek + take usage pattern.

Verification

  • Scala 3.3.7 / 2.13.18: streamsJVM/test, streamsJS/test, ringbufferJVM/test all green (see table above).
  • streams-benchmark, ringbufferBenchmarks, benchmarks projects compile clean.
  • docs/mdoc compiles clean (0 errors).
  • fmtDirty applied; tree clean.

Copilot AI review requested due to automatic review settings June 4, 2026 02:44

Copilot AI left a comment

Pull request overview

This PR introduces concurrent stream operators to zio.blocks.streams (JVM with sequential fallbacks on Scala.js), backed by new blocking queue primitives and primitive-specialized SPSC ring buffers, plus extensive new test coverage and documentation updates.

Changes:

  • Added concurrent stream operators and buffering facilities (mapPar, mergeAll, flatMapPar, plus Stream.bufferSize, Stream.buffer, and Pipeline.buffer).
  • Implemented JVM concurrency infrastructure (virtual-thread probing/fallback, concurrent buffered reader, blocking queues, specialized concurrent readers) and Scala.js sequential degradations.
  • Added primitive-specialized SPSC ring buffers (Int/Long/Float/Double), updated docs/sidebars, and added broad regression + stress test coverage and benchmarks.

Reviewed changes

Copilot reviewed 64 out of 67 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| `streams/shared/src/main/scala/zio/blocks/streams/Platform.scala` | New cross-platform Platform abstraction for concurrency + reader factories |
| `streams/jvm/src/main/scala/zio/blocks/streams/PlatformSpecific.scala` | JVM implementation: virtual threads + concurrent reader factories |
| `streams/js/src/main/scala/zio/blocks/streams/PlatformSpecific.scala` | Scala.js implementation: sequential degradations + sync buffering |
| `streams/shared/src/main/scala/zio/blocks/streams/Pipeline.scala` | Added Pipeline.buffer and Pipeline.chunked helpers |
| `streams/shared/src/main/scala/zio/blocks/streams/internal/SyncBufferedReader.scala` | JS-only sync-prefetch buffered reader |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/ConcurrentBufferedReader.scala` | JVM concurrent buffered reader using a blocking SPSC queue |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/ConcurrentMapParReader.scala` | Generic concurrent mapPar reader implementation |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/IntConcurrentMergeReader.scala` | Primitive-specialized mergeAll reader (Int) |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/LongConcurrentMergeReader.scala` | Primitive-specialized mergeAll reader (Long) |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/FloatConcurrentMergeReader.scala` | Primitive-specialized mergeAll reader (Float) |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/DoubleConcurrentMergeReader.scala` | Primitive-specialized mergeAll reader (Double) |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/ByteBufferReader.scala` | Added readN/readUpToN overrides + byte-evidence readBytes signature |
| `streams/jvm/src/main/scala/zio/blocks/streams/internal/ChannelReader.scala` | Added readN/readUpToN overrides + byte-evidence readBytes signature |
| `streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingSpscQueue.scala` | New blocking SPSC queue primitive |
| `streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingMpscQueue.scala` | New blocking MPSC queue primitive |
| `streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingMpmcQueue.scala` | New blocking MPMC queue primitive |
| `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/SpscRingBuffer.scala` | Memory-ordering tweaks and producerLimit update in slow path |
| `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/IntSpscRingBuffer.scala` | New primitive SPSC ring buffer for Int |
| `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/LongSpscRingBuffer.scala` | New primitive SPSC ring buffer for Long (with sentinel reservation) |
| `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/FloatSpscRingBuffer.scala` | New primitive SPSC ring buffer for Float |
| `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/DoubleSpscRingBuffer.scala` | New primitive SPSC ring buffer for Double (NaN-bit reservation) |
| `docs/reference/streams/concurrent-operators.md` | New docs page for concurrent stream operators |
| `docs/reference/ringbuffer/spsc.mdx` | Documented primitive ring buffer variants |
| `docs/sidebars.js` | Added new streams concurrent-operators doc page to sidebar |
| `build.sbt` | Wired streams to depend on ringbuffer; adjusted streams test parallelism; bumped Kyo version in benchmarks |
| `streams/shared/src/test/scala/zio/blocks/streams/ReadNSpec.scala` | New shared tests for Reader#readN |
| `streams/shared/src/test/scala/zio/blocks/streams/ReaderFromJavaSpec.scala` | Updated byte semantics expectations + added readBytes edge-case tests |
| `streams/shared/src/test/scala/zio/blocks/streams/NewCombinatorsSpec.scala` | Renamed grouped→chunked test coverage updates |
| `streams/shared/src/test/scala/zio/blocks/streams/MergeAllSpec.scala` | New shared mergeAll/flatMapPar tests (sequential aspect) |
| `streams/shared/src/test/scala/zio/blocks/streams/MapParSpec.scala` | New shared mapPar tests (sequential aspect) |
| `streams/shared/src/test/scala/zio/blocks/streams/MapParJvmTypeSpec.scala` | New tests asserting JvmType propagation through mapPar |
| `streams/shared/src/test/scala/zio/blocks/streams/GenericFoldSpec.scala` | New tests for generic runFold specialization dispatch |
| `streams/shared/src/test/scala/zio/blocks/streams/IntSpecializationSpec.scala` | Minor type annotation tweak for byte reader |
| `streams/shared/src/test/scala/zio/blocks/streams/ConcurrentLawsSpec.scala` | New property-law tests for concurrent operators |
| `streams/shared/src/test/scala/zio/blocks/streams/BufferSpec.scala` | New shared buffer behavior tests |
| `streams/shared/src/test/scala-3/zio/blocks/streams/ReadBytesSoundnessSpec.scala` | Scala 3 compile-time evidence tests for readBytes |
| `streams/shared/src/test/scala-3/zio/blocks/streams/ReadBytesScopeAuditSpec.scala` | Scala 3 compile-time audit tests for evidence scoping decisions |
| `streams/jvm/src/test/scala/zio/blocks/streams/ReadNJvmSpec.scala` | JVM-specific readN override tests (NIO readers) |
| `streams/jvm/src/test/scala/zio/blocks/streams/PlatformSpec.scala` | JVM platform tests for virtual thread start + buffer behavior |
| `streams/jvm/src/test/scala/zio/blocks/streams/BufferSizeSpec.scala` | JVM tests for Stream.bufferSize semantics |
| `streams/jvm/src/test/scala/zio/blocks/streams/BufferStressSpec.scala` | JVM stress tests for buffer correctness + thread cleanup |
| `streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentStressSpec.scala` | JVM stress tests for mergeAll/mapPar correctness + leak checks |
| `streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentMapParReaderSpec.scala` | JVM tests for mapPar reader correctness, defects, termination, leaks |
| `streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentMergeReaderSpec.scala` | JVM tests for mergeAll reader correctness, errors, termination, leaks |
| `streams/jvm/src/test/scala/zio/blocks/streams/MergeInnerErrorSpec.scala` | JVM regression tests for inner-error termination/hang prevention |
| `streams/jvm/src/test/scala/zio/blocks/streams/IsClosedRegressionSpec.scala` | JVM regression tests around isClosed visibility during consumption |
| `streams/jvm/src/test/scala/zio/blocks/streams/FloatDoubleMapParSpec.scala` | JVM coverage tests for Float/Double specialized mapPar readers |
| `streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingSpscQueueSpec.scala` | Tests for blocking SPSC queue behavior + concurrency/stress |
| `streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingMpscQueueSpec.scala` | Tests for blocking MPSC queue behavior + concurrency/stress |
| `streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingMpmcQueueSpec.scala` | Tests for blocking MPMC queue behavior + concurrency/stress |
| `streams/js/src/test/scala/zio/blocks/streams/.gitkeep` | Keeps JS test dir in VCS |
| `streams/js/src/main/scala/zio/blocks/streams/.gitkeep` | Keeps JS main dir in VCS |
| `streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamSetupBench.scala` | Updated Kyo imports to avoid Scope conflicts |
| `streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamPipelineBench.scala` | Updated Kyo imports to avoid Scope conflicts |
| `streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamEvalBench.scala` | Updated Kyo imports to avoid Scope conflicts |
| `streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamConcurrentBench.scala` | New JMH benchmark for concurrent operators across libraries |

Comment on lines +25 to +26
trait PlatformSpecific extends Platform {
  override val supportsConcurrency: Boolean = true

Member Author

Added Scaladoc to the JVM PlatformSpecific describing virtual-thread probing/fallback and the specialized factories.

Comment thread docs/reference/ringbuffer/spsc.mdx Outdated
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic version — single backing `Array[Long]`, padded indices, look-ahead cache — and expose the same operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`, `drain`, `fill`). The only API difference is that `take` returns a primitive instead of `AnyRef`, so callers check `peek()` first to know whether the next read will be a real value or just the empty marker:

Member Author

Fixed the docs to reflect that the primitive SPSC variants only expose the core ops (offer, take, peek, isEmpty, isFull, size, capacity) and recommend peek() before take().

@jdegoes jdegoes force-pushed the streams-concurrency branch from 41491ce to c80d77d Compare June 4, 2026 02:56
Copilot AI review requested due to automatic review settings June 4, 2026 03:02
@jdegoes jdegoes force-pushed the streams-concurrency branch from c80d77d to 55e96b8 Compare June 4, 2026 03:02

Copilot AI left a comment

Pull request overview

Copilot reviewed 64 out of 67 changed files in this pull request and generated 9 comments.

Comment on lines +91 to +98
def take(): Long = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val e = (LONG_HANDLE.get(data, offset): Long)
  LONG_HANDLE.setRelease(data, offset, EMPTY)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}

Member Author

Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.
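
For reference, the fixed behaviour can be sketched as follows. This is a simplified stand-in, not the PR's class: `TakeSketch.LongRing` is hypothetical, uses `AtomicLongArray` rather than `VarHandle`s, and omits index padding and the look-ahead cache. What it does show is the acquire read and the early return that leaves `consumerIndex` untouched on an empty slot.

```scala
import java.util.concurrent.atomic.AtomicLongArray

object TakeSketch {
  val Empty: Long = Long.MinValue // reserved empty marker, as in the Long variant

  /** Single-producer / single-consumer only; `capacity` must be a power of two. */
  final class LongRing(capacity: Int) {
    require(capacity >= 1 && (capacity & (capacity - 1)) == 0, "capacity must be a power of two")

    private val mask  = (capacity - 1).toLong
    private val slots = new AtomicLongArray(capacity)
    private var producerIndex = 0L // touched only by the producer thread
    private var consumerIndex = 0L // touched only by the consumer thread

    // Mark every slot as empty before the buffer is shared.
    (0 until capacity).foreach(i => slots.set(i, Empty))

    def offer(a: Long): Boolean = {
      if (a == Empty) throw new IllegalArgumentException("Long.MinValue is reserved as the empty marker")
      val offset = (producerIndex & mask).toInt
      if (slots.getAcquire(offset) != Empty) false // slot still occupied: buffer is full
      else { slots.setRelease(offset, a); producerIndex += 1L; true }
    }

    def take(): Long = {
      val offset = (consumerIndex & mask).toInt
      val e      = slots.getAcquire(offset) // acquire read pairs with the producer's release write
      if (e == Empty) Empty                 // empty: return the marker and do NOT advance the index
      else { slots.setRelease(offset, Empty); consumerIndex += 1L; e }
    }
  }
}
```

With the original code, two `take()` calls on an empty buffer would have advanced `consumerIndex` twice, so the consumer would later look for elements in the wrong slots.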

Comment on lines +87 to +94
def take(): Int = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val e = (LONG_HANDLE.get(data, offset): Long).toInt
  LONG_HANDLE.setRelease(data, offset, 0L)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}

Member Author

Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.

Comment on lines +86 to +94
def take(): Float = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val packed = (LONG_HANDLE.get(data, offset): Long)
  val e = java.lang.Float.intBitsToFloat(packed.toInt)
  LONG_HANDLE.setRelease(data, offset, 0L)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}

Member Author

Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.

Comment on lines +96 to +104
def take(): Double = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val bits = (LONG_HANDLE.get(data, offset): Long)
  val e = java.lang.Double.longBitsToDouble(bits)
  LONG_HANDLE.setRelease(data, offset, EMPTY_BITS)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}

Member Author

Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.

     assert(collect(s.take(100)))(equalTo(Chunk.fromIterable(List(1, 2))))
   },
-  test("grouped(1) wraps each element like List.grouped(1)") {
+  test("chunked(1) wraps each element like List.chunked(1)") {

Member Author

Renamed to List.grouped(1) to match the stdlib.


object MergeAllSpec extends StreamsBaseSpec {

  def spec: Spec[TestEnvironment, Any] = suite("MergeAll")(

Member Author

Renamed the suite to MergeAll and flatMapPar.

Comment on lines +42 to +48
final class BlockingSpscQueue[A <: AnyRef](capacity: Int) {

  private val spinTries = 1024

  private val ringBuffer: SpscRingBuffer[AnyRef] =
    new SpscRingBuffer[AnyRef](nextPowerOfTwo(capacity))

Member Author

Added require(capacity >= 1, ...) in the constructor.

Comment on lines +41 to +45
final class BlockingMpscQueue[A <: AnyRef](capacity: Int) {

  private val ringBuffer: MpscRingBuffer[AnyRef] =
    new MpscRingBuffer[AnyRef](nextPowerOfTwo(capacity))

Member Author

Added require(capacity >= 1, ...) in the constructor.

Comment on lines +63 to +67
final class BlockingMpmcQueue[A <: AnyRef](capacity: Int) {

  private val ringBuffer: MpmcRingBuffer[AnyRef] =
    new MpmcRingBuffer[AnyRef](nextPowerOfTwo(capacity))

Member Author

Added require(capacity >= 1, ...) in the constructor.
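
A minimal sketch of the guard plus rounding that the three queue constructors rely on. The body of the real `nextPowerOfTwo` is not shown in this thread, so `CapacitySketch.nextPowerOfTwo` below is an assumed implementation, and the upper bound is an extra assumption added to avoid `Int` overflow.

```scala
object CapacitySketch {
  // Assumed implementation for illustration; the PR's helper may differ.
  def nextPowerOfTwo(n: Int): Int = {
    require(n >= 1, s"capacity must be >= 1, got $n")
    require(n <= (1 << 30), s"capacity must be <= ${1 << 30}, got $n")
    // highestOneBit(n - 1) << 1 rounds up to the next power of two for n >= 2;
    // n == 1 is special-cased because highestOneBit(0) is 0.
    if (n == 1) 1 else Integer.highestOneBit(n - 1) << 1
  }
}
```

Validating in the constructor makes a zero or negative capacity fail at construction time with a clear message, rather than reaching the rounding helper with a value it has no sensible answer for.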

@jdegoes jdegoes force-pushed the streams-concurrency branch from 55e96b8 to 6e4910d Compare June 4, 2026 03:36
@github-actions

github-actions Bot commented Jun 4, 2026

🚀 Preview deployed to Netlify: https://zio-blocks-pr-1451--zio-dev.netlify.app

Copilot AI review requested due to automatic review settings June 4, 2026 04:00
@jdegoes jdegoes force-pushed the streams-concurrency branch from 6e4910d to 917ba8e Compare June 4, 2026 04:00

Copilot AI left a comment

Pull request overview

Copilot reviewed 64 out of 67 changed files in this pull request and generated 2 comments.

Comment on lines +128 to +148
private object VirtualThreadSupport {
  val startVirtual: Option[(String, Runnable) => Thread] =
    try {
      val lookup = MethodHandles.lookup()
      val threadClass = Class.forName("java.lang.Thread").asInstanceOf[Class[Thread]]
      val ofVirtualMethod = threadClass.getMethod("ofVirtual")
      val ofVirtualHandle = lookup.unreflect(ofVirtualMethod)
      val builderClass = ofVirtualMethod.getReturnType
      val nameHandle = lookup.unreflect(builderClass.getMethod("name", classOf[String], java.lang.Long.TYPE))
      val startHandle = lookup.unreflect(builderClass.getMethod("start", classOf[Runnable]))

      Some { (name: String, task: Runnable) =>
        val builder = ofVirtualHandle.invoke().asInstanceOf[AnyRef]
        val namedBuilder = nameHandle.invoke(builder, name, Long.box(0L)).asInstanceOf[AnyRef]
        startHandle.invoke(namedBuilder, task).asInstanceOf[Thread]
      }
    } catch {
      case _: NoSuchMethodException => None
      case _: ClassNotFoundException => None
      case _: SecurityException => None
    }

Member Author

Broadened the catch in VirtualThreadSupport.startVirtual to NonFatal(_), so any non-fatal reflective or initialization failure (including IllegalAccessException, InvocationTargetException, InaccessibleObjectException, etc.) falls back to the platform-thread implementation rather than crashing class init. Note that scala.util.control.NonFatal does not match LinkageError, so that case still propagates.
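
A minimal sketch of a probe with a `NonFatal` fallback, assuming plain `java.lang.reflect` instead of the `MethodHandles` used in the diff above. `VirtualThreadProbeSketch` and its members are hypothetical names. One caveat worth knowing: `scala.util.control.NonFatal` deliberately does not match `LinkageError`, `VirtualMachineError`, or `InterruptedException`, so those still propagate.

```scala
import scala.util.control.NonFatal

object VirtualThreadProbeSketch {
  // Resolved once at initialization; any non-fatal failure selects the platform-thread fallback.
  private val startVirtual: Option[(String, Runnable) => Thread] =
    try {
      val ofVirtual    = classOf[Thread].getMethod("ofVirtual") // absent on older JDKs
      val builderClass = ofVirtual.getReturnType
      val nameMethod   = builderClass.getMethod("name", classOf[String])
      val startMethod  = builderClass.getMethod("start", classOf[Runnable])
      ofVirtual.invoke(null) // probe once so a JDK that rejects the call fails here, not later
      Some { (name: String, task: Runnable) =>
        val builder = nameMethod.invoke(ofVirtual.invoke(null), name)
        startMethod.invoke(builder, task).asInstanceOf[Thread]
      }
    } catch {
      case NonFatal(_) => None
    }

  def usesVirtualThreads: Boolean = startVirtual.isDefined

  /** Starts `task` on a virtual thread when available, otherwise on a daemon platform thread. */
  def start(name: String, task: Runnable): Thread =
    startVirtual match {
      case Some(f) => f(name, task)
      case None =>
        val t = new Thread(task, name)
        t.setDaemon(true)
        t.start()
        t
    }
}
```

Callers only see `start`, so the same code path works whether or not the running JDK supports virtual threads.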

Comment on lines +73 to +76
def offer(a: Long): Boolean = {
  if (a == EMPTY)
    throw new IllegalArgumentException(s"offer(Long.MinValue) is not permitted (reserved as empty sentinel)")
  val pIdx = producerIndex

Member Author

Reserving Long.MinValue as the empty marker is the intentional design tradeoff for the primitive specialization (per direction in the original spec — primitive arrays only, no tuples, no boxed allocations). Added scaladoc to LongConcurrentMapParReader and LongConcurrentMergeReader documenting the limitation; if an upstream emits Long.MinValue, the IAE from offer is already surfaced as a stream error via the existing recordError path rather than leaking. Streams that legitimately need Long.MinValue should route through the boxed Long path.

@jdegoes jdegoes force-pushed the streams-concurrency branch from 917ba8e to 852a9ed Compare June 4, 2026 04:48
Copilot AI review requested due to automatic review settings June 4, 2026 13:51
@jdegoes jdegoes force-pushed the streams-concurrency branch from 852a9ed to 21b08ff Compare June 4, 2026 13:51

Copilot AI left a comment

Pull request overview

Copilot reviewed 64 out of 67 changed files in this pull request and generated 4 comments.

Comment on lines +72 to +94
override def readN[A1 >: Byte](n: Int): Chunk[A1] = {
  if (n <= 0 || isClosed) return Chunk.empty
  if (n <= 8192) {
    val arr = new Array[Byte](n)
    var total = 0
    while (total < n && !isClosed) {
      val read = readBytes(arr, total, n - total)
      if (read > 0) total += read
    }
    if (total == 0) Chunk.empty
    else if (total == n) Chunk.fromArray(arr).asInstanceOf[Chunk[A1]]
    else Chunk.fromArray(java.util.Arrays.copyOf(arr, total)).asInstanceOf[Chunk[A1]]
  } else {
    val b = new ChunkBuilder.Byte()
    val buf = new Array[Byte](8192)
    var rem = n
    while (rem > 0 && !isClosed) {
      val read = readBytes(buf, 0, math.min(rem, 8192))
      if (read > 0) { var k = 0; while (k < read) { b.addOne(buf(k)); k += 1 }; rem -= read }
    }
    b.result().asInstanceOf[Chunk[A1]]
  }
}

Member Author

Fixed ChannelReader.readN to break out of both branches on read <= 0 so a non-blocking channel returning 0 bytes can no longer cause an infinite spin. Fixed in 51db422.
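
The shape of that fix, as a standalone sketch. `ReadNSketch.readN` is hypothetical: it takes the byte source as a function and returns an `Array[Byte]` instead of a `Chunk`, and it assumes the source returns a positive count on progress, 0 when nothing is available right now, and a negative value at end of input.

```scala
object ReadNSketch {
  // `readBytes(buf, off, len)` is a stand-in for the reader's own readBytes method.
  def readN(n: Int)(readBytes: (Array[Byte], Int, Int) => Int): Array[Byte] =
    if (n <= 0) Array.emptyByteArray
    else {
      val arr   = new Array[Byte](n)
      var total = 0
      var open  = true
      while (open && total < n) {
        val read = readBytes(arr, total, n - total)
        if (read <= 0) open = false // no progress or end of input: stop instead of spinning
        else total += read
      }
      if (total == n) arr else java.util.Arrays.copyOf(arr, total)
    }
}
```

The original loop only advanced on `read > 0` and otherwise retried, so a source that kept returning 0 without ever closing would never leave the loop.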

Comment on lines +79 to +86
def close(): Unit = {
  if (inner != null) {
    try inner.close()
    catch { case _: Throwable => () }
    inner = null
  }
  outer.close()
}

Member Author

JS createMergeReader fallback now sets outerDone = true inside close() so isClosed reports true after a manual close. Fixed in 51db422.

Comment thread docs/reference/ringbuffer/spsc.mdx Outdated
Comment on lines +278 to +282
| `Long` | `LongSpscRingBuffer` | one `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. All other `Long` values pass through unchanged. |
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic version — single backing `Array[Long]`, padded indices, look-ahead cache — and expose the same core operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`). The only API difference is that `take` returns a primitive instead of `AnyRef`, so callers should check `peek()` first to know whether the next read will be a real value or just the empty marker. (The generic `drain`/`fill` batch helpers are not provided on the primitive variants; loop `peek`/`take` directly.)

Member Author

Added a dedicated subsection describing offerDone() and pollPacked() for all four primitive variants, including the per-class sentinel constants. Fixed in 51db422.

 def spec: Spec[TestEnvironment, Any] = suite("Reader.fromInputStream / fromReader")(
   suite("fromInputStream")(
-    test("read() returns widened Int (0-255) then null on EOF") {
+    test("read() returns Byte (0-255) then null on EOF") {

Member Author

Renamed the test to read() returns each Byte (signed) then null on EOF to match Scala/Java's signed Byte semantics. Fixed in 51db422.

@jdegoes jdegoes force-pushed the streams-concurrency branch from 21b08ff to 41fb96d Compare June 4, 2026 14:18
Copilot AI review requested due to automatic review settings June 4, 2026 14:27
@jdegoes jdegoes force-pushed the streams-concurrency branch from 41fb96d to e9f0456 Compare June 4, 2026 14:27

Copilot AI left a comment

Pull request overview

Copilot reviewed 64 out of 67 changed files in this pull request and generated 3 comments.

Comment thread docs/reference/ringbuffer/spsc.mdx Outdated
Comment on lines +275 to +280
| Type | Class | Slot encoding |
|---|---|---|
| `Int` | `IntSpscRingBuffer` | one `Long` per slot: high 32 bits hold a 0/1 occupancy tag, low 32 bits hold the `Int` payload. The whole 64-bit word is `0L` when empty, so any tagged value is non-zero regardless of the payload bits. |
| `Long` | `LongSpscRingBuffer` | one `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. All other `Long` values pass through unchanged. |
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

Member Author

Updated the slot-encoding table to describe the DONE marker for all four primitive variants (tag value 2 for Int/Float, reserved sentinel values for Long/Double). Fixed in 51db422.

| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic version — single backing `Array[Long]`, padded indices, look-ahead cache — and expose the same core operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`). The only API difference is that `take` returns a primitive instead of `AnyRef`, so callers should check `peek()` first to know whether the next read will be a real value or just the empty marker. (The generic `drain`/`fill` batch helpers are not provided on the primitive variants; loop `peek`/`take` directly.)

Member Author

Added a dedicated subsection describing `offerDone()` and `pollPacked()` for all four primitive variants, including the per-class sentinel constants. Fixed in 51db422.

…PSC ring buffers

Adds three concurrent stream operators to zio-blocks streams and the
primitive-specialized SPSC ring buffers that back them.

## Streams

New JVM-only operators (degrade to sequential on Scala.js):
  * `Stream#mapPar(n)(f)`              — fan work across `n` virtual threads.
  * `Stream.mergeAll(n)(streams)`      — concurrent fan-in.
  * `Stream#flatMapPar(n)(f)`          — per-element sub-stream fan-out.
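
To illustrate the unordered, fail-fast semantics described for `mapPar`, here is a rough sketch built only on `java.util.concurrent`. It is not the zio-blocks implementation: `MapParSketch` and `mapParUnordered` are hypothetical names, a fixed thread pool stands in for the ring-buffer-backed workers, and the input is a plain `Seq` rather than a `Stream`.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}
import scala.util.control.NonFatal

object MapParSketch {
  // Hypothetical helper: applies f on up to n threads and returns results in
  // arrival order, or the first non-fatal failure observed.
  def mapParUnordered[A, B](items: Seq[A], n: Int)(f: A => B): Either[Throwable, List[B]] = {
    val pool = Executors.newFixedThreadPool(n)
    val out  = new LinkedBlockingQueue[Either[Throwable, B]]()
    try {
      items.foreach { a =>
        pool.execute { () =>
          val result: Either[Throwable, B] =
            try Right(f(a))
            catch { case NonFatal(e) => Left(e) }
          out.offer(result)
          ()
        }
      }
      val acc                        = List.newBuilder[B]
      var failure: Option[Throwable] = None
      var remaining                  = items.size
      // Stop consuming as soon as one worker reports an error.
      while (remaining > 0 && failure.isEmpty) {
        out.take() match {
          case Right(b) => acc += b; remaining -= 1
          case Left(e)  => failure = Some(e)
        }
      }
      failure.toLeft(acc.result())
    } finally pool.shutdownNow() // cancel any work still queued or in flight
  }
}
```

The caller still receives a plain `Either`, which mirrors the point made under Implementation that the consumer thread needs no effect system.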

Implementation:
  * Generic `ConcurrentMapParReader` / `ConcurrentMergeReader` keep the
    existing pull-based, typed-error `Reader[A]` programming model — the
    consumer thread still sees `Either[E, Z]`, no effect system required.
  * Per-`JvmType` specialized readers (Int/Long/Float/Double) handle the
    cross-thread handoff without boxing.
  * Backed by `SpscRingBuffer` / primitive ring buffers per worker,
    `BlockingMpmcQueue` / `BlockingMpscQueue` / `BlockingSpscQueue` for
    coordinator-to-worker fan-out where needed.
  * `Stream.bufferSize(n) { ... }` and `Pipeline.buffer(n)` expose the
    internal buffer sizing.
  * Worker threads start as virtual threads via `Platform.startVirtualThread`
    (reflection probe at class init, plain `Thread` fallback for JDK<21).
  * Cancellation is deterministic: closing the consumer reader or a typed
    error in any worker shuts down the coordinator and all workers and
    drains queues.
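
The reflective probe with a plain `Thread` fallback might look roughly like this. The sketch assumes only the public JDK method `Thread.startVirtualThread(Runnable)`; the `VirtualThreadProbe` object is a hypothetical stand-in and makes no claim about how `Platform.startVirtualThread` is written.

```scala
import java.lang.reflect.{InvocationTargetException, Method}

object VirtualThreadProbe {
  // Looked up once at object initialization. The method is absent on older
  // JDKs, so NoSuchMethodException is an expected outcome, not an error.
  private val startVirtual: Option[Method] =
    try Some(classOf[Thread].getMethod("startVirtualThread", classOf[Runnable]))
    catch { case _: NoSuchMethodException => None }

  private def startPlatform(task: Runnable): Thread = {
    val t = new Thread(task)
    t.setDaemon(true)
    t.start()
    t
  }

  def start(task: Runnable): Thread = startVirtual match {
    case Some(m) =>
      // The method can exist yet refuse to run (for example on a preview
      // build without the flag), so fall back if the call itself fails.
      try m.invoke(null, task).asInstanceOf[Thread]
      catch { case _: InvocationTargetException => startPlatform(task) }
    case None => startPlatform(task)
  }
}
```

Resolving the `Method` once keeps the per-start cost to a single reflective call, and code compiled against an older JDK never links against the newer API directly.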

Correctness fixes uncovered during stabilization:
  * Specialized `Int/Long/Float/Double` mapPar readers used a separate
    control queue for the DoneSentinel; a worker could observe the stop
    signal before all input writes were visible and exit early, dropping
    the tail of the stream (visible as a `ConcurrentLawsSpec` flake).
    Added a `stopSeen` deferred-exit step so workers only stop once the
    input queue is actually empty.
  * `isClosed` previously became true as soon as workers exited and
    output queues were empty, which could fire while the consumer was
    still about to read the last element (IsClosedRegressionSpec).
    Now tracked as `eofReturned || consumerClosed`, so `isClosed`
    only flips after the consumer has actually observed EOS.
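
The `stopSeen` deferred-exit idea from the first fix can be sketched as below. This is an illustration under stated assumptions, not the actual reader code: `DeferredExit.runWorker` is hypothetical, a boxed `ConcurrentLinkedQueue` replaces the primitive ring buffer, and the producer is assumed to raise `stop` only after its final offer.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

object DeferredExit {
  // `input` carries elements; `stop` is the separate control signal.
  def runWorker(input: ConcurrentLinkedQueue[Integer], stop: AtomicBoolean)(sink: Int => Unit): Unit = {
    var stopSeen = false
    var running  = true
    while (running) {
      val next = input.poll()
      if (next != null) sink(next.intValue())
      else if (stopSeen) running = false   // empty on a poll made after stop was observed
      else if (stop.get()) stopSeen = true // defer the exit: poll again before leaving
      else Thread.onSpinWait()
    }
  }
}
```

Exiting directly on `stop.get()` would reintroduce the bug: an element offered between the empty poll and the stop check would be left behind. The worker therefore leaves only when a poll that started after the stop signal was observed also comes back empty.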

Tests (1294 streamsJVM / 1067 streamsJS on Scala 3, 1286 / 1059 on
Scala 2.13.18):
  * ConcurrentMapParReaderSpec / ConcurrentMergeReaderSpec
  * ConcurrentStressSpec / ConcurrentLawsSpec
  * BufferSizeSpec / BufferStressSpec / BufferSpec
  * IsClosedRegressionSpec / MergeInnerErrorSpec
  * MapParSpec / MapParJvmTypeSpec / MergeAllSpec
  * FloatDoubleMapParSpec — coverage for the Float/Double specialized
    mapPar readers (lifts those from 0% to ~87% statement coverage)
  * ReadNSpec / ReadNJvmSpec / ReadUpToNSpec
  * GenericFoldSpec, PlatformSpec, ReadBytesScopeAuditSpec,
    ReadBytesSoundnessSpec
  * Queue specs: BlockingMpmcQueueSpec, BlockingMpscQueueSpec,
    BlockingSpscQueueSpec

Docs:
  * docs/reference/streams/concurrent-operators.md — semantics, error
    behaviour, buffer sizing, comparison with fs2 / Kyo / Ox / Pekko.

## Ring buffers

New primitive SPSC ring buffers in the `ringbuffer` project (JVM-only):
  * `IntSpscRingBuffer`    — Long-slot, high-32-bit tag, low-32-bit payload.
  * `LongSpscRingBuffer`   — Long-slot, `Long.MinValue` as empty marker.
  * `FloatSpscRingBuffer`  — Long-slot, high-32-bit tag, low-32-bit
    `floatToRawIntBits` payload.
  * `DoubleSpscRingBuffer` — Long-slot, raw bits stored; reserved
    non-canonical NaN bit pattern as empty marker, payload NaNs
    canonicalized to `Double.NaN` on offer.
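
The two untagged encodings (`Long` and `Double`) rely on reserved values instead of a tag. A minimal sketch, assuming hypothetical names (`SentinelSlots`, `encodeLong`, `encodeDouble`) and using the `0xFFF8_0000_0000_0001L` empty marker quoted in the docs table:

```scala
object SentinelSlots {
  val LongEmpty: Long   = Long.MinValue
  val DoubleEmpty: Long = 0xFFF8000000000001L // reserved non-canonical NaN bit pattern

  // Long payloads are stored unchanged, so the reserved value must be rejected.
  def encodeLong(value: Long): Long = {
    if (value == LongEmpty)
      throw new IllegalArgumentException("Long.MinValue is reserved as the empty marker")
    value
  }

  // Every NaN payload collapses to the canonical Double.NaN bits, so no
  // stored value can ever equal DoubleEmpty.
  def encodeDouble(value: Double): Long =
    if (value.isNaN) java.lang.Double.doubleToRawLongBits(Double.NaN)
    else java.lang.Double.doubleToRawLongBits(value)

  def decodeDouble(slot: Long): Double = java.lang.Double.longBitsToDouble(slot)
}
```

The trade-off differs per type: `Long` gives up one payload value, while `Double` gives up only the ability to distinguish NaN payload bits, which most numeric code never inspects.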

All four use the same FastFlow algorithm as the generic `SpscRingBuffer`
(single backing `Array[Long]`, padded indices, look-ahead cache), with
identical operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`,
`size`, `capacity`, `drain`, `fill`).

JMH benchmarks added in `ringbuffer-benchmarks/PrimitiveSpscThroughputBenchmark`
exercise SPSC throughput for all four primitive variants.

Tests: 190 passing in `ringbufferJVM`, 98.86% statement / 96.61% branch
coverage on the ringbuffer package.

Docs: new 'Primitive variants' section in `docs/reference/ringbuffer/spsc.mdx`.

Amp-Thread-ID: https://ampcode.com/threads/T-019e8f2a-5d95-720c-9d66-0878355503ea

jdegoes added a commit that referenced this pull request Jun 11, 2026
…es, JMH optimization campaign (#1458)

Port the Option A streams design (union/Concat error channel, LinearStream,
error-handling combinators) onto the upstream streams base (#1451), then
harden and optimize it.

Port
- Union-typed error channel (Concat LUB) + LinearStream + error-handling
  combinators ported onto the upstream streams module, cross-built for
  Scala 2.13 / 3.3 LTS / 3.8 on JVM and JS.
- Test suite consolidated into the ZIO-convention spec layout
  (StreamSpec, ReaderSpec, SinkSpec, WriterSpec, PipelineSpec,
  InterpreterSpec, NioSpec, platform/version-specific specs).

Adversarial hardening (10 rounds, 26 production bugs found and fixed)
- NIO: lossless sentinel handling and close-failure integrity in
  NioStreams/NioSinks (BUG-001..004, P-001); fromByteBufferLong/Double
  reject sentinel-valued elements loudly instead of truncating.
- ConcatReader: bulk-read truncation, single-advance segment loss,
  close-failure swallowing, writer FD leak (BUG-201..203, readUpToN family).
- Interpreter: inner-seal ordering, lane staleness, boxed-source lane
  bridging, readByte low-byte routing (BUG-A01/A02, BUG-INT-1, BUG-R6-01).
- Recovery readers: prefix loss on lossless bulk reads during catchAll.
- Writer: flag conflation and writeable() accuracy.
- SkipLimitReader: eager-skip resource leak.
- Concurrent readers (merge/mapPar, all primitive lanes): close-failure
  routing, reset() support, lossless reserved Long.MinValue/+1 via an
  atomic escape channel (LongSpscRingBuffer.offerDoneAfter), bulk-array
  and concurrent-merge sentinel EOF detection, lastReadWasEOF maintenance
  (infinite-drain OOM fix).
- Resources: FromAcquireRelease/FromResource once-only release; double
  finalization when close() throws during switch/advance; flatMap/buffer/
  fromIterator replayable under repeated; Repeated error latches;
  singleton repeat-close.
- intersperse/scan readable() accuracy; SinkError origin attribution;
  ByteBuffer window compose/clamp/snapshot semantics.
- Each fix is enshrined by a failing-repro-first regression test;
  convergence-evidence suites and 4 runnable walkthroughs added under
  streams-examples.
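
The once-only release guarantee listed under Resources can be pictured with a small guard. This is a sketch of the general technique, not the code behind `FromAcquireRelease`/`FromResource`; `ReleaseOnce` is a hypothetical name.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Runs `release` at most once, even if close() is called again or the
// first call throws.
final class ReleaseOnce(release: () => Unit) extends AutoCloseable {
  private val released = new AtomicBoolean(false)

  override def close(): Unit =
    // The flag flips before the finalizer runs, so a throwing finalizer
    // is not retried by a later close().
    if (released.compareAndSet(false, true)) release()
}
```

Flipping the flag first is what prevents double finalization when `close()` throws partway through; the exception still propagates to the first caller.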

Performance (3-session JMH campaign, A/B/A baseline-vs-candidate per change)
- Map-stage fusion (ComposedInt / ComposedIntArray flat stage arrays).
- Reusable flatMap inner readers + SucceedPrim/RangedStream leaf nodes;
  ungated ReusableIntRange reuse for all spans.
- Bulk terminal fold (Reader.foldInt): flatMap/concat/filter drains
  collapse onto leaf loops.
- Thunk-free presized deep concat-spine assembly; presized segmented
  compile (capacity hints + exact spine collection).
- Headline JMH deltas vs original baseline (final confirmation run, -f 2,
  all SIGNIFICANT): zb_chainedMaps +2616%, zb_chainedMaps100 +711%,
  zb_mixed_3 +278%, zb_flatMap +220%, zb_mapFilterFlatMap +205%
  (alloc 240KB -> 276B/op), zb_concat +206%, zb_take +83%.
  zb_nested_flatMap -5.6% (mostly pre-existing machine drift; alloc cut
  1.26MB -> 1.01MB/op); remaining -2..-4% paths are documented drift on
  untouched code with overlapping CIs.
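
As a rough picture of what map-stage fusion means, consecutive `map` functions can be collected into one flat array and applied in a single loop per element. The class below is a hypothetical illustration only; it is not `ComposedInt` or `ComposedIntArray`, whose internals are not shown in this PR.

```scala
// Chained Int => Int stages stored in one flat array instead of nested wrappers.
final class FusedIntMaps(stages: Array[Int => Int]) {
  // Appending a stage copies the array; the fused pipeline stays immutable.
  def andThen(f: Int => Int): FusedIntMaps = new FusedIntMaps(stages :+ f)

  def apply(x: Int): Int = {
    var acc = x
    var i   = 0
    while (i < stages.length) {
      acc = stages(i)(acc)
      i += 1
    }
    acc
  }
}

object FusedIntMaps {
  val identity: FusedIntMaps = new FusedIntMaps(Array.empty[Int => Int])
}
```

For example, `FusedIntMaps.identity.andThen(_ + 1).andThen(_ * 2)` applies both functions in one pass over each element rather than through two nested reader layers.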

Supporting changes
- ringbuffer: additive private[blocks] LongSpscRingBuffer helpers
  (isReserved, offerNonReserved, offerDoneAfter) for the lossless
  reserved-value escape protocol.
- AGENTS.md: new "Sentinel performance policy (streams)" section — hot
  drain loops stay one primitive comparison per element; approved
  zero-cost remedies for sentinel/EOF collisions.
- docs: reader.md/sink.md updated for the lossless sentinel-guard
  behavior.

Verification
- streamsJVM + streamsJS tests green on 2.13.18, 3.3.7, 3.8.3
  (3.8.3: 2228/0 JVM, 1924/0 JS).
- ringbuffer tests, scalafmt + header checks, full Test/compile,
  docs/mdoc all green.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>