mpmc_bounded_queue: add fetch-and-add blocking enqueue for FiberQueue#600
romange wants to merge 3 commits into
📝 Walkthrough
Adds a three-step fetch-and-add (FAA) enqueue API (…
Changes: MPMC FAA Enqueue API and FiberQueue Slot-based Add
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant ProducerFiber as Producer Fiber
    participant FiberQueue
    participant MPMC as mpmc_bounded_queue
    participant ConsumerThread as Consumer Thread
    ProducerFiber->>MPMC: claim_slot() → pos
    MPMC-->>ProducerFiber: pos (atomic fetch_add)
    ProducerFiber->>MPMC: slot_ready(pos)?
    alt slot not ready (queue full)
        ProducerFiber->>FiberQueue: blocked_submitters_++
        ProducerFiber->>ProducerFiber: yield (ActiveFiberScheduler)
        ProducerFiber->>FiberQueue: push_ec_.await(slot_ready(pos))
        ProducerFiber->>FiberQueue: blocked_submitters_--
    end
    ProducerFiber->>MPMC: commit_slot(pos, callback)
    ProducerFiber->>ConsumerThread: pull_ec_.notify()
    ConsumerThread->>MPMC: try_dequeue → callback()
    ConsumerThread->>FiberQueue: push_ec_.notify() (after dequeue)
```
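The producer path in the diagram can be modelled outside the fiber runtime. Below is a minimal sketch, assuming a Vyukov-style cell array: `claim_slot`, `slot_ready`, `commit_slot` and `try_dequeue_sc` reuse the PR's names, but their bodies, the hypothetical `FaaQueue`/`BlockingAdder` types, and the use of `std::mutex` + `std::condition_variable` in place of the `push_ec_` EventCount are our assumptions, not the code in `base/mpmc_bounded_queue.h`. `std::this_thread::yield()` stands in for the fiber `Yield()`, and `pull_ec_` is omitted because the consumer here simply polls.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical Vyukov-style bounded queue exposing the three-step FAA enqueue.
template <typename T>
class FaaQueue {
 public:
  explicit FaaQueue(size_t cap_pow2) : mask_(cap_pow2 - 1), cells_(new Cell[cap_pow2]) {
    for (size_t i = 0; i < cap_pow2; ++i) cells_[i].seq.store(i, std::memory_order_relaxed);
  }

  // Step 1: one unconditional fetch_add; never fails, never retries.
  size_t claim_slot() { return enqueue_pos_.fetch_add(1, std::memory_order_relaxed); }

  // Step 2: the ticket's cell is free once its sequence equals the ticket.
  bool slot_ready(size_t pos) const {
    return cells_[pos & mask_].seq.load(std::memory_order_acquire) == pos;
  }

  // Step 3: write, then publish with a release store. Precondition: slot_ready(pos).
  void commit_slot(size_t pos, T v) {
    Cell& c = cells_[pos & mask_];
    c.data = std::move(v);
    c.seq.store(pos + 1, std::memory_order_release);
  }

  // Single consumer only: no atomic arbitration on the dequeue cursor.
  std::optional<T> try_dequeue_sc() {
    Cell& c = cells_[deq_ & mask_];
    if (c.seq.load(std::memory_order_acquire) != deq_ + 1) return std::nullopt;
    T v = std::move(c.data);
    c.seq.store(deq_ + mask_ + 1, std::memory_order_release);  // free for ticket deq_ + cap
    ++deq_;
    return v;
  }

 private:
  struct Cell {
    std::atomic<size_t> seq;
    T data{};
  };
  const size_t mask_;
  std::unique_ptr<Cell[]> cells_;
  std::atomic<size_t> enqueue_pos_{0};
  size_t deq_ = 0;  // touched only by the consumer
};

// Hypothetical wrapper that owns the waiting, as FiberQueue::Add does.
template <typename T>
class BlockingAdder {
 public:
  explicit BlockingAdder(FaaQueue<T>& q) : q_(q) {}

  // Producer: claim, check, yield once, then park until the cell is free.
  void Add(T v) {
    size_t pos = q_.claim_slot();
    if (!q_.slot_ready(pos)) {  // this ticket's cell is still occupied: queue full
      ++blocked_submitters;
      std::this_thread::yield();  // stand-in for the fiber Yield()
      std::unique_lock<std::mutex> lk(m_);
      push_cv_.wait(lk, [&] { return q_.slot_ready(pos); });
      --blocked_submitters;
    }
    q_.commit_slot(pos, std::move(v));
  }

  // Consumer: dequeue one item, then wake parked producers.
  std::optional<T> Pop() {
    std::optional<T> v = q_.try_dequeue_sc();
    if (v) {
      { std::lock_guard<std::mutex> lk(m_); }  // orders the free with waiters' checks
      push_cv_.notify_all();
    }
    return v;
  }

  std::atomic<int> blocked_submitters{0};  // contention gauge

 private:
  FaaQueue<T>& q_;
  std::mutex m_;
  std::condition_variable push_cv_;  // stand-in for push_ec_
};
```

Each producer holds at most one ticket at a time, so a parked producer waits only for the consumer to free its own cell, and tickets that map to the same cell are served strictly in `pos` order.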
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request introduces a fetch-and-add (FAA) enqueue discipline to mpmc_bounded_queue to reduce producer contention, and integrates it into FiberQueue::Add along with new tests and benchmarks. The review feedback identifies a critical race condition between FiberQueue::Add and FiberQueue::Shutdown that can cause deadlocks, which can be fixed by checking for in-flight slots. Additionally, the reviewer points out a potential null pointer dereference when calling detail::FiberActive()->Yield() from non-fiber threads, and suggests optimizing claim_slot() by using std::memory_order_relaxed instead of std::memory_order_acq_rel.
Actionable comments posted: 1
🧹 Nitpick comments (1)
base/mpmc_bounded_queue.h (1)
110-133: ⚡ Quick win: Add `ABSL_MUST_USE_RESULT` to `claim_slot`.
Per coding guidelines, error-returning functions in headers should be marked with `ABSL_MUST_USE_RESULT`. While `claim_slot` doesn't return an error, ignoring its return value is always a bug: the caller must pass `pos` to `commit_slot`. Marking it prevents silent misuse.
Suggested change:
```diff
+#include <absl/base/attributes.h>
+
   // Unconditionally claims a ticket. Never fails. The caller MUST eventually commit_slot(pos),
   // waiting for slot_ready(pos) first if it is not already free.
-  size_t claim_slot() {
+  ABSL_MUST_USE_RESULT size_t claim_slot() {
     return enqueue_pos_.fetch_add(1, std::memory_order_acq_rel);
   }
```
Inline comments:
In `@util/fibers/fiberqueue_threadpool.h`:
- Around line 53-60: The call to detail::FiberActive()->Yield() in the
FiberQueue::Add method assumes an active fiber context, but since
FiberQueue::Add is publicly reachable and can be called from plain thread
contexts (such as via FiberQueueThreadPool::Add), this will crash or assert when
called from a non-fiber thread. Guard the Yield() call on line 56 by checking if
we are currently in a fiber context before invoking it. If not in a fiber
context, skip the yield and proceed directly to the push_ec_.await call, or
handle the non-fiber case appropriately to prevent crashes from plain thread
callers.
📒 Files selected for processing (6):
- base/mpmc_bounded_queue.h
- base/mpmc_bounded_queue_test.cc
- util/fibers/CMakeLists.txt
- util/fibers/fiberqueue_threadpool.cc
- util/fibers/fiberqueue_threadpool.h
- util/fibers/fiberqueue_threadpool_test.cc
Branch updated from 05c1238 to 5cddefd.
Add claim_slot/slot_ready/commit_slot so a blocking caller can claim a slot with a single fetch_add instead of the CAS retry loop in try_enqueue, avoiding cache-line contention on enqueue_pos_ under many producers. The per-cell sequence protocol is shared, so the FAA and CAS disciplines coexist on the same queue.

Rewrite FiberQueue::Add to claim a slot, yield once, then block on push_ec_ until the slot frees, and move the blocked_submitters_ contention gauge into FiberQueue. try_enqueue/TryAdd are kept for the non-blocking path (AddAnyWorker).

Adds unit tests, a CAS-vs-FAA micro-benchmark, and a FiberQueue blocking-path test.

Benchmark (BM_MPMCContention vs BM_MPMCContentionFAA, 8 producers / 1 consumer, q=1024, 2^20 ops/producer, identical workload):

  CAS (try_enqueue):          11.4 M items/s   enq_fail/op = 734u
  FAA (claim_slot+commit):    32.4 M items/s   enq_spin/op = 0.45

FAA is ~2.8x faster. enq_fail/op is ~0.0007, so the queue is almost never full: the CAS path's cost is not full-queue backoff but the compare_exchange_weak loop on the shared enqueue_pos_ cache line. Lost CAS races re-loop invisibly (the counter only ticks on a full queue) and show up purely as wall-clock time. FAA replaces that with one unconditional fetch_add, so every producer makes progress on the first try. The only remaining wait is enq_spin/op = 0.45, a spin on slot_ready() that loads the producer's own (uncontended) cell while the consumer drains it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Roman Gershman <romange@gmail.com>
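The point about lost CAS races re-looping invisibly can be reproduced in isolation. This hypothetical `TicketCounter` models only the ticket-claim step on a shared `enqueue_pos` (no cells, no data): `claim_cas` has the shape of a `compare_exchange_weak` retry loop like the one in `try_enqueue`, and `claim_faa` has the shape of `claim_slot`. The `cas_retries` counter is our addition; it counts failed CAS attempts (including spurious failures), which a full-queue counter such as `enq_fail/op` never sees.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical model of the two ticket-claim disciplines on one shared cursor.
struct TicketCounter {
  std::atomic<size_t> enqueue_pos{0};
  std::atomic<uint64_t> cas_retries{0};  // invisible to a "queue full" counter

  // CAS discipline: if another producer advanced the cursor between our load
  // and our CAS, the CAS fails, `pos` is refreshed, and we try again.
  size_t claim_cas() {
    size_t pos = enqueue_pos.load(std::memory_order_relaxed);
    while (!enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
      cas_retries.fetch_add(1, std::memory_order_relaxed);
    }
    return pos;
  }

  // FAA discipline: one unconditional read-modify-write that always succeeds.
  size_t claim_faa() { return enqueue_pos.fetch_add(1, std::memory_order_relaxed); }
};

// Runs `threads` producers, each claiming `per_thread` tickets, and returns
// every ticket handed out, sorted.
std::vector<size_t> run_claims(TicketCounter& tc, bool use_faa, int threads, int per_thread) {
  std::vector<std::vector<size_t>> got(threads);
  std::vector<std::thread> ts;
  for (int t = 0; t < threads; ++t) {
    ts.emplace_back([&, t] {
      got[t].reserve(per_thread);
      for (int i = 0; i < per_thread; ++i)
        got[t].push_back(use_faa ? tc.claim_faa() : tc.claim_cas());
    });
  }
  for (auto& th : ts) th.join();
  std::vector<size_t> all;
  for (auto& g : got) all.insert(all.end(), g.begin(), g.end());
  std::sort(all.begin(), all.end());
  return all;
}
```

Both disciplines hand out the same dense set of tickets; the difference is that `claim_cas` may burn an unbounded number of attempts under contention while `claim_faa` always finishes in one RMW. Printing `cas_retries` after a multi-threaded run is a cheap way to make that hidden cost visible; we make no claim about the numbers it will show on a given machine.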
Branch updated from 5cddefd to e54af8e.
Actionable comments posted: 1
Inline comments:
In `@util/fibers/fiberqueue_threadpool_test.cc`:
- Around line 32-33: The test assertion EXPECT_GT(preempts.load(), 0u) is flaky
because the consumer fiber starts before the producer fibers, causing
timing-sensitive behavior that can miss contention in CI. Restructure the test
to ensure producers start and execute before the consumer fiber (around line
32-33) to guarantee that contention occurs deterministically, eliminating the
race condition. Apply the same synchronization/ordering fix to the second
occurrence of this pattern (around lines 52-53).
Address PR review and pull the callback-type improvement from #595.

Shutdown race (lost callback + deadlock): a blocking Add() that has claimed a slot via claim_slot() but not yet committed it is invisible to try_dequeue_sc, so Run() could see an empty-looking queue, observe is_closed_, and exit while the producer was still parked between claim and commit -- stranding the callback and deadlocking any Await waiting on it. Add mpmc_bounded_queue::has_inflight_slots() and gate Run()'s exit on is_closed_ && !queue_.has_inflight_slots(), keeping the consumer parked until the producer commits and notifies pull_ec_.

claim_slot() now uses memory_order_relaxed instead of acq_rel: enqueue_pos_ only hands out unique tickets and carries no data; publication is done by commit_slot's release store and observed by slot_ready/try_dequeue acquire loads. Matches try_enqueue.

Replace the std::function<void()> callback (CbFunc) with Tasklet, a fixed-capacity fu2::function_base (capacity_fixed<32,8>, non-copyable, non-throwing). Callbacks up to 32 bytes are stored inline with no heap allocation. Ported from #595.

Rejected the bots' FiberActive()->Yield() null-check: FiberActive() lazily constructs the thread-local main-context stub and is never null.

Adds MPMCTest.HasInflightSlots.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
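The shutdown fix reduces to the consumer's exit predicate. The sketch below is a hypothetical model, not the PR's code: `QueueCursors`, `old_may_exit` and `new_may_exit` are illustrative names, and the body of `has_inflight_slots()` (the enqueue cursor has run ahead of the consumer's cursor) is our assumption about what "in-flight" means, since the commit does not show it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical cursors of the queue; has_inflight_slots() is an assumed definition.
struct QueueCursors {
  std::atomic<size_t> enqueue_pos{0};  // advanced by claim_slot() (relaxed FAA)
  size_t dequeue_pos = 0;              // advanced only by the single consumer

  size_t claim_slot() { return enqueue_pos.fetch_add(1, std::memory_order_relaxed); }

  // True while some claimed ticket has not been consumed yet, including a
  // producer parked between claim_slot() and commit_slot().
  bool has_inflight_slots() const {
    return enqueue_pos.load(std::memory_order_acquire) != dequeue_pos;
  }
};

// Old gate: leave as soon as Shutdown() ran and a dequeue found nothing.
bool old_may_exit(bool is_closed, bool dequeue_found_nothing) {
  return is_closed && dequeue_found_nothing;
}

// New gate: also require that no ticket is still between claim and consume.
bool new_may_exit(const QueueCursors& q, bool is_closed) {
  return is_closed && !q.has_inflight_slots();
}
```

The sketch does not model producers that begin `Add()` after the consumer has already decided to exit.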
```cpp
size_t pos = queue_.claim_slot();

bool preempted = false;
if (!queue_.slot_ready(pos)) {
  preempted = true;
  ++blocked_submitters_;
  detail::FiberActive()->Yield();
  if (!queue_.slot_ready(pos)) {
    push_ec_.await([&] { return queue_.slot_ready(pos); });
  }
  --blocked_submitters_;
}

bool result = false;
while (true) {
  auto key = push_ec_.prepareWait();
queue_.commit_slot(pos, std::forward<F>(f));
pull_ec_.notify();
```
The CAS was on enqueue_pos, and now the position is predetermined. However, wouldn't merging slot_ready + commit into a single CAS help?
Also, the yield here is an interesting choice. First, it is quite a heavy time skip under load; second, with inter-thread coordination the timing is off either way.
Also, what I don't understand: if we now pre-assign the pos (like a ticket system) and Yield, won't we stall the whole queue? (I know the same can happen if the OS deschedules a thread inside any atomic data structure, but this is one more point here.)
- CAS is heavier, as it is a full-blown transaction. You can look at the unit test and run it locally to see for yourself: it compares FAA and CAS. In addition, it is not my invention: the FAA class of algorithms is known to be more efficient than CAS due to its "blind" semantics. I am sure Gemini can explain better than me why CAS is heavier across CPUs.
- Regarding Yield, I am not sure. We could try simple (thread) spinning too, but my guess is that it won't help. You need to understand when a slot can be busy. It's not about producer contention. For example, with a queue of capacity 16, suppose we claim cell 0 at `pos` 32 and it's busy. That necessarily means the queue is full. It may be that the tail has not even started consuming, i.e. cell 0 still holds `pos` 0. Or maybe it consumed cell 0 at `pos` 0, but then a producer claimed that cell with `pos` 16 and the consumer has not consumed it yet (it is somewhere in cells [1, 15]). We may even have 100 producers waiting on the same cell 0, but with unique `pos` slots: 32, 48, 64, etc. They wait for a) the consumer to dequeue from cell 0 and b) their turn (`pos`) to enqueue. So claim/ready/commit is not only about atomics contention: it is a transactional algorithm that also ensures fairness and order.
- We do not stall the queue. Again, if we enqueue at `pos` (our head) and it's busy, the queue is full; we do not think we can claim a cell that is busy in a non-full queue.

At least this is how I understand the algorithm. What I really like about this is that we can implement these different semantics using the same data structure. It is very elegant, imho.
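The capacity-16 scenario above can be checked with plain arithmetic on cell 0's sequence number. This worked sketch applies the per-cell rule the PR describes (a ticket may write when `seq == pos`, publish sets `pos + 1`, dequeue sets `pos + mask + 1`); the helper names are hypothetical, and unlike the PR's `slot_ready(pos)` they take the cell's sequence as an explicit argument.

```cpp
#include <cassert>
#include <cstddef>

// Worked example of the per-cell sequence rule: capacity 16, so tickets
// 0, 16, 32, 48, ... all map to cell 0.
constexpr size_t kCap = 16;
constexpr size_t kMask = kCap - 1;

constexpr size_t cell_of(size_t pos) { return pos & kMask; }

// A producer holding ticket `pos` may write once its cell's sequence == pos.
constexpr bool slot_ready(size_t cell_seq, size_t pos) { return cell_seq == pos; }

// Publishing ticket `pos` sets the cell's sequence to pos + 1 ...
constexpr size_t seq_after_commit(size_t pos) { return pos + 1; }

// ... and dequeuing it frees the cell for ticket pos + kCap.
constexpr size_t seq_after_dequeue(size_t pos) { return pos + kMask + 1; }
```

Walking cell 0 through the events shows that ticket 32 cannot write until ticket 16 has been both committed and consumed, and that ticket 48 still waits after that, which is the ordering and fairness property described above.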
pull_ec_ is written by every producer (notify() after each commit); push_ec_ is written by the consumer (notifyAll() per drained batch) and by blocked producers. Adjacent in memory, they share a cache line and ping-pong between producer- and consumer-side wakeup traffic. alignas(64) each onto its own line.

is_closed_ is left unaligned: it is written once at Shutdown and only read on the consumer's park path (which already touches push_ec_), so it shares a line harmlessly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
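The layout change can be illustrated with standalone structs. `EventCountStub` is a hypothetical 8-byte stand-in for the real EventCount, 64-byte cache lines are assumed, and the member order (`is_closed_` directly after `push_ec_`, our reading of the note that it shares a line with the consumer's park path) is an assumption; only the use of `alignas(64)` comes from the commit.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for an EventCount: one 64-bit atomic word.
struct EventCountStub {
  std::atomic<uint64_t> val{0};
};

// Before: adjacent members, so both counters normally sit on one cache line.
struct PackedCounters {
  EventCountStub pull_ec_;
  EventCountStub push_ec_;
};

// After: each counter starts its own 64-byte line. is_closed_ is left
// unaligned and lands on push_ec_'s line (assumed member order).
struct SplitCounters {
  alignas(64) EventCountStub pull_ec_;
  alignas(64) EventCountStub push_ec_;
  bool is_closed_ = false;
};

// Byte distance between two members of the same object.
inline std::ptrdiff_t byte_distance(const void* a, const void* b) {
  return static_cast<const char*>(b) - static_cast<const char*>(a);
}
```

Padding each counter to its own line trades 64 bytes per member for keeping producer-side `notify()` traffic from invalidating the line the consumer-side counter lives on.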
Summary
Adds a fetch-and-add (FAA) enqueue discipline to `mpmc_bounded_queue` for callers that are allowed to block, and rewrites `FiberQueue::Add` to use it.
- `claim_slot`/`slot_ready`/`commit_slot`: a blocking caller claims a ticket with a single `fetch_add` (never fails, no retry), waits until its cell is free, then commits. This avoids the cache-line contention of the `try_enqueue` CAS retry loop on `enqueue_pos_` under many producers.
- The per-cell `sequence` protocol is shared with `try_enqueue`, so the FAA and CAS disciplines coexist on the same queue.
- `FiberQueue::Add` now claims a slot, yields once, then blocks on `push_ec_` until the slot frees; the `blocked_submitters_` contention gauge moves into `FiberQueue`.
- `try_enqueue`/`TryAdd` are kept for the non-blocking path (`AddAnyWorker`), which still needs "detect full and bail" semantics that FAA can't provide.
- Adds unit tests, a CAS-vs-FAA micro-benchmark, and a `FiberQueue` blocking-path test.

Benchmark
`BM_MPMCContention` vs `BM_MPMCContentionFAA`, 8 producers / 1 consumer, q=1024, 2²⁰ ops/producer, identical workload:

| Discipline | Throughput | Counter |
|---|---|---|
| CAS (`try_enqueue`) | 11.4 M items/s | enq_fail/op = 734u |
| FAA (`claim_slot` + `commit_slot`) | 32.4 M items/s | enq_spin/op = 0.45 |

FAA is ~2.8× faster. Key finding: `enq_fail/op` is ~0.0007, so the queue is almost never full. The CAS path's cost is not full-queue backoff but the `compare_exchange_weak` loop on the shared `enqueue_pos_` cache line. Lost CAS races re-loop invisibly (the counter only ticks on a full queue) and show up purely as wall-clock time. FAA replaces that with one unconditional `fetch_add`, so every producer makes progress on the first try. The only remaining wait is enq_spin/op = 0.45, a spin on `slot_ready()` that loads the producer's own (uncontended) cell while the consumer drains it.

Algorithm lineage & prior art
This is not a new algorithm — it is a composition of two well-known designs, specialized for a fiber runtime. Credit where it is due:
- Vyukov's bounded MPMC queue: `mpmc_bounded_queue` already implements his per-cell `sequence` protocol (cell `i` starts at `sequence = i`; publish bumps it to `pos + 1`; dequeue frees it to `pos + mask + 1`). This PR reuses that protocol verbatim; `slot_ready`/`commit_slot` are just the existing publish steps with the CAS removed.
- Rigtorp's `MPMCQueue` (github, writeup). The "claim a slot with a single `fetch_add`, then wait on that slot's turn before writing" idea is exactly Rigtorp's `push`: `auto head = head_.fetch_add(1); … while (turn(head) != slot.turn) …`. Rigtorp's own code also validates the CAS-vs-FAA split used here: his blocking `push`/`pop` use `fetch_add`, while his non-blocking `try_push`/`try_pop` use `compare_exchange`, for the same reason we keep `try_enqueue` on CAS for the non-blocking `AddAnyWorker` path.
- … `fetch_add` to claim a cell as the way to beat CAS-loop contention.

How this differs from Rigtorp's implementation
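Both designs keep one counter per slot; they only encode it differently. As we read Rigtorp's `MPMCQueue`, a slot's `turn` is `2t` when free for round `t` and `2t + 1` once written, with `turn(i) = i / capacity`. The sketch below (hypothetical helper names, capacity fixed at 8, capacity of at least 2 assumed) translates a Vyukov cell `sequence` into that encoding so the two push conditions can be compared directly.

```cpp
#include <cassert>
#include <cstddef>

// Vyukov: cell c holds seq == t*kCap + c when free for round t and
// t*kCap + c + 1 once round t is published. Rigtorp: 2t and 2t + 1.
constexpr size_t kCap = 8;

// Translate a Vyukov sequence for cell `cell` into a Rigtorp-style turn.
constexpr size_t vyukov_seq_to_turn(size_t seq, size_t cell) {
  size_t d = seq - cell;  // t*kCap (free) or t*kCap + 1 (full)
  return 2 * (d / kCap) + d % kCap;
}

// Rigtorp-style push condition for ticket `head`: turn(head) * 2 == slot.turn.
constexpr bool rigtorp_push_ready(size_t slot_turn, size_t head) {
  return slot_turn == 2 * (head / kCap);
}

// Vyukov-style (slot_ready) condition for the same ticket.
constexpr bool vyukov_push_ready(size_t seq, size_t head) { return seq == head; }
```

If the two conditions agree for every ticket and cell state, the existing `sequence` field already carries the per-slot turn and no separate turn counter is needed.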
- Rigtorp's `push`/`pop` busy-spin on the slot turn (`while (…) {}`, no sleep). On a fiber runtime that pegs a core and stalls the whole proactor thread. Here the primitives are fiber-agnostic (pure atomics, no `EventCount`), and `FiberQueue::Add` owns the waiting: it yields once, then suspends on `push_ec_`, so the proactor keeps running other fibers while a producer waits on a full queue.
- Rigtorp's `pop` does `tail_.fetch_add` (multi-consumer). We pair the FAA enqueue with the existing single-consumer `try_dequeue_sc` and a batch drain that wakes blocked producers, with no consumer-side atomic arbitration.
- Since `sequence` already provides the per-slot "turn", we extend the queue we already ship rather than pulling in a third-party header. Rigtorp is the reference; Vyukov's queue is the implementation.

🤖 Generated with Claude Code