fix: avoid bthread mutex waits in shard completion paths #492
starrysky9959 wants to merge 2 commits into
(cherry picked from commit 4d8d001)
No actionable comments were generated in the recent review. 🎉
Walkthrough
This PR replaces bthread mutex + condition-variable completion coordination with std::atomic counters and exponential-backoff polling across WaitableCc, ActiveTxMaxTsCc, and DbSizeCc; RemoteDbSizeCc's asserts and finalization were updated to use the new atomic helpers.
Changes: Atomic completion tracking and polling
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (2 warnings)
Pull request overview
This PR cherry-picks a fix to avoid blocking on bthread::Mutex/bthread::ConditionVariable in shard completion paths by switching to atomic counters and polling-based waits, aligning behavior with an already-merged upstream fix.
Changes:
- Replaced mutex/CV-based completion tracking with atomics in DbSizeCc, ActiveTxMaxTsCc, and WaitableCc.
- Updated wait paths to use bthread_usleep() with exponential backoff instead of condition-variable waits.
- Updated RemoteDbSizeCc completion logic to use the new atomic ref-count helpers.
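The polling wait described in the list above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `CompletionCounter`, `Finish()`, `Wait()`, `Remaining()` and the 1 ms backoff cap are hypothetical, and `std::this_thread::sleep_for` stands in for `bthread_usleep()` so the example runs without brpc. The completer side is a single atomic decrement and never blocks, which is the property the PR is after; the trade-off is that the waiter may notice completion up to about one backoff interval late.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical completion tracker: shards call Finish(), the waiter polls.
class CompletionCounter
{
public:
    explicit CompletionCounter(std::size_t shards) : unfinished_cnt_(shards) {}

    // Completer side: one atomic decrement, no lock, never blocks.
    void Finish()
    {
        unfinished_cnt_.fetch_sub(1, std::memory_order_acq_rel);
    }

    // Waiter side: poll with exponential backoff, capped so the extra
    // latency after the last Finish() stays roughly below kMaxBackoff.
    void Wait() const
    {
        std::chrono::microseconds backoff{1};
        constexpr std::chrono::microseconds kMaxBackoff{1000};
        while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
        {
            // Stand-in for bthread_usleep(backoff.count()).
            std::this_thread::sleep_for(backoff);
            backoff = std::min(backoff * 2, kMaxBackoff);
        }
    }

    std::size_t Remaining() const
    {
        return unfinished_cnt_.load(std::memory_order_acquire);
    }

private:
    std::atomic<std::size_t> unfinished_cnt_;
};
```

In use, each shard thread calls `Finish()` once and the requesting thread calls `Wait()`; the cap value is arbitrary here and would need tuning against the latency the real callers can tolerate.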
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tx_service/src/remote/remote_cc_request.cpp | Uses atomic ref counting (OnLocalRefFinished) in RemoteDbSizeCc completion path. |
| tx_service/include/cc/cc_request.h | Converts ActiveTxMaxTsCc and DbSizeCc completion/wait logic from mutex/CV to atomics + polling. |
| tx_service/include/cc/cc_req_misc.h | Converts WaitableCc wait/finish signaling from mutex/CV to atomics + polling/yield-resume coordination. |
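The "last finisher" check behind `OnLocalRefFinished` relies on `fetch_sub` returning the value held before the decrement. A minimal sketch, assuming a single shared counter (`RefTracker` and `OnRefFinished` are hypothetical names, not the PR's API): exactly one caller observes 1 and may run finalization, and `acq_rel` ordering lets that caller see the writes other finishers made before their decrements.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical ref tracker modelled on the pattern in OnLocalRefFinished().
class RefTracker
{
public:
    explicit RefTracker(std::size_t refs) : total_ref_cnt_(refs) {}

    // fetch_sub returns the previous value, so only the caller that moves
    // the count from 1 to 0 gets true and should finalize the request.
    bool OnRefFinished()
    {
        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

private:
    std::atomic<std::size_t> total_ref_cnt_;
};
```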
```cpp
waiting_.store(true, std::memory_order_release);
lk.unlock();
if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
{
    waiting_.store(false, std::memory_order_release);
    break;
```
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tx_service/include/cc/cc_req_misc.h`:
- Around line 908-919: The loop leaves waiting_ true after (*yield_fn)()
returns, allowing a concurrent completion to call resume_fn_ while the waiter is
no longer suspended; change the loop so waiting_ is cleared immediately after
the yield returns (i.e., store(false, std::memory_order_release) right after
(*yield_fn)() inside the loop), keeping the existing early-exit check (if
unfinished_cnt_.load(...) == 0) that also clears waiting_, so waiting_ is only
true for the actual suspended window; references: waiting_, unfinished_cnt_,
(*yield_fn)(), AbortCcRequest/Execute and resume_fn_.
In `@tx_service/include/cc/cc_request.h`:
- Line 8464: Guard the unsigned atomic refcount decrements in
OnRemoteRefFinished and OnLocalRefFinished to avoid underflow when late DBSize
callbacks arrive: before calling fetch_sub(1) on the atomic counters used in
cc_request.h, read/check the current value and only decrement if it is > 0 (use
an atomic compare-and-swap loop or fetch-update API to do this atomically), and
ensure the functions return or no-op when the counter is already zero so they
cannot wrap to SIZE_MAX and corrupt completion state; apply the same
guarded-decrement pattern to the other similar sites referenced (the other calls
around the OnRemoteRefFinished/OnLocalRefFinished usages).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fefca9d7-239d-4377-b8e9-66f9850acf41
📒 Files selected for processing (3)
tx_service/include/cc/cc_req_misc.h
tx_service/include/cc/cc_request.h
tx_service/src/remote/remote_cc_request.cpp
```cpp
while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
{
    waiting_.store(true, std::memory_order_release);
    lk.unlock();
    if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
    {
        waiting_.store(false, std::memory_order_release);
        break;
    }
    (*yield_fn)();
    lk.lock();
    waiting_.store(false, std::memory_order_release);
}
waiting_.store(false, std::memory_order_release);
}
```
waiting_ remains true outside the actual suspended window, so completion can issue a stale resume.
After (*yield_fn)() returns, waiting_ is still true until loop exit. If final completion happens in that runnable window, AbortCcRequest/Execute can call resume_fn_ even though the waiter is not suspended.
Suggested fix
```diff
 while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
 {
     waiting_.store(true, std::memory_order_release);
     if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
     {
         waiting_.store(false, std::memory_order_release);
         break;
     }
     (*yield_fn)();
+    waiting_.store(false, std::memory_order_release);
 }
 waiting_.store(false, std::memory_order_release);
```
Also applies to: 939-949, 957-965
```cpp
{
    cv_.notify_one();
}
OnRemoteRefFinished();
```
Guard atomic ref counters against underflow on late DBSize callbacks.
OnRemoteRefFinished() / OnLocalRefFinished() unconditionally fetch_sub(1) on unsigned atomics. Given the known timeout race in the DBSize response flow (tx_service/src/remote/cc_stream_receiver.cpp TODO near DBSize timeout handling), late callbacks can arrive after counters were reset/cleared and wrap to SIZE_MAX, corrupting completion state.
🔧 Suggested fix
```diff
 protected:
+    static bool TryDec(std::atomic_size_t &cnt, bool *is_last = nullptr)
+    {
+        size_t cur = cnt.load(std::memory_order_acquire);
+        while (cur > 0)
+        {
+            if (cnt.compare_exchange_weak(cur,
+                                          cur - 1,
+                                          std::memory_order_acq_rel,
+                                          std::memory_order_acquire))
+            {
+                if (is_last != nullptr)
+                {
+                    *is_last = (cur == 1);
+                }
+                return true;
+            }
+        }
+        if (is_last != nullptr)
+        {
+            *is_last = false;
+        }
+        return false;
+    }
+
     bool OnLocalRefFinished()
     {
-        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
+        bool is_last = false;
+        return TryDec(total_ref_cnt_, &is_last) && is_last;
     }
     bool OnRemoteRefFinished()
     {
-        remote_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel);
-        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
+        if (!TryDec(remote_ref_cnt_))
+        {
+            return false;  // stale/duplicate callback
+        }
+        bool is_last = false;
+        return TryDec(total_ref_cnt_, &is_last) && is_last;
     }
```
Also applies to: 8481-8482, 8537-8546
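To see why the guard matters, the sketch below contrasts a plain `fetch_sub` on a counter that is already 0 with a compare-and-swap guarded decrement in the spirit of the suggested `TryDec()`. `UnguardedDec` and `GuardedDec` are illustrative names, not code from the PR. Note that the guard only prevents wrap-around: if a late callback arrives after the counter has been re-armed for a new request, it would still be counted against that request.

```cpp
#include <atomic>
#include <cstddef>

// Plain unsigned decrement. On a counter that is already 0 the stored value
// wraps to SIZE_MAX. Returns the value after the decrement.
inline std::size_t UnguardedDec(std::atomic<std::size_t> &cnt)
{
    return cnt.fetch_sub(1, std::memory_order_acq_rel) - 1;
}

// Guarded decrement: only decrements a positive value. Returns false for a
// stale/duplicate callback; on success *is_last reports a 1 -> 0 transition.
inline bool GuardedDec(std::atomic<std::size_t> &cnt, bool *is_last)
{
    std::size_t cur = cnt.load(std::memory_order_acquire);
    while (cur > 0)
    {
        // On failure compare_exchange_weak reloads `cur`, so the loop retries
        // with the fresh value and stops if it has dropped to 0 meanwhile.
        if (cnt.compare_exchange_weak(cur,
                                      cur - 1,
                                      std::memory_order_acq_rel,
                                      std::memory_order_acquire))
        {
            *is_last = (cur == 1);
            return true;
        }
    }
    *is_last = false;
    return false;
}
```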
```cpp
lk.unlock();
if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
{
    waiting_.store(false, std::memory_order_release);
```
The waiter enters the loop after observing unfinished_cnt_ == 1 (W1) and sets waiting_ = true (W2). At this point the completer on the TxProcessor thread decrements the counter to zero (C1) and then exchanges waiting_ from true to false (C2); since it observed true, it commits to calling resume_fn. Meanwhile, the waiter's re-check (W3) observes unfinished_cnt_ == 0 and takes the early-exit branch: it stores waiting_ = false (W4, now a no-op because C2 already cleared the flag) and breaks out of the loop without ever calling yield_fn (W5). Finally the completer invokes resume_fn() (C3), which unconditionally enqueues the coroutine's CoroCtx into resume_queue_.

The result is an orphan resume: a resume has been published with no matching yield to consume it. resume_fn is not a level-triggered notification like a condition variable: every enqueued entry is eventually popped by the flush worker, which calls ctx->coro_.resume(). Since the coroutine never suspended (and may already have run to completion by then), the worker ends up resuming a running or finished boost::context continuation, which is undefined behavior.

Note that the bug is timing-dependent: if W4 wins the race against C2 instead, the completer's exchange reads false, no resume is published, and everything works. The failure window is only the gap between W2 and W4.
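The interleaving above can be replayed deterministically. The sketch below models one outstanding shard and counts calls instead of suspending a real coroutine; `Replay`, `RunSuggestedFix` and `RunWithReclaim` are hypothetical. `RunSuggestedFix` follows the loop from the CodeRabbit suggestion and ends with a resume that no yield consumes. `RunWithReclaim` shows one possible repair, assumed here rather than taken from the PR: the waiter clears the flag with an exchange instead of a plain store, and if the exchange returns false the completer has already committed to resume_fn, so the waiter yields once to consume it.

```cpp
#include <cstddef>
#include <utility>

// Single-threaded replay of one fixed schedule. In real code `waiting` would
// be std::atomic<bool> and std::exchange would be waiting_.exchange(false).
struct Replay
{
    std::size_t unfinished_cnt = 1;  // one shard still outstanding
    bool waiting = false;
    int yields = 0;   // times the waiter called yield_fn
    int resumes = 0;  // times the completer published resume_fn

    // An orphan resume is one that no yield will ever consume.
    bool HasOrphanResume() const { return resumes > yields; }
};

// Waiter loop as in the suggested fix: plain store on the early-exit path.
Replay RunSuggestedFix()
{
    Replay r;
    bool entered = r.unfinished_cnt > 0;                 // W1: observes 1
    r.waiting = true;                                    // W2
    --r.unfinished_cnt;                                  // C1: 1 -> 0
    bool will_resume = std::exchange(r.waiting, false);  // C2: observes true
    if (entered && r.unfinished_cnt == 0)                // W3: observes 0
    {
        r.waiting = false;  // W4: no-op, C2 already cleared it
        // W5: break out of the loop without calling yield_fn
    }
    if (will_resume)
    {
        ++r.resumes;  // C3: resume published with no matching yield
    }
    return r;
}

// Possible repair (an assumption, not the PR's code): reclaim via exchange.
Replay RunWithReclaim()
{
    Replay r;
    bool entered = r.unfinished_cnt > 0;                 // W1
    r.waiting = true;                                    // W2
    --r.unfinished_cnt;                                  // C1
    bool will_resume = std::exchange(r.waiting, false);  // C2
    if (entered && r.unfinished_cnt == 0)                // W3
    {
        // W4': false means the completer won C2 and will call resume_fn,
        // so the waiter must yield once to consume that resume.
        if (!std::exchange(r.waiting, false))
        {
            ++r.yields;
        }
    }
    if (will_resume)
    {
        ++r.resumes;  // C3
    }
    return r;
}
```

When the waiter's exchange wins instead, it reads true and exits without yielding, and the completer's exchange then reads false and publishes nothing. Like the existing loop, this repair still assumes the runtime tolerates a resume that is enqueued slightly before the waiter has finished suspending.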
```diff
 }

-return false;
+return OnLocalRefFinished();
```
Shouldn't this always return false? Otherwise the DbSizeCc may be recycled and reused by another caller. Suggested change:
```cpp
OnLocalRefFinished();
return false;
```
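The reviewer's concern can be illustrated with a toy model, assuming (as the comment implies) that a true return from the execute path lets the shard treat the request as finished and recycle it. Everything here (`FakeDbSizeCc`, `FakeShard`, `free_list`) is hypothetical and not the tx_service API. When the local shard drops the last reference and returns that result, the request lands on the free list while the waiting caller may still be using it; dropping the reference and returning false leaves ownership with the caller.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for DbSizeCc: one local reference left, plus a
// result flag the waiting caller reads afterwards.
struct FakeDbSizeCc
{
    std::atomic<std::size_t> total_ref_cnt{1};
    bool result_ready = false;

    bool OnLocalRefFinished()
    {
        return total_ref_cnt.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

    // Variant under review: propagates "last ref" as the execute result.
    bool ExecuteReturningLastRef()
    {
        result_ready = true;
        return OnLocalRefFinished();
    }

    // Reviewer's suggestion: drop the ref but never report the request done.
    bool ExecuteReturningFalse()
    {
        result_ready = true;
        OnLocalRefFinished();
        return false;
    }
};

// Hypothetical shard loop: a true result means "finished, reuse this object".
struct FakeShard
{
    std::vector<FakeDbSizeCc *> free_list;

    template <typename Fn>
    void Run(FakeDbSizeCc &req, Fn execute)
    {
        if (execute(req))
        {
            free_list.push_back(&req);  // now reusable by another caller
        }
    }
};
```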