feat(streams,ringbuffer): concurrent stream operators (mapPar / mergeAll / flatMapPar) + primitive SPSC ring buffers #1451
Pull request overview
This PR introduces concurrent stream operators to zio.blocks.streams (JVM with sequential fallbacks on Scala.js), backed by new blocking queue primitives and primitive-specialized SPSC ring buffers, plus extensive new test coverage and documentation updates.
Changes:
- Added concurrent stream operators and buffering facilities (`mapPar`, `mergeAll`, `flatMapPar`, plus `Stream.bufferSize`, `Stream.buffer`, and `Pipeline.buffer`).
- Implemented JVM concurrency infrastructure (virtual-thread probing/fallback, concurrent buffered reader, blocking queues, specialized concurrent readers) and Scala.js sequential degradations.
- Added primitive-specialized SPSC ring buffers (Int/Long/Float/Double), updated docs/sidebars, and added broad regression + stress test coverage and benchmarks.
Reviewed changes
Copilot reviewed 64 out of 67 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| streams/shared/src/main/scala/zio/blocks/streams/Platform.scala | New cross-platform Platform abstraction for concurrency + reader factories |
| streams/jvm/src/main/scala/zio/blocks/streams/PlatformSpecific.scala | JVM implementation: virtual threads + concurrent reader factories |
| streams/js/src/main/scala/zio/blocks/streams/PlatformSpecific.scala | Scala.js implementation: sequential degradations + sync buffering |
| streams/shared/src/main/scala/zio/blocks/streams/Pipeline.scala | Added Pipeline.buffer and Pipeline.chunked helpers |
| streams/shared/src/main/scala/zio/blocks/streams/internal/SyncBufferedReader.scala | JS-only sync-prefetch buffered reader |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/ConcurrentBufferedReader.scala | JVM concurrent buffered reader using a blocking SPSC queue |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/ConcurrentMapParReader.scala | Generic concurrent mapPar reader implementation |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/IntConcurrentMergeReader.scala | Primitive-specialized mergeAll reader (Int) |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/LongConcurrentMergeReader.scala | Primitive-specialized mergeAll reader (Long) |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/FloatConcurrentMergeReader.scala | Primitive-specialized mergeAll reader (Float) |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/DoubleConcurrentMergeReader.scala | Primitive-specialized mergeAll reader (Double) |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/ByteBufferReader.scala | Added readN/readUpToN overrides + byte-evidence readBytes signature |
| streams/jvm/src/main/scala/zio/blocks/streams/internal/ChannelReader.scala | Added readN/readUpToN overrides + byte-evidence readBytes signature |
| streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingSpscQueue.scala | New blocking SPSC queue primitive |
| streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingMpscQueue.scala | New blocking MPSC queue primitive |
| streams/jvm/src/main/scala/zio/blocks/streams/queues/BlockingMpmcQueue.scala | New blocking MPMC queue primitive |
| ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/SpscRingBuffer.scala | Memory-ordering tweaks and producerLimit update in slow path |
| ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/IntSpscRingBuffer.scala | New primitive SPSC ring buffer for Int |
| ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/LongSpscRingBuffer.scala | New primitive SPSC ring buffer for Long (with sentinel reservation) |
| ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/FloatSpscRingBuffer.scala | New primitive SPSC ring buffer for Float |
| ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/DoubleSpscRingBuffer.scala | New primitive SPSC ring buffer for Double (NaN-bit reservation) |
| docs/reference/streams/concurrent-operators.md | New docs page for concurrent stream operators |
| docs/reference/ringbuffer/spsc.mdx | Documented primitive ring buffer variants |
| docs/sidebars.js | Added new streams concurrent-operators doc page to sidebar |
| build.sbt | Wired streams to depend on ringbuffer; adjusted streams test parallelism; bumped Kyo version in benchmarks |
| streams/shared/src/test/scala/zio/blocks/streams/ReadNSpec.scala | New shared tests for Reader#readN |
| streams/shared/src/test/scala/zio/blocks/streams/ReaderFromJavaSpec.scala | Updated byte semantics expectations + added readBytes edge-case tests |
| streams/shared/src/test/scala/zio/blocks/streams/NewCombinatorsSpec.scala | Renamed grouped→chunked test coverage updates |
| streams/shared/src/test/scala/zio/blocks/streams/MergeAllSpec.scala | New shared mergeAll/flatMapPar tests (sequential aspect) |
| streams/shared/src/test/scala/zio/blocks/streams/MapParSpec.scala | New shared mapPar tests (sequential aspect) |
| streams/shared/src/test/scala/zio/blocks/streams/MapParJvmTypeSpec.scala | New tests asserting JvmType propagation through mapPar |
| streams/shared/src/test/scala/zio/blocks/streams/GenericFoldSpec.scala | New tests for generic runFold specialization dispatch |
| streams/shared/src/test/scala/zio/blocks/streams/IntSpecializationSpec.scala | Minor type annotation tweak for byte reader |
| streams/shared/src/test/scala/zio/blocks/streams/ConcurrentLawsSpec.scala | New property-law tests for concurrent operators |
| streams/shared/src/test/scala/zio/blocks/streams/BufferSpec.scala | New shared buffer behavior tests |
| streams/shared/src/test/scala-3/zio/blocks/streams/ReadBytesSoundnessSpec.scala | Scala 3 compile-time evidence tests for readBytes |
| streams/shared/src/test/scala-3/zio/blocks/streams/ReadBytesScopeAuditSpec.scala | Scala 3 compile-time audit tests for evidence scoping decisions |
| streams/jvm/src/test/scala/zio/blocks/streams/ReadNJvmSpec.scala | JVM-specific readN override tests (NIO readers) |
| streams/jvm/src/test/scala/zio/blocks/streams/PlatformSpec.scala | JVM platform tests for virtual thread start + buffer behavior |
| streams/jvm/src/test/scala/zio/blocks/streams/BufferSizeSpec.scala | JVM tests for Stream.bufferSize semantics |
| streams/jvm/src/test/scala/zio/blocks/streams/BufferStressSpec.scala | JVM stress tests for buffer correctness + thread cleanup |
| streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentStressSpec.scala | JVM stress tests for mergeAll/mapPar correctness + leak checks |
| streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentMapParReaderSpec.scala | JVM tests for mapPar reader correctness, defects, termination, leaks |
| streams/jvm/src/test/scala/zio/blocks/streams/ConcurrentMergeReaderSpec.scala | JVM tests for mergeAll reader correctness, errors, termination, leaks |
| streams/jvm/src/test/scala/zio/blocks/streams/MergeInnerErrorSpec.scala | JVM regression tests for inner-error termination/hang prevention |
| streams/jvm/src/test/scala/zio/blocks/streams/IsClosedRegressionSpec.scala | JVM regression tests around isClosed visibility during consumption |
| streams/jvm/src/test/scala/zio/blocks/streams/FloatDoubleMapParSpec.scala | JVM coverage tests for Float/Double specialized mapPar readers |
| streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingSpscQueueSpec.scala | Tests for blocking SPSC queue behavior + concurrency/stress |
| streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingMpscQueueSpec.scala | Tests for blocking MPSC queue behavior + concurrency/stress |
| streams/jvm/src/test/scala/zio/blocks/streams/queues/BlockingMpmcQueueSpec.scala | Tests for blocking MPMC queue behavior + concurrency/stress |
| streams/js/src/test/scala/zio/blocks/streams/.gitkeep | Keeps JS test dir in VCS |
| streams/js/src/main/scala/zio/blocks/streams/.gitkeep | Keeps JS main dir in VCS |
| streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamSetupBench.scala | Updated Kyo imports to avoid Scope conflicts |
| streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamPipelineBench.scala | Updated Kyo imports to avoid Scope conflicts |
| streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamEvalBench.scala | Updated Kyo imports to avoid Scope conflicts |
| streams-benchmark/src/main/scala/zio/blocks/streams/bench/StreamConcurrentBench.scala | New JMH benchmark for concurrent operators across libraries |
```scala
trait PlatformSpecific extends Platform {
  override val supportsConcurrency: Boolean = true
  // ...
}
```
Added Scaladoc to the JVM PlatformSpecific describing virtual-thread probing/fallback and the specialized factories.
```markdown
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic version — single backing `Array[Long]`, padded indices, look-ahead cache — and expose the same operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`, `drain`, `fill`). The only API difference is that `take` returns a primitive instead of `AnyRef`, so callers check `peek()` first to know whether the next read will be a real value or just the empty marker:
```
Fixed the docs to reflect that the primitive SPSC variants only expose the core ops (offer, take, peek, isEmpty, isFull, size, capacity) and recommend peek() before take().
41491ce to c80d77d

c80d77d to 55e96b8
```scala
def take(): Long = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val e = (LONG_HANDLE.get(data, offset): Long)
  LONG_HANDLE.setRelease(data, offset, EMPTY)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}
```
Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.
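To make the reply concrete, here is a minimal sketch of a `Long` SPSC buffer whose `take()` follows the fixed semantics: an acquire read of the slot, and no consumer-index advance while the slot still holds the `Long.MinValue` empty marker. `SimpleLongSpsc` is a hypothetical, simplified stand-in built on `java.util.concurrent.atomic.AtomicLongArray` (no VarHandles, no index padding, no look-ahead cache), not the PR's `LongSpscRingBuffer`.

```scala
import java.util.concurrent.atomic.AtomicLongArray

// Hypothetical, simplified FastFlow-style SPSC buffer for Long values.
// Only the slot contents are shared; each index is owned by exactly one thread.
final class SimpleLongSpsc(capacity: Int) {
  require(capacity >= 1 && Integer.bitCount(capacity) == 1, "capacity must be a power of two")

  final val EMPTY = Long.MinValue // reserved empty marker, as in LongSpscRingBuffer

  private val data = {
    val arr = new AtomicLongArray(capacity)
    var i = 0
    while (i < capacity) { arr.set(i, EMPTY); i += 1 }
    arr
  }
  private val mask = capacity - 1L
  private var producerIndex = 0L // written only by the producer thread
  private var consumerIndex = 0L // written only by the consumer thread

  def offer(a: Long): Boolean = {
    if (a == EMPTY) throw new IllegalArgumentException("Long.MinValue is reserved as the empty marker")
    val offset = (producerIndex & mask).toInt
    if (data.getAcquire(offset) != EMPTY) false // slot not consumed yet: buffer is full
    else {
      data.setRelease(offset, a) // publish the value to the consumer
      producerIndex += 1
      true
    }
  }

  def peek(): Long = data.getAcquire((consumerIndex & mask).toInt)

  def take(): Long = {
    val offset = (consumerIndex & mask).toInt
    val e = data.getAcquire(offset) // acquire read pairs with the producer's setRelease
    if (e != EMPTY) {
      data.setRelease(offset, EMPTY) // hand the slot back to the producer
      consumerIndex += 1 // advance only when a real value was taken
    }
    e // EMPTY when nothing was available
  }
}
```

With the quoted pre-fix version, every `take()` on an empty buffer would still bump the consumer index, letting the consumer run ahead of the producer and then read empty slots instead of the values offered afterwards.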
```scala
def take(): Int = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val e = (LONG_HANDLE.get(data, offset): Long).toInt
  LONG_HANDLE.setRelease(data, offset, 0L)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}
```
Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.
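The `Int` variant can only report "empty" without advancing because its slot encoding distinguishes an empty slot from a stored `0`. A hypothetical helper, `IntSlot`, for the layout described in the docs table (occupancy tag in the high 32 bits, payload in the low 32 bits, `0L` when empty) might look like this:

```scala
// Hypothetical helper illustrating the documented IntSpscRingBuffer slot layout:
// high 32 bits = occupancy tag (1 = value present), low 32 bits = raw Int payload, 0L = empty.
object IntSlot {
  final val Empty: Long = 0L

  // Mask the payload to 32 bits so a negative Int cannot sign-extend over the tag.
  def pack(value: Int): Long = (1L << 32) | (value.toLong & 0xFFFFFFFFL)

  def isValue(slot: Long): Boolean = (slot >>> 32) == 1L

  // Narrowing to Int keeps exactly the low 32 bits.
  def unpack(slot: Long): Int = slot.toInt
}
```

The `& 0xFFFFFFFFL` mask is the important detail: without it, `-1` widens to `0xFFFFFFFFFFFFFFFFL` and destroys the tag.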
```scala
def take(): Float = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val packed = (LONG_HANDLE.get(data, offset): Long)
  val e = java.lang.Float.intBitsToFloat(packed.toInt)
  LONG_HANDLE.setRelease(data, offset, 0L)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}
```
Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.
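For `Float`, the same tagged layout stores `floatToRawIntBits(value)` in the low 32 bits. In the sketch below (hypothetical `FloatSlot` helper), the raw conversion is used rather than `floatToIntBits`, which collapses every NaN to one canonical pattern, so `-0.0f` and NaN round-trip bit-for-bit and no stored value can equal the empty word `0L`:

```scala
// Hypothetical helper mirroring the documented FloatSpscRingBuffer slot layout:
// high 32 bits = tag (1 = value present), low 32 bits = floatToRawIntBits(value), 0L = empty.
object FloatSlot {
  final val Empty: Long = 0L

  def pack(value: Float): Long =
    (1L << 32) | (java.lang.Float.floatToRawIntBits(value).toLong & 0xFFFFFFFFL)

  def isValue(slot: Long): Boolean = (slot >>> 32) == 1L

  def unpack(slot: Long): Float = java.lang.Float.intBitsToFloat(slot.toInt)
}
```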
```scala
def take(): Double = {
  val cIdx = consumerIndex
  val offset = (cIdx & mask).toInt
  val bits = (LONG_HANDLE.get(data, offset): Long)
  val e = java.lang.Double.longBitsToDouble(bits)
  LONG_HANDLE.setRelease(data, offset, EMPTY_BITS)
  CONSUMER_INDEX.setOpaque(this, cIdx + 1L)
  e
}
```
Fixed: take() now uses an acquire read and returns the empty marker without advancing consumerIndex when the slot is empty, matching the generic SpscRingBuffer semantics.
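The `Double` variant has no spare bits for a tag, so it reserves a NaN bit pattern as the empty marker instead. The sketch below (hypothetical `DoubleSlot`, using the `0xFFF8_0000_0000_0001L` pattern from the docs) relies on `java.lang.Double.doubleToLongBits`, which returns the same bits as `doubleToRawLongBits` for every non-NaN value and the canonical `0x7ff8000000000000L` for any NaN. That is one way, not necessarily the PR's, to implement "canonicalize NaN on offer":

```scala
// Hypothetical helper illustrating the documented DoubleSpscRingBuffer encoding.
object DoubleSlot {
  // Reserved non-canonical NaN bit pattern used as the empty marker (value from the docs).
  final val EmptyBits: Long = 0xFFF8000000000001L

  // doubleToLongBits == raw bits for non-NaN values, and 0x7ff8000000000000L for every NaN,
  // so no offered value can ever encode to EmptyBits.
  def pack(value: Double): Long = java.lang.Double.doubleToLongBits(value)

  def isValue(bits: Long): Boolean = bits != EmptyBits

  def unpack(bits: Long): Double = java.lang.Double.longBitsToDouble(bits)
}
```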
```diff
 assert(collect(s.take(100)))(equalTo(Chunk.fromIterable(List(1, 2))))
 },
-test("grouped(1) wraps each element like List.grouped(1)") {
+test("chunked(1) wraps each element like List.chunked(1)") {
```
Renamed the reference to `List.grouped(1)` to match the stdlib (Scala's `List` has no `chunked` method).
```scala
object MergeAllSpec extends StreamsBaseSpec {

  def spec: Spec[TestEnvironment, Any] = suite("MergeAll")(
    // ...
  )
}
```
Renamed the suite to "MergeAll and flatMapPar".
```scala
final class BlockingSpscQueue[A <: AnyRef](capacity: Int) {

  private val spinTries = 1024

  private val ringBuffer: SpscRingBuffer[AnyRef] =
    new SpscRingBuffer[AnyRef](nextPowerOfTwo(capacity))

  // ...
}
```
Added require(capacity >= 1, ...) in the constructor.
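A minimal sketch of the guard these replies describe, paired with one possible `nextPowerOfTwo`. Both `CapacityCheck.nextPowerOfTwo` and `BoundedQueueShell` are hypothetical, since the PR's helper is not shown. The guard matters because this formulation maps `0` to `Integer.highestOneBit(-1) << 1`, which is `0`, so a zero-capacity request would otherwise silently produce an unusable buffer:

```scala
object CapacityCheck {
  // Smallest power of two >= n; valid for 1 <= n <= 2^30.
  def nextPowerOfTwo(n: Int): Int =
    if (n <= 1) 1 else Integer.highestOneBit(n - 1) << 1
}

// Hypothetical shell showing where the constructor check sits.
final class BoundedQueueShell[A <: AnyRef](capacity: Int) {
  require(capacity >= 1, s"capacity must be >= 1, got $capacity")

  // Rounded up so index wrapping can use a bit mask instead of a modulo.
  val effectiveCapacity: Int = CapacityCheck.nextPowerOfTwo(capacity)
}
```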
```scala
final class BlockingMpscQueue[A <: AnyRef](capacity: Int) {

  private val ringBuffer: MpscRingBuffer[AnyRef] =
    new MpscRingBuffer[AnyRef](nextPowerOfTwo(capacity))

  // ...
}
```
Added require(capacity >= 1, ...) in the constructor.
```scala
final class BlockingMpmcQueue[A <: AnyRef](capacity: Int) {

  private val ringBuffer: MpmcRingBuffer[AnyRef] =
    new MpmcRingBuffer[AnyRef](nextPowerOfTwo(capacity))

  // ...
}
```
Added require(capacity >= 1, ...) in the constructor.
55e96b8 to 6e4910d

🚀 Preview deployed to Netlify: https://zio-blocks-pr-1451--zio-dev.netlify.app

6e4910d to 917ba8e
```scala
import java.lang.invoke.MethodHandles

private object VirtualThreadSupport {
  val startVirtual: Option[(String, Runnable) => Thread] =
    try {
      val lookup = MethodHandles.lookup()
      val threadClass = Class.forName("java.lang.Thread").asInstanceOf[Class[Thread]]
      val ofVirtualMethod = threadClass.getMethod("ofVirtual")
      val ofVirtualHandle = lookup.unreflect(ofVirtualMethod)
      val builderClass = ofVirtualMethod.getReturnType
      val nameHandle = lookup.unreflect(builderClass.getMethod("name", classOf[String], java.lang.Long.TYPE))
      val startHandle = lookup.unreflect(builderClass.getMethod("start", classOf[Runnable]))

      Some { (name: String, task: Runnable) =>
        val builder = ofVirtualHandle.invoke().asInstanceOf[AnyRef]
        val namedBuilder = nameHandle.invoke(builder, name, Long.box(0L)).asInstanceOf[AnyRef]
        startHandle.invoke(namedBuilder, task).asInstanceOf[Thread]
      }
    } catch {
      case _: NoSuchMethodException  => None
      case _: ClassNotFoundException => None
      case _: SecurityException      => None
    }
}
```
Broadened the catch in VirtualThreadSupport.startVirtual to NonFatal(_), so any reflective or initialization failure (including IllegalAccessException, InvocationTargetException, InaccessibleObjectException, LinkageError, etc.) falls back to the platform-thread implementation rather than crashing class init.
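A sketch of the broadened probe, using plain `java.lang.reflect.Method` calls instead of `MethodHandle`s for brevity. `VirtualThreadProbe` and its `start` fallback are hypothetical names; `Thread.ofVirtual()`, `Thread.Builder.OfVirtual.name(String)` and `Thread.Builder.start(Runnable)` are the JDK 21 APIs being looked up. The trial `ofVirtual` call is an extra precaution: on JDKs that have the method but refuse to run it (such as JDK 19/20 without `--enable-preview`, where virtual threads were a preview feature) the probe falls back at class init instead of failing at first use.

```scala
import scala.util.control.NonFatal

object VirtualThreadProbe {
  // Some(starter) on JDK 21+, None anywhere the reflective lookup or trial call fails.
  val startVirtual: Option[(String, Runnable) => Thread] =
    try {
      val ofVirtual    = classOf[Thread].getMethod("ofVirtual")
      val builderClass = ofVirtual.getReturnType // Thread.Builder.OfVirtual
      val nameMethod   = builderClass.getMethod("name", classOf[String])
      val startMethod  = builderClass.getMethod("start", classOf[Runnable])
      ofVirtual.invoke(null) // trial call: fail fast if the API exists but is unusable
      Some { (name: String, task: Runnable) =>
        val builder = ofVirtual.invoke(null)
        val named   = nameMethod.invoke(builder, name)
        startMethod.invoke(named, task).asInstanceOf[Thread]
      }
    } catch {
      // Covers NoSuchMethodException, IllegalAccessException, InvocationTargetException, LinkageError, ...
      case NonFatal(_) => None
    }

  def start(name: String, task: Runnable): Thread =
    startVirtual match {
      case Some(startOnVirtual) => startOnVirtual(name, task)
      case None =>
        val t = new Thread(task, name) // platform-thread fallback for JDK < 21
        t.setDaemon(true)
        t.start()
        t
    }
}
```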
```scala
def offer(a: Long): Boolean = {
  if (a == EMPTY)
    throw new IllegalArgumentException(s"offer(Long.MinValue) is not permitted (reserved as empty sentinel)")
  val pIdx = producerIndex
  // ...
}
```
Reserving Long.MinValue as the empty marker is the intentional design tradeoff for the primitive specialization (per direction in the original spec — primitive arrays only, no tuples, no boxed allocations). Added scaladoc to LongConcurrentMapParReader and LongConcurrentMergeReader documenting the limitation; if an upstream emits Long.MinValue, the IAE from offer is already surfaced as a stream error via the existing recordError path rather than leaking. Streams that legitimately need Long.MinValue should route through the boxed Long path.
917ba8e to 852a9ed

852a9ed to 21b08ff
```scala
override def readN[A1 >: Byte](n: Int): Chunk[A1] = {
  if (n <= 0 || isClosed) return Chunk.empty
  if (n <= 8192) {
    val arr = new Array[Byte](n)
    var total = 0
    while (total < n && !isClosed) {
      val read = readBytes(arr, total, n - total)
      if (read > 0) total += read
    }
    if (total == 0) Chunk.empty
    else if (total == n) Chunk.fromArray(arr).asInstanceOf[Chunk[A1]]
    else Chunk.fromArray(java.util.Arrays.copyOf(arr, total)).asInstanceOf[Chunk[A1]]
  } else {
    val b = new ChunkBuilder.Byte()
    val buf = new Array[Byte](8192)
    var rem = n
    while (rem > 0 && !isClosed) {
      val read = readBytes(buf, 0, math.min(rem, 8192))
      if (read > 0) { var k = 0; while (k < read) { b.addOne(buf(k)); k += 1 }; rem -= read }
    }
    b.result().asInstanceOf[Chunk[A1]]
  }
}
```
Fixed ChannelReader.readN to break out of both branches on read <= 0 so a non-blocking channel returning 0 bytes can no longer cause an infinite spin. Fixed in 51db422.
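A simplified sketch of the fixed termination rule: any `read <= 0` ends the loop and returns the bytes gathered so far. `ByteSource` and `ReadN` are hypothetical; the real method returns a `Chunk`, checks `isClosed`, and has a second branch for `n > 8192`, none of which is reproduced here.

```scala
import java.util.Arrays

// Hypothetical minimal source: returns the number of bytes read, 0 when nothing is
// available right now (as a non-blocking channel can), or -1 at end of stream.
trait ByteSource {
  def readBytes(buf: Array[Byte], offset: Int, len: Int): Int
}

object ReadN {
  // Reads up to n bytes, stopping on the first read <= 0 instead of spinning.
  def readN(src: ByteSource, n: Int): Array[Byte] =
    if (n <= 0) Array.emptyByteArray
    else {
      val arr   = new Array[Byte](n)
      var total = 0
      var done  = false
      while (!done && total < n) {
        val read = src.readBytes(arr, total, n - total)
        if (read > 0) total += read
        else done = true // 0 (no data yet) or -1 (EOF): return what we have
      }
      if (total == n) arr else Arrays.copyOf(arr, total)
    }
}
```

Returning a short read on `0` trades completeness for guaranteed progress; a caller that needs exactly `n` bytes from a non-blocking channel would have to wait for readiness (for example with a `java.nio.channels.Selector`) before reading again.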
```scala
def close(): Unit = {
  if (inner != null) {
    try inner.close()
    catch { case _: Throwable => () }
    inner = null
  }
  outer.close()
}
```
JS createMergeReader fallback now sets outerDone = true inside close() so isClosed reports true after a manual close. Fixed in 51db422.
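A hypothetical model of the fixed `close()`: `SequentialMergeModel` and its `isClosed` predicate (`outerDone && inner eq null`) are assumptions, since the JS reader's actual predicate is not quoted. This sketch also swaps the quoted `catch { case _: Throwable => () }` for `NonFatal`, so fatal errors are not swallowed, and guards against a second `close()`.

```scala
import scala.util.control.NonFatal

// Hypothetical model of the Scala.js sequential mergeAll fallback's close().
final class SequentialMergeModel(outer: AutoCloseable, private var inner: AutoCloseable) {
  private var outerDone = false

  // Assumed predicate: closed once the outer side is done and no inner stream is open.
  def isClosed: Boolean = outerDone && (inner eq null)

  def close(): Unit =
    if (!outerDone) {
      if (inner ne null) {
        try inner.close()
        catch { case NonFatal(_) => () } // best effort: still close the outer stream
        inner = null
      }
      outerDone = true // the fix: isClosed reports true after a manual close
      outer.close()
    }
}
```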
```markdown
| `Long` | `LongSpscRingBuffer` | one `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. All other `Long` values pass through unchanged. |
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic version — single backing `Array[Long]`, padded indices, look-ahead cache — and expose the same core operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`). The only API difference is that `take` returns a primitive instead of `AnyRef`, so callers should check `peek()` first to know whether the next read will be a real value or just the empty marker. (The generic `drain`/`fill` batch helpers are not provided on the primitive variants; loop `peek`/`take` directly.)
```
Added a dedicated subsection describing offerDone() and pollPacked() for all four primitive variants, including the per-class sentinel constants. Fixed in 51db422.
```diff
 def spec: Spec[TestEnvironment, Any] = suite("Reader.fromInputStream / fromReader")(
 suite("fromInputStream")(
-test("read() returns widened Int (0-255) then null on EOF") {
+test("read() returns Byte (0-255) then null on EOF") {
```
Renamed the test to "read() returns each Byte (signed) then null on EOF" to match Scala/Java's signed `Byte` semantics. Fixed in 51db422.
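The rename reflects how the JDK and Scala treat bytes: `InputStream.read()` returns an `Int` in `0..255` (or `-1` at EOF), and narrowing it to `Byte` yields the signed range `-128..127`. A small standalone illustration (the `SignedBytes` helper is hypothetical, not the library's reader):

```scala
import java.io.{ByteArrayInputStream, InputStream}

object SignedBytes {
  // read() yields 0..255; .toByte reinterprets the low 8 bits as a signed Byte.
  def readAllSigned(in: InputStream): List[Byte] = {
    val out = List.newBuilder[Byte]
    var r = in.read()
    while (r != -1) {
      out += r.toByte
      r = in.read()
    }
    out.result()
  }
}
```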
21b08ff to 41fb96d

41fb96d to e9f0456
```markdown
| Type | Class | Slot encoding |
|---|---|---|
| `Int` | `IntSpscRingBuffer` | one `Long` per slot: high 32 bits hold a 0/1 occupancy tag, low 32 bits hold the `Int` payload. The whole 64-bit word is `0L` when empty, so any tagged value is non-zero regardless of the payload bits. |
| `Long` | `LongSpscRingBuffer` | one `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. All other `Long` values pass through unchanged. |
| `Float` | `FloatSpscRingBuffer` | one `Long` per slot: high 32 bits tag, low 32 bits hold `floatToRawIntBits(value)`. The whole word is `0L` when empty. Every `Float` (including `0.0f`, `NaN`, `±Inf`) is representable. |
| `Double` | `DoubleSpscRingBuffer` | one `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical `NaN` bit pattern (`0xFFF8_0000_0000_0001L`) marks empty. Any `NaN` payload is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |
```
Updated the slot-encoding table to describe the DONE marker for all four primitive variants (tag value 2 for Int/Float, reserved sentinel values for Long/Double). Fixed in 51db422.
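With a DONE marker, the `Int`/`Float` tag becomes three-state: 0 for empty, 1 for a value, and 2 (per the reply) for end-of-stream. Because the marker is written into the same FIFO slots as the data, a consumer cannot see it before the values offered ahead of it. A hypothetical decoder, `TaggedIntSlot`, for that layout (the PR's actual constant names are not shown):

```scala
// Hypothetical three-state slot layout for the Int variant: tag in the high 32 bits.
object TaggedIntSlot {
  final val Empty: Long = 0L // tag 0: nothing published yet
  final val ValueTag: Long = 1L // tag 1: low 32 bits hold a payload
  final val DoneTag: Long = 2L // tag 2: producer signalled end-of-stream
  final val Done: Long = DoneTag << 32

  def pack(v: Int): Long = (ValueTag << 32) | (v.toLong & 0xFFFFFFFFL)
  def tag(slot: Long): Long = slot >>> 32
  def payload(slot: Long): Int = slot.toInt

  // Classify a slot the way a consumer would after an acquire read.
  def describe(slot: Long): String = tag(slot) match {
    case 0L => "empty"
    case 1L => s"value ${payload(slot)}"
    case 2L => "done"
    case t  => s"corrupt tag $t"
  }
}
```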
…PSC ring buffers
Adds three concurrent stream operators to zio-blocks streams and the
primitive-specialized SPSC ring buffers that back them.
## Streams
New JVM-only operators (degrade to sequential on Scala.js):
* `Stream#mapPar(n)(f)` — fan work across `n` virtual threads.
* `Stream.mergeAll(n)(streams)` — concurrent fan-in.
* `Stream#flatMapPar(n)(f)` — per-element sub-stream fan-out.
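The `mapPar` contract described here (up to `n` concurrent applications of `f`, unordered output, first error wins) can be modeled with a plain executor. This is a hypothetical stdlib-only illustration of the semantics, not the zio-blocks implementation, which is pull-based and uses ring buffers rather than a thread pool:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

// Hypothetical model of mapPar(n)(f): results arrive in completion order,
// and the first failure stops remaining work and becomes the Left.
object MapParModel {
  def mapParUnordered[A, B](n: Int, in: Seq[A])(f: A => B): Either[Throwable, Seq[B]] = {
    require(n >= 1, "parallelism must be >= 1")
    val pool       = Executors.newFixedThreadPool(n)
    val out        = new ConcurrentLinkedQueue[B]() // arrival order, not input order
    val firstError = new AtomicReference[Throwable](null)
    try {
      in.foreach { a =>
        pool.execute { () =>
          if (firstError.get() == null) { // skip queued work once something has failed
            try { out.add(f(a)); () }
            catch { case NonFatal(e) => firstError.compareAndSet(null, e); () }
          }
        }
      }
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
    Option(firstError.get()).toLeft(out.asScala.toList)
  }
}
```

Comparing results as sets, as `ConcurrentLawsSpec` does for `mapPar(1)(f) ≡ map(f)`, is what makes the unordered output testable.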
Implementation:
* Generic `ConcurrentMapParReader` / `ConcurrentMergeReader` keep the
existing pull-based, typed-error `Reader[A]` programming model — the
consumer thread still sees `Either[E, Z]`, no effect system required.
* Per-`JvmType` specialized readers (Int/Long/Float/Double) handle the
cross-thread handoff without boxing.
* Backed by `SpscRingBuffer` / primitive ring buffers per worker,
`BlockingMpmcQueue` / `BlockingMpscQueue` / `BlockingSpscQueue` for
coordinator-to-worker fan-out where needed.
* `Stream.bufferSize(n) { ... }` and `Pipeline.buffer(n)` expose the
internal buffer sizing.
* Worker threads start as virtual threads via `Platform.startVirtualThread`
(reflection probe at class init, plain `Thread` fallback for JDK<21).
* Cancellation is deterministic: closing the consumer reader or a typed
error in any worker shuts down the coordinator and all workers and
drains queues.
Correctness fixes uncovered during stabilization:
* Specialized `Int/Long/Float/Double` mapPar readers used a separate
control queue for the DoneSentinel; a worker could observe the stop
signal before all input writes were visible and exit early, dropping
the tail of the stream (visible as a `ConcurrentLawsSpec` flake).
Added a `stopSeen` deferred-exit step so workers only stop once the
input queue is actually empty.
* `isClosed` previously became true as soon as workers exited and
output queues were empty, which could fire while the consumer was
still about to read the last element (IsClosedRegressionSpec).
Now tracked as `eofReturned || consumerClosed`, so `isClosed`
only flips after the consumer has actually observed EOS.
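Both fixes can be modeled with standard `java.util.concurrent` types. In this hypothetical sketch, `StopSeenWorker` polls an input queue while watching a separate stop flag (standing in for the `DoneSentinel` control queue) and only exits after one more empty poll following the stop signal; `EosReader` exposes `isClosed` as `eofReturned || consumerClosed`. Neither class is the PR's reader code, and the spinning is for brevity only.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical worker: a stop signal observed on a separate channel defers exit by one pass,
// so inputs enqueued before the signal was set are still processed.
final class StopSeenWorker(
  input: ConcurrentLinkedQueue[Integer],
  stop: AtomicBoolean,
  output: ConcurrentLinkedQueue[Integer],
  f: Int => Int
) extends Runnable {
  def run(): Unit = {
    var stopSeen = false
    var running  = true
    while (running) {
      val next = input.poll()
      if (next != null) output.add(Integer.valueOf(f(next.intValue)))
      else if (stopSeen) running = false // stop seen earlier and input still empty: safe to exit
      else if (stop.get()) stopSeen = true // defer exit: poll once more after observing stop
      else Thread.onSpinWait()
    }
  }
}

// Hypothetical consumer side: isClosed flips only after EOS was actually returned.
final class EosReader(output: ConcurrentLinkedQueue[Integer], workersDone: () => Boolean) {
  @volatile private var eofReturned    = false
  @volatile private var consumerClosed = false

  // Returns the next element, or null once workers are done and the queue is drained.
  def read(): Integer = {
    var result: Integer = null
    var waiting = true
    while (waiting) {
      val next = output.poll()
      if (next != null) { result = next; waiting = false }
      else if (workersDone()) {
        val last = output.poll() // re-check: an element may have landed after the first poll
        if (last != null) result = last else eofReturned = true
        waiting = false
      } else Thread.onSpinWait()
    }
    result
  }

  def close(): Unit = consumerClosed = true
  def isClosed: Boolean = eofReturned || consumerClosed
}
```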
Tests (1294 streamsJVM / 1067 streamsJS on Scala 3, 1286 / 1059 on
Scala 2.13.18):
* ConcurrentMapParReaderSpec / ConcurrentMergeReaderSpec
* ConcurrentStressSpec / ConcurrentLawsSpec
* BufferSizeSpec / BufferStressSpec / BufferSpec
* IsClosedRegressionSpec / MergeInnerErrorSpec
* MapParSpec / MapParJvmTypeSpec / MergeAllSpec
* FloatDoubleMapParSpec — coverage for the Float/Double specialized
mapPar readers (lifts those from 0% to ~87% statement coverage)
* ReadNSpec / ReadNJvmSpec / ReadUpToNSpec
* GenericFoldSpec, PlatformSpec, ReadBytesScopeAuditSpec,
ReadBytesSoundnessSpec
* Queue specs: BlockingMpmcQueueSpec, BlockingMpscQueueSpec,
BlockingSpscQueueSpec
Docs:
* docs/reference/streams/concurrent-operators.md — semantics, error
behaviour, buffer sizing, comparison with fs2 / Kyo / Ox / Pekko.
## Ring buffers
New primitive SPSC ring buffers in the `ringbuffer` project (JVM-only):
* `IntSpscRingBuffer` — Long-slot, high-32-bit tag, low-32-bit payload.
* `LongSpscRingBuffer` — Long-slot, `Long.MinValue` as empty marker.
* `FloatSpscRingBuffer` — Long-slot, high-32-bit tag, low-32-bit
`floatToRawIntBits` payload.
* `DoubleSpscRingBuffer` — Long-slot, raw bits stored; reserved
non-canonical NaN bit pattern as empty marker, payload NaNs
canonicalized to `Double.NaN` on offer.
All four use the same FastFlow algorithm as the generic `SpscRingBuffer`
(single backing `Array[Long]`, padded indices, look-ahead cache) and
expose the same core operations (`offer`, `take`, `peek`, `isEmpty`,
`isFull`, `size`, `capacity`); the `drain`/`fill` batch helpers are not
provided on the primitive variants.
JMH benchmarks added in `ringbuffer-benchmarks/PrimitiveSpscThroughputBenchmark`
exercise SPSC throughput for all four primitive variants.
Tests: 190 passing in `ringbufferJVM`, 98.86% statement / 96.61% branch
coverage on the ringbuffer package.
Docs: new 'Primitive variants' section in `docs/reference/ringbuffer/spsc.mdx`.
Amp-Thread-ID: https://ampcode.com/threads/T-019e8f2a-5d95-720c-9d66-0878355503ea
e9f0456 to 51db422
…es, JMH optimization campaign (#1458)

Port the Option A streams design (union/Concat error channel, LinearStream, error-handling combinators) onto the upstream streams base (#1451), then harden and optimize it.

Port
- Union-typed error channel (Concat LUB) + LinearStream + error-handling combinators ported onto the upstream streams module, cross-built for Scala 2.13 / 3.3 LTS / 3.8 on JVM and JS.
- Test suite consolidated into the ZIO-convention spec layout (StreamSpec, ReaderSpec, SinkSpec, WriterSpec, PipelineSpec, InterpreterSpec, NioSpec, platform/version-specific specs).

Adversarial hardening (10 rounds, 26 production bugs found and fixed)
- NIO: lossless sentinel handling and close-failure integrity in NioStreams/NioSinks (BUG-001..004, P-001); fromByteBufferLong/Double reject sentinel-valued elements loudly instead of truncating.
- ConcatReader: bulk-read truncation, single-advance segment loss, close-failure swallowing, writer FD leak (BUG-201..203, readUpToN family).
- Interpreter: inner-seal ordering, lane staleness, boxed-source lane bridging, readByte low-byte routing (BUG-A01/A02, BUG-INT-1, BUG-R6-01).
- Recovery readers: prefix loss on lossless bulk reads during catchAll.
- Writer: flag conflation and writeable() accuracy.
- SkipLimitReader: eager-skip resource leak.
- Concurrent readers (merge/mapPar, all primitive lanes): close-failure routing, reset() support, lossless reserved Long.MinValue/+1 via an atomic escape channel (LongSpscRingBuffer.offerDoneAfter), bulk-array and concurrent-merge sentinel EOF detection, lastReadWasEOF maintenance (infinite-drain OOM fix).
- Resources: FromAcquireRelease/FromResource once-only release; double finalization when close() throws during switch/advance; flatMap/buffer/fromIterator replayable under repeated; Repeated error latches; singleton repeat-close.
- intersperse/scan readable() accuracy; SinkError origin attribution; ByteBuffer window compose/clamp/snapshot semantics.
- Each fix is enshrined by a failing-repro-first regression test; convergence-evidence suites and 4 runnable walkthroughs added under streams-examples.

Performance (3-session JMH campaign, A/B/A baseline-vs-candidate per change)
- Map-stage fusion (ComposedInt / ComposedIntArray flat stage arrays).
- Reusable flatMap inner readers + SucceedPrim/RangedStream leaf nodes; ungated ReusableIntRange reuse for all spans.
- Bulk terminal fold (Reader.foldInt): flatMap/concat/filter drains collapse onto leaf loops.
- Thunk-free presized deep concat-spine assembly; presized segmented compile (capacity hints + exact spine collection).
- Headline JMH deltas vs original baseline (final confirmation run, -f 2, all SIGNIFICANT): zb_chainedMaps +2616%, zb_chainedMaps100 +711%, zb_mixed_3 +278%, zb_flatMap +220%, zb_mapFilterFlatMap +205% (alloc 240KB -> 276B/op), zb_concat +206%, zb_take +83%. zb_nested_flatMap -5.6% (mostly pre-existing machine drift; alloc cut 1.26MB -> 1.01MB/op); remaining -2..-4% paths are documented drift on untouched code with overlapping CIs.

Supporting changes
- ringbuffer: additive private[blocks] LongSpscRingBuffer helpers (isReserved, offerNonReserved, offerDoneAfter) for the lossless reserved-value escape protocol.
- AGENTS.md: new "Sentinel performance policy (streams)" section — hot drain loops stay one primitive comparison per element; approved zero-cost remedies for sentinel/EOF collisions.
- docs: reader.md/sink.md updated for the lossless sentinel-guard behavior.

Verification
- streamsJVM + streamsJS tests green on 2.13.18, 3.3.7, 3.8.3 (3.8.3: 2228/0 JVM, 1924/0 JS).
- ringbuffer tests, scalafmt + header checks, full Test/compile, docs/mdoc all green.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
## Motivation

ZIO Blocks streams previously had no concurrent operators. Long-running streams that wanted to fan out per-element work (`mapPar`), drain multiple sub-streams concurrently (`mergeAll`, `flatMapPar`), or hand off batches between producer and consumer threads had to fall back to external concurrency frameworks or homegrown thread pools, losing the typed-error, synchronous, pull-based `Reader[A]` programming model that the rest of the library is built around.

This PR adds three concurrent stream operators and the ring-buffer machinery that backs them while keeping the existing programming model: the consumer thread still receives `Either[E, Z]`. No effect system is required, and there is no `IO`/`Task` boilerplate.

## What changed
### New `Stream` operators (JVM-only; degrade to sequential on Scala.js)

| Operator | Behavior |
|---|---|
| `Stream#mapPar(n)(f)` | Applies `f` to each element on up to `n` worker threads. Output is unordered (arrival order). |
| `Stream.mergeAll(n)(streams)` | Runs up to `n` inner streams concurrently and interleaves their elements as they arrive. |
| `Stream#flatMapPar(n)(f)` | Expands each element into a sub-stream via `f`; drains up to `n` sub-streams concurrently. |

All three use virtual threads on JDK 21+ (reflective probe at class init; falls back to plain `Thread` on JDK < 21). An error from any worker terminates all concurrent work and surfaces as `Left(e)` from the terminal operation. Resource safety is deterministic: closing the consumer reader, an error, or a scope finalizer shuts down the workers and drains the queues.

Buffer sizing is exposed via `Stream.bufferSize(n) { ... }` (powers of two) and `Pipeline.buffer(n)`.

### Primitive specialization
Per-`JvmType` specialized readers are selected in `PlatformSpecific.createMapParReader` / `createMergeReader`: `Int`/`Long`/`Float`/`Double` flows go through primitive-typed readers that use the new primitive SPSC ring buffers for the cross-thread handoff (zero boxing on the producer side). `AnyRef` flows fall back to `ConcurrentMapParReader[A, B]` / `ConcurrentMergeReader[A]`.

## Primitive SPSC ring buffers (`ringbuffer` project)

Four new classes in `ringbuffer/jvm/src/main/scala/zio/blocks/ringbuffer/`:

| Element type | Class | Slot encoding |
|---|---|---|
| `Int` | `IntSpscRingBuffer` | One `Long` per slot: the high 32 bits hold a 0/1 occupancy tag, the low 32 bits hold the `Int` payload. The whole 64-bit word is `0L` when empty, so any tagged value is non-zero regardless of payload bits; all `Int` values are representable. |
| `Long` | `LongSpscRingBuffer` | One `Long` per slot. `Long.MinValue` is reserved as the empty marker; offering it throws `IllegalArgumentException`. |
| `Float` | `FloatSpscRingBuffer` | One `Long` per slot: the high 32 bits hold the tag, the low 32 bits hold `floatToRawIntBits(value)`. All `Float` values are representable. |
| `Double` | `DoubleSpscRingBuffer` | One `Long` per slot. `doubleToRawLongBits(value)` is stored directly; a reserved non-canonical NaN bit pattern marks an empty slot. Any payload `NaN` is canonicalized to `Double.NaN` on offer so it can never collide with the empty marker. |

All four use the same FastFlow algorithm as the generic `SpscRingBuffer` (padded indices, look-ahead cache), each backed by a single `Array[Long]`, and expose identical operations (`offer`, `take`, `peek`, `isEmpty`, `isFull`, `size`, `capacity`, `drain`, `fill`).

JMH benchmarks in `ringbuffer-benchmarks/PrimitiveSpscThroughputBenchmark` cover SPSC throughput for all four variants.

## Coordinator/worker queue primitives
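To make the blocking handoff of this section concrete, here is a minimal array-backed, fixed-capacity queue whose `put` blocks while it is full and whose `take` blocks while it is empty. `SimpleBlockingQueue` is a hypothetical name, and the sketch uses one `ReentrantLock` with two `Condition`s (the classic `java.util.concurrent.ArrayBlockingQueue` design) rather than the parked-thread scheme and SPSC/MPSC/MPMC specializations of the PR's queues.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical illustration only, not the PR's BlockingSpscQueue:
// a fixed-capacity ring of slots guarded by one lock and two conditions.
final class SimpleBlockingQueue[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val items    = new Array[Any](capacity)
  private val lock     = new ReentrantLock()
  private val notEmpty = lock.newCondition()
  private val notFull  = lock.newCondition()
  private var head     = 0 // index of the next element to take
  private var tail     = 0 // index of the next free slot
  private var count    = 0 // number of buffered elements

  /** Blocks the producer while the queue is full. */
  def put(a: A): Unit = {
    lock.lock()
    try {
      while (count == capacity) notFull.await()
      items(tail) = a
      tail = (tail + 1) % capacity
      count += 1
      notEmpty.signal() // wake a consumer waiting for data
    } finally lock.unlock()
  }

  /** Blocks the consumer while the queue is empty. */
  def take(): A = {
    lock.lock()
    try {
      while (count == 0) notEmpty.await()
      val a = items(head).asInstanceOf[A]
      items(head) = null // release the reference for GC
      head = (head + 1) % capacity
      count -= 1
      notFull.signal() // wake a producer waiting for space
      a
    } finally lock.unlock()
  }
}

// Usage: one producer thread, one consumer (the calling thread), capacity 4.
val queue    = new SimpleBlockingQueue[Int](4)
val producer = new Thread(() => (1 to 100).foreach(i => queue.put(i)))
producer.start()
val received = Vector.fill(100)(queue.take())
producer.join()
println(received == (1 to 100).toVector) // true: FIFO order is preserved
```

The `while` loops around `await()` guard against spurious wakeups; a specialized SPSC variant can drop the shared lock because only one thread ever writes each index.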
Three blocking queue primitives in `streams/jvm/src/main/scala/zio/blocks/streams/queues/` back the coordinator-to-worker channels of the multi-worker fan-out:

- `BlockingSpscQueue[A]` / `BlockingMpscQueue[A]` / `BlockingMpmcQueue[A]`: array-backed, fixed-capacity blocking variants that park waiting threads, each tuned for the concurrency shape of the reader that uses it.

## Correctness fixes uncovered during stabilization
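The first fix in this section (a worker must not exit on the stop signal until its input queue is drained) can be sketched with two `java.util.concurrent.ConcurrentLinkedQueue`s standing in for the separate input and control queues. `Stop` and `drainingWorkerLoop` are hypothetical names; this is not the PR's `workerLoop`, and it busy-waits for brevity where a real reader would block or park.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stop marker sent on the control queue (stands in for DoneSentinel).
case object Stop

// The producer offers Stop only after its last input. ConcurrentLinkedQueue
// guarantees that actions before an offer happen-before actions after the
// matching poll, so once Stop is seen every earlier input is visible.
// The worker records the stop and keeps draining; it exits only when the
// stop has been seen AND the input queue is empty.
def drainingWorkerLoop(
  input: ConcurrentLinkedQueue[Integer],
  control: ConcurrentLinkedQueue[AnyRef],
  out: ArrayBuffer[Int]
): Unit = {
  var stopSeen = false
  var running  = true
  while (running) {
    val next = input.poll()
    if (next ne null) out += next.intValue * 2      // process one element
    else if (stopSeen) running = false               // stopped and drained: safe to exit
    else if (control.poll() eq Stop) stopSeen = true // defer exit; re-check input first
    else Thread.onSpinWait()                         // nothing yet (busy-wait for brevity)
  }
}

val input   = new ConcurrentLinkedQueue[Integer]()
val control = new ConcurrentLinkedQueue[AnyRef]()
val out     = ArrayBuffer.empty[Int]
val worker  = new Thread(() => drainingWorkerLoop(input, control, out))
worker.start()
(1 to 1000).foreach(i => input.offer(Integer.valueOf(i)))
control.offer(Stop) // published only after every input offer
worker.join()
println(out.size) // 1000: no tail elements are dropped
```

Exiting directly on `Stop` would reintroduce the race: the worker can poll an empty input queue, the producer then offers the remaining inputs and `Stop`, and the worker sees `Stop` and leaves those inputs behind.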
Two real races were found and fixed before this PR:
1. **Specialized `mapPar` lost-element race.** The `Int`/`Long`/`Float`/`Double` `mapPar` readers use a control queue separate from the input queue (the generic reader uses a single combined queue). A worker could observe `DoneSentinel` on the control queue before all prior input writes were visible and exit early, dropping the tail of the stream. This reproduced as a flake in `ConcurrentLawsSpec` (`mapPar(1)(f) ≡ map(f)` as sets).

   **Fix:** a `stopSeen` flag in `workerLoop`. Once `DoneSentinel` is observed, the loop defers exit until `inputQueues(idx)` is actually drained, so writes that happened-before the stop signal are still picked up.

2. **`isClosed` reported `true` before the consumer observed EOS.** The previous predicate `workersRunning.get() == 0 && allOutputQueuesEmpty()` could flip to `true` between the last producer write and the consumer's matching `read`, racing the consumer's loop-termination check (reproduced in `IsClosedRegressionSpec`, "mapPar: readable() is consistent with isClosed during consumption").

   **Fix:** track an `eofReturned` flag, set by the `read` that actually returns the end-of-stream sentinel to the consumer; `isClosed` becomes `eofReturned || consumerClosed`. The reader now signals "closed" only once EOS has actually been delivered, not when it has merely become inevitable.

## Tests
| sbt task |
|---|
| `streamsJVM/test` |
| `streamsJS/test` |
| `ringbufferJVM/test` |

New JVM specs added in this PR (selected): `ConcurrentMapParReaderSpec`, `ConcurrentMergeReaderSpec`, `ConcurrentStressSpec`, `ConcurrentLawsSpec`, `BufferSizeSpec`, `BufferStressSpec`, `BufferSpec`, `IsClosedRegressionSpec`, `MergeInnerErrorSpec`, `MapParSpec`, `MapParJvmTypeSpec`, `MergeAllSpec`, `FloatDoubleMapParSpec` (lifts `Float`/`Double` `mapPar` coverage from 0% to ~87%), `ReadNSpec` / `ReadNJvmSpec` / `ReadUpToNSpec`, `GenericFoldSpec`, `PlatformSpec`, `ReadBytesScopeAuditSpec`, `ReadBytesSoundnessSpec`, `BlockingMpmcQueueSpec`, `BlockingMpscQueueSpec`, `BlockingSpscQueueSpec`, `PrimitiveSpscRingBufferSpec`.

Coverage (Scala 2.13.18, `streamsJVM`): 56.84% statement / 52.05% branch overall; specialized concurrent readers ~87–90% statement. `ringbuffer`: 98.86% statement / 96.61% branch.
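`ConcurrentLawsSpec` checks that `mapPar(1)(f)` and `map(f)` agree as sets. The same kind of order-insensitive law can be illustrated outside the library with a hypothetical `unorderedParMap` built on `java.util.concurrent.ExecutorCompletionService`, which, like `mapPar`, returns results in arrival order. This sketch is not the PR's test code, and it compares outputs as multisets, a slightly stronger check than sets when `f` produces duplicates.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

// Hypothetical stand-in for an unordered mapPar: applies f on a pool of n
// threads and returns results in completion (arrival) order, not input order.
def unorderedParMap[A, B](n: Int, xs: Seq[A])(f: A => B): Vector[B] = {
  val pool = Executors.newFixedThreadPool(n)
  try {
    val completions = new ExecutorCompletionService[B](pool)
    xs.foreach(a => completions.submit(new Callable[B] { def call(): B = f(a) }))
    // take() blocks until the next task finishes; get() rethrows a task failure.
    Vector.fill(xs.size)(completions.take().get())
  } finally pool.shutdown()
}

// Order-insensitive comparison: the count of every distinct value must match.
def sameMultiset[B](a: Seq[B], b: Seq[B]): Boolean =
  a.groupMapReduce(identity)(_ => 1)(_ + _) == b.groupMapReduce(identity)(_ => 1)(_ + _)

val xs = (1 to 500).toVector
val f  = (i: Int) => i % 7 // many duplicate outputs
println(sameMultiset(unorderedParMap(1, xs)(f), xs.map(f))) // true
println(sameMultiset(unorderedParMap(8, xs)(f), xs.map(f))) // true
```

A lost-element bug like the one fixed above shows up here as a count mismatch even when every distinct value still appears at least once.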
## Docs
- `docs/reference/streams/concurrent-operators.md`: new reference page covering semantics, error behaviour, buffer sizing, examples for `mapPar`/`mergeAll`/`flatMapPar`, and a feature comparison with fs2, Kyo, Ox, and Pekko.
- `docs/reference/ringbuffer/spsc.mdx`: new "Primitive variants" section documenting the four primitive `SpscRingBuffer` classes, their slot encodings, the `Long.MinValue` / NaN-canonicalization reservation rules, and the `peek` + `take` usage pattern.

## Verification

- `streamsJVM/test`, `streamsJS/test`, `ringbufferJVM/test` all green (see table above).
- `streams-benchmark`, `ringbufferBenchmarks`, `benchmarks` projects compile cleanly.
- `docs/mdoc` compiles cleanly (0 errors).
- `fmtDirty` applied; tree clean.