AsyncCache: striped insert buffer + R type parameter #61
Pull request overview
This PR targets hit-ratio and throughput regressions under contention by reworking how inserts/updates are buffered and drained, and by reducing contention in the get-frequency ring.
Changes:
- Replaces the old per-item insert buffering + semaphore gating with a new striped, per-thread `InsertStripeRing` that sends `Vec<Item<_>>` batches and adds `wait()`/`clear()` stripe-drain preludes.
- Updates the get-frequency `RingStripe` to use 64 independent stripes and avoids holding stripe locks while sending to the policy.
- Aligns sync/async policy frequency channels (sync policy switches to `unbounded()`), and adds executor-agnostic cooperative yielding for async inserts.
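As a rough illustration of the striped, batched buffering described in the list above, here is a minimal std-only sketch. The names `StripedBuffer` and `Push`, the caller-supplied stripe hint, and the use of `std::sync::mpsc::sync_channel` are all assumptions made for this example; the PR's `InsertStripeRing` uses crossbeam and has its own overflow contract.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Mutex;

/// Outcome of buffering one item (hypothetical names, not the crate's API).
#[derive(Debug, PartialEq)]
pub enum Push {
    /// The item is parked in its stripe.
    Buffered,
    /// The stripe reached the high-water mark and its batch was sent.
    Flushed,
    /// The batch channel was full or closed; this many items were dropped.
    Dropped(usize),
}

pub struct StripedBuffer<T> {
    stripes: Vec<Mutex<Vec<T>>>,
    high_water: usize,
    tx: SyncSender<Vec<T>>,
}

impl<T> StripedBuffer<T> {
    /// `stripes` independent buffers, each flushed at `high_water` items into
    /// a bounded channel holding at most `queue` batches.
    pub fn new(stripes: usize, high_water: usize, queue: usize) -> (Self, Receiver<Vec<T>>) {
        assert!(stripes > 0 && high_water > 0);
        let (tx, rx) = sync_channel(queue);
        let stripes = (0..stripes)
            .map(|_| Mutex::new(Vec::with_capacity(high_water)))
            .collect();
        (Self { stripes, high_water, tx }, rx)
    }

    /// Buffers `item` in the stripe selected by `stripe_hint` (for example a
    /// per-thread index), flushing the stripe as one batch when it fills up.
    pub fn push(&self, stripe_hint: usize, item: T) -> Push {
        let idx = stripe_hint % self.stripes.len();
        let mut data = self.stripes[idx].lock().unwrap();
        data.push(item);
        if data.len() < self.high_water {
            return Push::Buffered;
        }
        // Swap in a fresh Vec and release the lock before touching the channel.
        let batch = std::mem::replace(&mut *data, Vec::with_capacity(self.high_water));
        drop(data);
        match self.tx.try_send(batch) {
            Ok(()) => Push::Flushed,
            Err(TrySendError::Full(b)) | Err(TrySendError::Disconnected(b)) => {
                Push::Dropped(b.len())
            }
        }
    }
}
```

Writers that hash to different stripes never contend on the same mutex, and the consumer receives whole `Vec` batches instead of single items.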
Reviewed changes
Copilot reviewed 17 out of 19 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/semaphore.rs | Removes the old semaphore-based bounded permit pool implementation. |
| src/cache/insert_stripe.rs | Adds the new striped insert buffer implementation with bounded batch channel + overflow behavior. |
| src/cache/sync.rs | Migrates sync cache insert/remove/wait/clear to the striped insert ring and batch-based processor loop. |
| src/cache/async.rs | Migrates async cache to a std::thread processor + striped insert ring; adds yield_once() usage. |
| src/cache/builder.rs / src/cache.rs | Replaces set_buffer_size with set_insert_stripe_high_water and adds set_drain_interval. |
| src/ring.rs | Introduces striped buffering for get-frequency keys (RingStripe/AsyncRingStripe). |
| src/policy/sync.rs | Switches sync policy frequency channel to unbounded() to avoid dropping frequency batches. |
| src/lib.rs | Adjusts async re-exports and adds runtime-agnostic yield_once() future. |
| src/cache/test.rs | Updates tests to use wait() barriers and new builder knobs; adds stripe-drain regressions. |
| Cargo.toml / README.md / CHANGELOG.md / examples/* / benches/* | Updates docs/benchmarks/examples to match the new architecture and APIs. |
Item::Update { key, .. } => {
  self.0.metrics.add(MetricType::DropSets, key, 1);
  // Graceful leak: the eager store row already holds the
  // new value; rolling it back here would surprise readers
  // mid-flight. Policy stays stale; eventual eviction
  // through TTL cleanup or a future admission will catch
  // this row.
}

Item::Update { key, .. } => {
  metrics.add(MetricType::DropSets, *key, 1);

// remove would strand a policy entry with no store row. The send is
// sync (crossbeam) and may briefly park the tokio worker under
// contention; that is the deliberate tradeoff vs dropping a Delete.
let _ =
  self
    .0
    .insert_buf_ring
    .send_single(Item::delete(index, conflict, captured_gen, prev_version));

let v = replace(&mut *data, Vec::with_capacity(self.capa));
drop(data);
let _ = self.cons.push(v);
Pull request overview
This PR targets improved cache admission accuracy and throughput under contention by replacing the prior insert-buffer/semaphore approach with a striped, batched insert ring (sync + async), and aligning async behavior more closely with sync.
Changes:
- Introduces a striped `InsertStripeRing` for insert batching/draining and updates both sync/async cache processors to consume `Vec<Item<V>>` batches.
- Reduces contention in get-frequency buffering via striped `RingStripe` implementation and adjusts sync policy frequency channel behavior.
- Updates public-facing docs/examples/benches for the new builder API (`build`) and adds runtime convenience aliases (`TokioCache`, `SmolCache`).
Reviewed changes
Copilot reviewed 20 out of 22 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/cache/insert_stripe.rs | Adds the new striped insert ring implementation with batching, draining, overflow handling, and tests. |
| src/cache/sync.rs | Migrates sync cache insert/remove/wait/clear paths to the striped insert ring and batch-based processor loop. |
| src/cache/async.rs | Migrates async cache to crossbeam-backed processor thread + striped insert ring; adjusts builder API and shutdown behavior. |
| src/cache/builder.rs | Replaces insert_buffer_size knob with insert_stripe_high_water + drain_interval and wires defaults. |
| src/cache.rs | Exposes new builder knobs and adds batch handler helper in the processor macro; includes new module. |
| src/ring.rs | Implements striped get-frequency buffering to reduce mutex contention on hot get paths. |
| src/policy/sync.rs | Switches sync policy frequency channel to unbounded() to avoid dropping get-frequency batches. |
| src/lib.rs | Removes semaphore module, trims async-channel re-exports, and adds TokioCache/SmolCache type aliases. |
| src/semaphore.rs | Removes the custom sync/async semaphore implementation and its tests. |
| README.md | Updates benchmark narrative/results and documents the new striped buffering approach and tuning knobs. |
| Cargo.toml | Enables crossbeam-channel for async, adds crossbeam-utils, and introduces a criterion bench target. |
| CHANGELOG.md | Documents v0.9.0 breaking changes and the new striped buffering architecture. |
| examples/.rs / benches/ | Updates examples and benches to use AsyncCacheBuilder::build and new cache aliases; adds new perf examples/benches. |
| .gitignore | Ignores docs/ directory. |
Comments suppressed due to low confidence (1)
src/cache/builder.rs:64
The builder docs for `cleanup_duration` state “Default is 500ms”, but `DEFAULT_CLEANUP_DURATION` is `2s` (and async docs mention 2s). Also, the `drain_interval` doc says it “runs TTL cleanup”, which is true for sync but not for async (async uses a separate cleanup ticker). Please update these field docs to match the actual defaults/behavior so users don’t tune based on incorrect information.
/// Cadence for the processor's tick arm, which drains every stripe
/// inline and runs TTL cleanup. Default `500ms`. Min `1ms`.
pub(crate) drain_interval: Duration,
/// `cleanup_duration` is the duration for internal store to cleanup expired entry.
///
/// Default is 500ms.
pub(crate) cleanup_duration: Duration,
// remove would strand a policy entry with no store row. The send is
// sync (crossbeam) and may briefly park the tokio worker under
// contention; that is the deliberate tradeoff vs dropping a Delete.
let _ =
  self
    .0
    .insert_buf_ring
    .send_single(Item::delete(index, conflict, captured_gen, prev_version));

Item::Update { key, .. } => {
  self.0.metrics.add(MetricType::DropSets, key, 1);
  // Graceful leak: the eager store row already holds the
  // new value; rolling it back here would surprise readers
  // mid-flight. Policy stays stale; eventual eviction

Item::Update { key, .. } => {
  metrics.add(MetricType::DropSets, *key, 1);

// 3-slot channel filled instantly, starving TinyLFU of access signal
// and degrading admission decisions. The processor consumes batches
// with a single Mutex+counter increment, so it keeps up with realistic
// get rates and the channel does not grow without bound in practice.
let (items_tx, items_rx) = unbounded();

let v = replace(&mut *data, Vec::with_capacity(self.capa));
drop(data);
let _ = self.cons.push(v);

let processor = CacheProcessor::new(
  100000,
  self.inner.ignore_internal_cost,
  self.inner.cleanup_duration,
  self.inner.drain_interval,
  store.clone(),
  policy.clone(),
  buf_rx,
  insert_buf.clone(),
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #61 +/- ##
===========================================
+ Coverage 84.96% 95.69% +10.72%
===========================================
Files 17 18 +1
Lines 1823 2184 +361
===========================================
+ Hits 1549 2090 +541
+ Misses 274 94 -180
... and 2 files with indirect coverage changes
Pull request overview
This PR refactors AsyncCache to use the same striped insert buffer architecture as the sync Cache, and shifts async runtime selection to a struct-level type parameter (R: RuntimeLite) bound at build time. It also updates docs/changelog and adds benchmarking examples to validate performance and behavior.
Changes:
- Replace async semaphore + per-item async-channel insert path with `InsertStripeRing` batch buffering (plus wait/clear preludes that drain stripes before markers).
- Make `AsyncCache` generic over `R: RuntimeLite` (builder now binds runtime via `.build::<R>()`; add `TokioCache`/`SmolCache` aliases).
- Add/refresh benchmarks and documentation (README + CHANGELOG) around the new buffering and performance characteristics.
Reviewed changes
Copilot reviewed 20 out of 22 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/semaphore.rs | Removes custom sync/async semaphore implementation (no longer needed with striped insert buffering). |
| src/ring.rs | Stripes the get-frequency ring buffer to reduce contention; updates push/flush behavior. |
| src/policy/sync.rs | Switches policy frequency channel from bounded to unbounded to avoid starving TinyLFU under contention. |
| src/lib.rs | Removes semaphore module; adjusts async re-exports; adds TokioCache / SmolCache type aliases. |
| src/cache/sync.rs | Migrates sync cache insert buffering to InsertStripeRing, barrier preludes, and batch receive path. |
| src/cache/insert_stripe.rs | Introduces InsertStripeRing implementation (striped storage + bounded batch channel + overflow policy). |
| src/cache/builder.rs | Replaces insert buffer size knob with insert_stripe_high_water and adds drain_interval builder knob. |
| src/cache/async.rs | Migrates async cache to crossbeam-backed processor thread + striped insert ring; makes cache generic over runtime R. |
| src/cache.rs | Updates builder macro API; adds batch handler helper; updates delete handler ordering; wires insert_stripe module. |
| examples/tokio.rs | Updates example to builder-based construction and TokioCache alias. |
| examples/sync_bench.rs | Adds sync benchmark example mirroring async workload patterns. |
| examples/smol.rs | Updates smol example to builder-based construction and SmolCache alias. |
| examples/insert_only_bench.rs | Adds insert-only benchmark comparing sync vs async. |
| examples/get_only_bench.rs | Adds get-only benchmark comparing sync vs async. |
| examples/async_bench.rs | Adds async benchmark reproducing mixed get/insert workload. |
| benches/ristretto-rs/src/main.rs | Updates downstream bench harness to new builder/runtime binding API. |
| benches/insert.rs | Adds Criterion microbench for insert throughput hot path. |
| README.md | Major benchmark narrative refresh + updates configuration docs for striped buffering. |
| Cargo.toml | Adds examples/bench targets, feature wiring for async crossbeam usage, and new dev-deps/lints config. |
| CHANGELOG.md | Adds 0.9.0 release notes including breaking changes and behavior details. |
| .gitignore | Adds docs/ ignore rule. |
Comments suppressed due to low confidence (1)
Cargo.toml:113
`[lints] workspace = true` and `[workspace.lints.rust]` require this manifest to be a workspace root (i.e., include a `[workspace]` table). As-is, Cargo will error because the package is not declared as a workspace but references workspace-scoped lint configuration. Either add a `[workspace]` section (and move lint config there), or revert to per-package lint config (e.g. `[lints.rust]`).
[lints]
workspace = true
[workspace.lints.rust]
rust_2018_idioms = "warn"
single_use_lifetimes = "warn"
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(all_tests)',
'cfg(tarpaulin)',
] }
// any thread hashed to this stripe.
let v = replace(&mut *data, Vec::with_capacity(self.capa));
drop(data);
let _ = self.cons.push(v);
The premise here is stale: the policy frequency channel is no longer bounded. LFUPolicy switched to crossbeam_channel::unbounded() in b5420dd (the bounded(3) it replaced was starving TinyLFU under multi-thread contention — DropGets was firing on most pushes). With the unbounded channel, try_send can only fail on Disconnected, which happens when the cache is shutting down — and that's the only path where push returns Some(vec) today.
So under normal load push always returns None (no allocation to recycle), and on the shutdown path the cache is on its way out — reusing the vec there doesn't help because subsequent stripe pushes will hit Disconnected again immediately, and the policy worker has already exited. The let _ = is intentional given current architecture; happy to add a comment documenting that.
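The point about `Some(vec)` only appearing on shutdown can be illustrated with a small sketch. It uses `std::sync::mpsc::channel` as a stand-in for the unbounded crossbeam channel, and the free function `push` below is a hypothetical simplification, not the policy's real method. On an unbounded channel a send has no "full" failure mode, so the batch only comes back when the receiver is gone.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for the policy's `push`: hands a batch of key hashes
/// to the policy worker and returns the batch only if it could not be sent.
pub fn push(tx: &Sender<Vec<u64>>, batch: Vec<u64>) -> Option<Vec<u64>> {
    // An unbounded channel never blocks and never reports "full"; the only
    // error is a dropped receiver, and that error hands the batch back.
    match tx.send(batch) {
        Ok(()) => None,
        Err(err) => Some(err.0),
    }
}

/// Small demonstration: every push succeeds while the receiver is alive, and
/// the batch is returned once the receiver has been dropped.
pub fn demo() -> (bool, Option<Vec<u64>>) {
    let (tx, rx) = channel::<Vec<u64>>();
    let all_sent = (0..1_000u64).all(|i| push(&tx, vec![i]).is_none());
    drop(rx); // simulates the policy worker exiting at shutdown
    (all_sent, push(&tx, vec![7]))
}
```

Under this model, recycling the returned `Vec` would only ever help after the worker has exited, which is the argument made in the reply above.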
}
let v = replace(&mut *data, Vec::with_capacity(self.capa));
drop(data);
let _ = self.cons.push(v);
Same answer as the sync ring above — AsyncLFUPolicy::push also uses async_channel::unbounded() (src/policy/async.rs:38). Under normal load try_send succeeds and push returns None; the Some(vec) arm only fires when the channel is Disconnected (shutdown), where reuse can't help (next stripe push will also see Disconnected, policy worker is gone). Pushback: this isn't allocation churn in the current architecture.
Sync was driving both stripe drain and TTL cleanup off a single ticker keyed on `drain_interval`, leaving `cleanup_duration` (and `set_cleanup_duration`) silently unused. Async already uses two separate tickers; sync now does too:

- `drain_interval` (default 500ms) drives stripe drain only
- `cleanup_duration` (default 2s) drives TTL cleanup only

TTL cleanup walks every expiration bucket and is heavier than a stripe drain, so splitting lets drain stay aggressive without paying the cleanup cost on every tick.

Also fix a stale CHANGELOG reference (`axync::yield_once` → the `R::yield_now()` call site that landed in the async refactor).

Reported by Copilot review on PR #61.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
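To make the effect of splitting the tickers concrete, the following sketch simulates two independent tickers on a virtual clock and counts how often each fires. `Ticker` and `simulate` are hypothetical names for this illustration; the real processor loop selects over channel receivers and tick sources rather than stepping a simulated clock.

```rust
use std::time::Duration;

/// A ticker on a virtual clock: it is due whenever `now` reaches `next`.
struct Ticker {
    interval: Duration,
    next: Duration,
}

impl Ticker {
    fn new(interval: Duration) -> Self {
        assert!(interval > Duration::ZERO, "interval must be non-zero");
        Self { interval, next: interval }
    }

    /// Returns true and re-arms the ticker if it is due at `now`.
    fn fire_if_due(&mut self, now: Duration) -> bool {
        if now >= self.next {
            self.next += self.interval;
            true
        } else {
            false
        }
    }
}

/// Runs both tickers over `total` of virtual time and returns
/// `(stripe_drains, ttl_cleanups)`.
pub fn simulate(drain_interval: Duration, cleanup_duration: Duration, total: Duration) -> (u32, u32) {
    let mut drain = Ticker::new(drain_interval);
    let mut cleanup = Ticker::new(cleanup_duration);
    let (mut drains, mut cleanups) = (0, 0);
    loop {
        // Jump straight to the earliest pending deadline.
        let now = drain.next.min(cleanup.next);
        if now > total {
            break;
        }
        if drain.fire_if_due(now) {
            drains += 1; // cheap: flush stripe buffers only
        }
        if cleanup.fire_if_due(now) {
            cleanups += 1; // heavier: walk expiration buckets
        }
    }
    (drains, cleanups)
}
```

With the defaults quoted above (500ms and 2s), ten seconds of virtual time gives 20 drains but only 5 cleanups, whereas a single shared ticker would run the heavier cleanup on every one of the 20 ticks.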
Item::Delete handler was checking contains_key BEFORE try_remove_if_version, so when a racing reinsert had eager-written a row, contains_key=true caused us to skip policy.remove, and then try_remove_if_version proceeded to wipe the row, stranding the policy entry as a ghost. Swap the order: try_remove first, then gate policy cleanup on the post-removal index state.

Item::New PushOutcome::Dropped path tore down the eager store row but never reconciled policy. If a stale Item::Delete had just skipped policy.remove (because contains_key saw our about-to-be-rolled-back eager row), nothing else would clean policy after we removed the row. Apply the same contains_key gate as the Delete handler.

update

test(sync): make test_tick_drains_stripes_after_idle non-flaky on macOS CI

A fixed 150ms sleep was too tight on a loaded macOS GitHub Actions runner, where the processor thread can be scheduled late enough for the tick arm to miss the first window. Replace the sleep with a poll-and-retry loop that waits up to 5 s for all 16 keys to land in policy. The test still exercises the time-based drain path: nothing else can drain the stripes in this configuration (no wait/clear/stop).

docs: clarify read-heavy is not the same as Stretto-friendly

Add a short note after the "Where Stretto fits" decision rule. The fit axis is access pattern (frequency skew + working-set vs cache size), not read/write ratio. Read-heavy with a wide, churning keyspace (DS1) is Stretto's worst case despite the favorable read mix; read-heavy with a tight hot set (OLTP, P1-P13) is its sweet spot.

fix(sync): split TTL cleanup onto its own ticker (mirrors async)

Sync was driving both stripe drain and TTL cleanup off a single ticker keyed on `drain_interval`, leaving `cleanup_duration` (and `set_cleanup_duration`) silently unused. Async already uses two separate tickers; sync now does too:

- `drain_interval` (default 500ms) drives stripe drain only
- `cleanup_duration` (default 2s) drives TTL cleanup only

TTL cleanup walks every expiration bucket and is heavier than a stripe drain, so splitting lets drain stay aggressive without paying the cleanup cost on every tick.

Also fix a stale CHANGELOG reference (`axync::yield_once` → the `R::yield_now()` call site that landed in the async refactor).

Reported by Copilot review on PR #61.
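The Delete-handler ordering fix from the first commit message above can be modelled with two plain collections. This is a deliberately simplified sketch: `Model`, its fields, and the "remove only if the stored version matches" rule are assumptions for illustration, not the crate's actual store or policy types.

```rust
use std::collections::{HashMap, HashSet};

/// Toy model: `store` maps key -> version of the live row,
/// `policy` is the set of keys the admission policy still tracks.
#[derive(Default)]
pub struct Model {
    pub store: HashMap<u64, u64>,
    pub policy: HashSet<u64>,
}

impl Model {
    /// Assumed semantics: remove the row only if its version matches.
    fn try_remove_if_version(&mut self, key: u64, version: u64) -> bool {
        if self.store.get(&key) == Some(&version) {
            self.store.remove(&key);
            true
        } else {
            false
        }
    }

    /// Old order: check presence first, then remove. If a row is present the
    /// policy cleanup is skipped, yet the row may be wiped right afterwards.
    pub fn delete_check_first(&mut self, key: u64, version: u64) {
        let row_present = self.store.contains_key(&key);
        self.try_remove_if_version(key, version);
        if !row_present {
            self.policy.remove(&key);
        }
    }

    /// Fixed order: remove first, then gate policy cleanup on what is left.
    pub fn delete_remove_first(&mut self, key: u64, version: u64) {
        self.try_remove_if_version(key, version);
        if !self.store.contains_key(&key) {
            self.policy.remove(&key);
        }
    }
}
```

In this model the old order leaves a policy entry with no store row whenever the removal succeeds, while the fixed order keeps the policy entry only when a row actually survives the delete.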
Summary
- `InsertStripeRing` (introduced for sync `Cache` in 0.9.0: bound in-flight inserts, fix clear/insert/remove races, harden against panicking user callbacks #60) replaces the per-item `async_channel::bounded(N)` + `AsyncSemaphore` admission path. Drops `src/semaphore.rs` (~375 lines) and ~825 net lines from `src/cache/async.rs`. Drop-on-overflow rollback mirrors sync's `PushOutcome::Dropped` contract; barrier preludes (`wait`/`clear`) drain stripe-buffered items before sending markers.
- `R: RuntimeLite`. Runtime moves from method-level (per-call `RT::yield_now` / `RT::spawn_detach`) to a struct-level type parameter at position 3, no default. Convenience aliases `TokioCache<K, V>` and `SmolCache<K, V>` are feature-gated.
- `AsyncCacheBuilder::build::<R>()` (renamed from `finalize`) binds the runtime at construction. `AsyncCache::new` / `new_with_key_builder` constructors removed in favor of the builder.
- `try_remove` propagates `send_single` failures as `CacheError::ChannelError` (mirrors sync). Sync + async `try_insert_in` returns `Ok(true)` for an `Item::Update` rolled back by `PushOutcome::Dropped`: the eager store write already committed, so `Ok(false)` was a contract violation.
- `async_bench` / `sync_bench` (cachebench S3-style get-or-insert), `get_only_bench`, `insert_only_bench`.

Test plan
- `cargo test --no-default-features --features sync`: clean
- `cargo test --no-default-features --features async`: clean (CI smoke-test config)
- `cargo test --no-default-features --features sync,async`: 164 tests pass
- `cargo test --features tokio,sync`: 210 tests pass
- `cargo clippy` clean across the same matrix
- `cargo build --examples --features tokio,sync` builds all benchmark examples

🤖 Generated with Claude Code
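The struct-level runtime binding described in the summary can be sketched as follows. Everything here is a simplified stand-in: the `RuntimeLite` trait below has a single made-up method, the marker types `Tokio` and `Smol` are hypothetical, and the cache carries only three type parameters, unlike the real `AsyncCache`.

```rust
use std::marker::PhantomData;

/// Stand-in for the runtime abstraction; the real trait exposes async
/// primitives rather than a name.
pub trait RuntimeLite {
    fn name() -> &'static str;
}

pub struct Tokio;
pub struct Smol;

impl RuntimeLite for Tokio {
    fn name() -> &'static str { "tokio" }
}
impl RuntimeLite for Smol {
    fn name() -> &'static str { "smol" }
}

/// The runtime is part of the cache type, so methods no longer need a
/// per-call runtime parameter.
pub struct AsyncCache<K, V, R: RuntimeLite> {
    capacity: usize,
    _marker: PhantomData<(K, V, R)>,
}

impl<K, V, R: RuntimeLite> AsyncCache<K, V, R> {
    pub fn capacity(&self) -> usize { self.capacity }
    pub fn runtime_name(&self) -> &'static str { R::name() }
}

pub struct AsyncCacheBuilder<K, V> {
    capacity: usize,
    _marker: PhantomData<(K, V)>,
}

impl<K, V> AsyncCacheBuilder<K, V> {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, _marker: PhantomData }
    }

    /// The runtime is chosen once, at build time.
    pub fn build<R: RuntimeLite>(self) -> AsyncCache<K, V, R> {
        AsyncCache { capacity: self.capacity, _marker: PhantomData }
    }
}

/// Convenience aliases in the spirit of `TokioCache` / `SmolCache`.
pub type TokioCache<K, V> = AsyncCache<K, V, Tokio>;
pub type SmolCache<K, V> = AsyncCache<K, V, Smol>;
```

A caller can either annotate the binding with an alias (`let c: TokioCache<String, u64> = AsyncCacheBuilder::new(16).build();`) or name the runtime with a turbofish (`.build::<Smol>()`).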