fix: avoid bthread mutex waits in shard completion paths#492

Open
starrysky9959 wants to merge 2 commits into
eloqdata:mainfrom
starrysky9959:fix/avoid-bthread-mutex-waits-main

Conversation

@starrysky9959

@starrysky9959 starrysky9959 commented Jun 12, 2026

Collaborator

Summary

  • cherry-pick release branch fix onto
  • avoid waiting on bthread mutexes in shard completion paths
  • keep aligned with the fix already merged into

Testing

  • not run

Summary by CodeRabbit

  • Performance Improvements
    • Replaced heavy lock-based coordination with atomic counters and backoff polling, improving responsiveness and reducing lock contention in transaction processing.
  • Bug Fixes / Reliability
    • More robust completion and abort handling with atomic state updates and timeout-aware wait logic to reduce stalls and spurious waits.

Copilot AI review requested due to automatic review settings June 12, 2026 10:16
@coderabbitai

coderabbitai Bot commented Jun 12, 2026


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b13c9e3a-f1fe-4b8a-a9bd-3dfcc9f0c94c

📥 Commits

Reviewing files that changed from the base of the PR and between 6a151d2 and a8ec8e9.

📒 Files selected for processing (1)
  • tx_service/include/cc/cc_request.h
🚧 Files skipped from review as they are similar to previous changes (1)
  • tx_service/include/cc/cc_request.h

Walkthrough

This PR replaces bthread mutex+condition-variable completion coordination with std::atomic counters and exponential-backoff polling across WaitableCc, ActiveTxMaxTsCc, and DbSizeCc; RemoteDbSizeCc's asserts and finalization were updated to use the new atomic helpers.

Changes

Atomic completion tracking and polling

| Layer | File(s) | Summary |
|---|---|---|
| WaitableCc atomic conversion | tx_service/include/cc/cc_req_misc.h | Reset() now atomically stores unfinished_cnt_, error_code_, and waiting_. Wait() polls unfinished_cnt_ with exponential backoff. Wait(yield_fn,resume_fn) toggles waiting_ around yields and re-checks completion. AbortCcRequest and Execute decrement via fetch_sub; when count reaches zero they conditionally invoke resume_fn_ based on waiting_. Private state fields become std::atomic and bthread mutex/cv members are removed. |
| ActiveTxMaxTsCc atomic conversion | tx_service/include/cc/cc_request.h | Constructor initializes unfinish_cnt_ as std::atomic_size_t. Execute() uses atomic fetch_sub to decrement. Wait() polls unfinish_cnt_ until zero using exponential-backoff sleep loops, replacing condition-variable wait. |
| DbSizeCc dual-counter atomic conversion | tx_service/include/cc/cc_request.h | Reset() atomically stores total_ref_cnt_ and remote_ref_cnt_. Execute() returns OnLocalRefFinished(); AddRemoteObjSize() invokes OnRemoteRefFinished(). New helpers decrement via fetch_sub and detect overall completion. Clear() atomically zeros both counters. Wait() polls total_ref_cnt_ with backoff and a remaining time budget. |
| RemoteDbSizeCc atomic integration | tx_service/src/remote/remote_cc_request.cpp | Post-handler, Reset, and Execute asserts now read atomics via load(memory_order_acquire/relaxed). Execute completion delegates to OnLocalRefFinished() instead of direct decrements, aligning with the new atomic completion pattern. |
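
The polling pattern summarized in the table can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `CountdownLatch`, `Finish`, the 10 µs starting interval, and the 1 ms cap are all hypothetical, and `std::this_thread::sleep_for` stands in for `bthread_usleep()` so the sketch compiles without brpc.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical stand-in for the PR's completion counters; not the real class.
class CountdownLatch
{
public:
    explicit CountdownLatch(size_t cnt) : unfinished_cnt_(cnt) {}

    // Called once per finished shard. Returns true for the last finisher.
    bool Finish()
    {
        const size_t prev =
            unfinished_cnt_.fetch_sub(1, std::memory_order_acq_rel);
        assert(prev > 0);  // a decrement at zero would wrap the counter
        return prev == 1;
    }

    // Polls until the counter reaches zero or the time budget runs out.
    // Returns true on completion, false on timeout.
    bool Wait(std::chrono::microseconds budget)
    {
        using Clock = std::chrono::steady_clock;
        const auto deadline = Clock::now() + budget;
        std::chrono::microseconds sleep_us{10};            // assumed start
        const std::chrono::microseconds max_sleep_us{1000};  // assumed cap
        while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
        {
            const auto now = Clock::now();
            if (now >= deadline)
            {
                return false;
            }
            const auto remaining =
                std::chrono::duration_cast<std::chrono::microseconds>(
                    deadline - now);
            // Never sleep past the deadline; sleep_for replaces
            // bthread_usleep() in this sketch.
            std::this_thread::sleep_for(std::min(sleep_us, remaining));
            // Double the interval up to the cap.
            sleep_us = std::min(sleep_us * 2, max_sleep_us);
        }
        return true;
    }

private:
    std::atomic<size_t> unfinished_cnt_;
};
```

The trade-off in this style is latency versus wake-ups: the waiter can notice completion up to one sleep interval late, which is why the interval is capped.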

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • eloqdata/tx_service#470: touches Wait(yield_fn, resume_fn) and resume_fn_/waiting_ handling related to WaitableCc coroutine yield/resume logic.

Suggested reviewers

  • liunyl
  • githubzilla

Poem

🐰 From locks to atoms a rabbit hops light,

Counters that spin in the soft backoff night,
Fetch-sub whispers and waiting flags fade,
Polls that ease gently, no mutex parade,
A tiny hop forward — async delight.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The pull request description is largely incomplete relative to the template. Testing was not run, and there is no reference to issue numbers, RFC links, or confirmation that tests/documentation were added. | Complete the checklist items: add tests for changes, document changes, reference issue number using 'fixes eloqdb/tx_service#issue_id', reference RFC if applicable, and confirm test suite passage or explain testing status. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The pull request title clearly and concisely describes the main change: replacing bthread mutex-based waiting with atomic-based polling in shard completion paths. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Copilot AI left a comment

Pull request overview

This PR cherry-picks a fix to avoid blocking on bthread::Mutex/bthread::ConditionVariable in shard completion paths by switching to atomic counters and polling-based waits, aligning behavior with an already-merged upstream fix.

Changes:

  • Replaced mutex/CV-based completion tracking with atomics in DbSizeCc, ActiveTxMaxTsCc, and WaitableCc.
  • Updated wait paths to use bthread_usleep() with exponential backoff instead of condition-variable waits.
  • Updated RemoteDbSizeCc completion logic to use the new atomic ref-count helpers.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| tx_service/src/remote/remote_cc_request.cpp | Uses atomic ref counting (OnLocalRefFinished) in RemoteDbSizeCc completion path. |
| tx_service/include/cc/cc_request.h | Converts ActiveTxMaxTsCc and DbSizeCc completion/wait logic from mutex/CV to atomics + polling. |
| tx_service/include/cc/cc_req_misc.h | Converts WaitableCc wait/finish signaling from mutex/CV to atomics + polling/yield-resume coordination. |

Comment on lines 910 to +914
    waiting_.store(true, std::memory_order_release);
    lk.unlock();
    if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
    {
        waiting_.store(false, std::memory_order_release);
        break;
Comment thread tx_service/include/cc/cc_request.h Outdated
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tx_service/include/cc/cc_req_misc.h`:
- Around line 908-919: The loop leaves waiting_ true after (*yield_fn)()
returns, allowing a concurrent completion to call resume_fn_ while the waiter is
no longer suspended; change the loop so waiting_ is cleared immediately after
the yield returns (i.e., store(false, std::memory_order_release) right after
(*yield_fn)() inside the loop), keeping the existing early-exit check (if
unfinished_cnt_.load(...) == 0) that also clears waiting_, so waiting_ is only
true for the actual suspended window; references: waiting_, unfinished_cnt_,
(*yield_fn)(), AbortCcRequest/Execute and resume_fn_.

In `@tx_service/include/cc/cc_request.h`:
- Line 8464: Guard the unsigned atomic refcount decrements in
OnRemoteRefFinished and OnLocalRefFinished to avoid underflow when late DBSize
callbacks arrive: before calling fetch_sub(1) on the atomic counters used in
cc_request.h, read/check the current value and only decrement if it is > 0 (use
an atomic compare-and-swap loop or fetch-update API to do this atomically), and
ensure the functions return or no-op when the counter is already zero so they
cannot wrap to SIZE_MAX and corrupt completion state; apply the same
guarded-decrement pattern to the other similar sites referenced (the other calls
around the OnRemoteRefFinished/OnLocalRefFinished usages).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fefca9d7-239d-4377-b8e9-66f9850acf41

📥 Commits

Reviewing files that changed from the base of the PR and between f62f0fa and 6a151d2.

📒 Files selected for processing (3)
  • tx_service/include/cc/cc_req_misc.h
  • tx_service/include/cc/cc_request.h
  • tx_service/src/remote/remote_cc_request.cpp

Comment on lines +908 to 919
    while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
    {
        waiting_.store(true, std::memory_order_release);
        lk.unlock();
        if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
        {
            waiting_.store(false, std::memory_order_release);
            break;
        }
        (*yield_fn)();
        lk.lock();
        waiting_.store(false, std::memory_order_release);
    }
    waiting_.store(false, std::memory_order_release);
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

waiting_ remains true outside the actual suspended window, so completion can issue a stale resume.

After (*yield_fn)() returns, waiting_ is still true until loop exit. If final completion happens in that runnable window, AbortCcRequest/Execute can call resume_fn_ even though the waiter is not suspended.

Suggested fix
 while (unfinished_cnt_.load(std::memory_order_acquire) > 0)
 {
     waiting_.store(true, std::memory_order_release);
     if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
     {
         waiting_.store(false, std::memory_order_release);
         break;
     }
     (*yield_fn)();
+    waiting_.store(false, std::memory_order_release);
 }
 waiting_.store(false, std::memory_order_release);

Also applies to: 939-949, 957-965


{
cv_.notify_one();
}
OnRemoteRefFinished();

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard atomic ref counters against underflow on late DBSize callbacks.

OnRemoteRefFinished() / OnLocalRefFinished() unconditionally fetch_sub(1) on unsigned atomics. Given the known timeout race in the DBSize response flow (tx_service/src/remote/cc_stream_receiver.cpp TODO near DBSize timeout handling), late callbacks can arrive after counters were reset/cleared and wrap to SIZE_MAX, corrupting completion state.

🔧 Suggested fix
 protected:
+    static bool TryDec(std::atomic_size_t &cnt, bool *is_last = nullptr)
+    {
+        size_t cur = cnt.load(std::memory_order_acquire);
+        while (cur > 0)
+        {
+            if (cnt.compare_exchange_weak(cur,
+                                          cur - 1,
+                                          std::memory_order_acq_rel,
+                                          std::memory_order_acquire))
+            {
+                if (is_last != nullptr)
+                {
+                    *is_last = (cur == 1);
+                }
+                return true;
+            }
+        }
+        if (is_last != nullptr)
+        {
+            *is_last = false;
+        }
+        return false;
+    }
+
     bool OnLocalRefFinished()
     {
-        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
+        bool is_last = false;
+        return TryDec(total_ref_cnt_, &is_last) && is_last;
     }

     bool OnRemoteRefFinished()
     {
-        remote_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel);
-        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
+        if (!TryDec(remote_ref_cnt_))
+        {
+            return false;  // stale/duplicate callback
+        }
+        bool is_last = false;
+        return TryDec(total_ref_cnt_, &is_last) && is_last;
     }

Also applies to: 8481-8482, 8537-8546
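
To make the wrap-around concrete, here is a small sketch of a late callback hitting a counter that has already been cleared. `RefCounted`, `FinishUnguarded`, and `FinishGuarded` are hypothetical names for a reduced model, not the real DbSizeCc; only the `fetch_sub` and `compare_exchange_weak` calls are standard `std::atomic` API.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical reduced model of a ref-counted request; not the real DbSizeCc.
struct RefCounted
{
    std::atomic<size_t> total_ref_cnt_{0};

    void Reset(size_t refs)
    {
        total_ref_cnt_.store(refs, std::memory_order_release);
    }

    // Models the timeout path that zeroes the counter.
    void Clear()
    {
        total_ref_cnt_.store(0, std::memory_order_release);
    }

    // Unguarded decrement: if the counter is already zero, the unsigned
    // value wraps to SIZE_MAX.
    bool FinishUnguarded()
    {
        return total_ref_cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

    // Guarded decrement: a CAS loop that refuses to go below zero.
    // Returns true only for the decrement that takes the counter 1 -> 0.
    bool FinishGuarded()
    {
        size_t cur = total_ref_cnt_.load(std::memory_order_acquire);
        while (cur > 0)
        {
            // On failure, compare_exchange_weak reloads cur and we retry.
            if (total_ref_cnt_.compare_exchange_weak(
                    cur,
                    cur - 1,
                    std::memory_order_acq_rel,
                    std::memory_order_acquire))
            {
                return cur == 1;
            }
        }
        return false;  // counter was zero: treat as a stale callback
    }
};
```

A guarded decrement only protects the zero case. If the object has been reset for a new caller before the late callback arrives, the counter is non-zero again and the stale decrement would still be applied, so this sketch does not by itself address reuse.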


lk.unlock();
if (unfinished_cnt_.load(std::memory_order_acquire) == 0)
{
waiting_.store(false, std::memory_order_release);

Collaborator


The waiter enters the loop after observing unfinished_cnt_ == 1 (W1) and sets waiting_ = true (W2). At this point the completer on the TxProcessor thread decrements the counter to zero (C1) and then exchanges waiting_ from true to false (C2) — since it observed true, it commits to calling resume_fn. Meanwhile, the waiter's re-check (W3) observes unfinished_cnt_ == 0 and takes the early-exit branch: it stores waiting_ = false (W4, which is now a no-op because C2 already cleared the flag) and breaks out of the loop without ever calling yield_fn (W5). Finally the completer invokes resume_fn() (C3), which unconditionally enqueues the coroutine's CoroCtx into resume_queue_.

The result is an orphan resume: a resume has been published with no matching yield to consume it. resume_fn is not a level-triggered notification like a condition variable. Every enqueued entry will eventually be popped by the flush worker, which calls ctx->coro_.resume(). Since the coroutine never suspended (and may have already run to completion by then), the worker ends up resuming a running or finished boost::context continuation, which is undefined behavior.

Note the bug is timing-dependent: if W4 wins the race against C2 instead, the completer's exchange reads false, no resume is published, and everything works. The failure window is only the gap between W2 and W4.
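
The interleaving described above can be replayed deterministically on a single thread. The sketch below is a reduced model: `Handshake` and `ReplayInterleaving` are hypothetical names, and the counters `resumes_published` and `yields_taken` stand in for the real resume_fn and yield_fn. The `claiming_exit` variant is one possible way to close the window, shown as an assumption rather than the fix adopted in this PR: the waiter uses an exchange at W4, and if the flag is already cleared it suspends once so the pending resume has a yield to match.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical reduced model of the waiter/completer handshake; the field
// names mirror the discussion but this is not the real WaitableCc.
struct Handshake
{
    std::atomic<size_t> unfinished_cnt{1};
    std::atomic<bool> waiting{false};
    int resumes_published = 0;  // times the completer called resume_fn
    int yields_taken = 0;       // times the waiter actually suspended
};

// Replays W1, W2, C1, C2, W3, W4, C3 in exactly that order and returns the
// number of resumes that have no matching yield.
// claiming_exit = false: W4 is a plain store, as in the reviewed code.
// claiming_exit = true:  W4 is an exchange, so the waiter learns whether the
//                        completer already committed to a resume.
int ReplayInterleaving(bool claiming_exit)
{
    Handshake s;

    // W1: waiter sees pending work and enters the loop.
    if (s.unfinished_cnt.load(std::memory_order_acquire) == 0)
    {
        return 0;
    }
    // W2: waiter announces that it is about to suspend.
    s.waiting.store(true, std::memory_order_release);

    // C1: completer finishes the last piece of work.
    const bool last =
        s.unfinished_cnt.fetch_sub(1, std::memory_order_acq_rel) == 1;
    // C2: completer commits to a resume if it observed waiting == true.
    const bool will_resume =
        last && s.waiting.exchange(false, std::memory_order_acq_rel);

    // W3: waiter re-checks and finds the work already done.
    if (s.unfinished_cnt.load(std::memory_order_acquire) == 0)
    {
        if (!claiming_exit)
        {
            // W4/W5: plain store, then leave without yielding.
            s.waiting.store(false, std::memory_order_release);
        }
        else if (!s.waiting.exchange(false, std::memory_order_acq_rel))
        {
            // The flag was already cleared, so a resume is on its way:
            // suspend once to consume it.
            ++s.yields_taken;
        }
    }
    else
    {
        ++s.yields_taken;  // normal path, not reached in this replay
    }

    // C3: completer publishes the resume it committed to in C2.
    if (will_resume)
    {
        ++s.resumes_published;
    }
    return s.resumes_published - s.yields_taken;
}
```

With the plain store the replay ends with one unmatched resume; with the exchange-based exit the resume and the yield pair up. The second variant assumes that a resume enqueued before the coroutine suspends is still consumed safely once it does suspend, which would need to be confirmed against the actual resume_queue_ handling.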

 }

-return false;
+return OnLocalRefFinished();

Collaborator


Should this always return false? Otherwise DbSizeCc is recycled and may be reused by another caller.

OnLocalRefFinished();
return false;
