feat(inkless): KC-72 Reconcile stale records after diskless switch #612
Pull request overview
This PR introduces a “consolidation reconciliation” step to prevent consolidating diskless fetchers from appending onto stale local log data that may exist above the high watermark after a classic→diskless switch.
Changes:
- Added ConsolidationReconciler to reconcile/truncate local logs (when needed) before starting consolidation fetchers on leader/follower transitions.
- Introduced a per-partition “safe pruning floor” used to gate diskless-log pruning and avoid unsafe prune requests.
- Expanded/adjusted unit tests around consolidating diskless fetch behavior and pruning eligibility.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| core/src/main/scala/kafka/server/ReplicaManager.scala | Wires the new reconciler into leader/follower transitions before starting consolidation fetchers (and includes an unrelated FileMerger disablement). |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala | New reconciliation logic to determine fetch start offset and optionally truncate local logs based on control-plane log info. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPruner.scala | Tightens prune eligibility by filtering switch-pending partitions and requiring a “safe prune offset” from Partition. |
| core/src/main/scala/kafka/cluster/Partition.scala | Adds safeConsolidationPruningFloor with getters/setters to gate pruning. |
| core/src/main/scala/io/aiven/inkless/consolidation/ReconciliationException.scala | Adds a reconciler-specific exception type. |
| core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala | Removes an outdated TODO comment. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | Adds/adjusts tests to ensure unified-log reads vs diskless path selection for consolidating diskless fetches. |
| core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationReconcilerTest.scala | New unit tests for reconciliation/truncation decisions and failure handling. |
| core/src/test/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPrunerTest.scala | Updates tests for the new prune “safe floor” behavior and switch-pending filtering. |
// LEO >= seal. This covers both the initial switch (LEO == seal, nothing consolidated
// yet) and resuming an already-progressed partition after a restart, leadership
// failover, or reassignment (the local log either kept its consolidated frontier or
// was rehydrated from tiered storage). In every case we resume from the current local
// LEO so we never re-consolidate or skip data the local log already holds.
//
// The prune floor is the higher of the seal and the current log start offset:
//   - at first switch logStartOffset is still the classic prefix start, so the floor is
//     the seal, which blocks pruning the diskless region until consolidation has tiered
//     past the boundary;
//   - on resume logStartOffset has advanced past the seal as consolidated segments were
//     tiered and deleted, so it reflects real pruning progress.
val pruneFloor = math.max(seal, log.logStartOffset)
partition.maybeAdvanceConsolidationPruneFloor(pruneFloor)
ConsolidationStartState.Ready(log.logEndOffset)
This one is valid, but fixing it would probably require us to track the epoch or persist the consolidation frontier. Will defer to a follow-up PR.
Generated a document with AI that could serve as a companion for the review.
TL;DR
When a classic topic is switched to a diskless ("consolidating") topic, the leader seals its [...] This PR closes three gaps in that handover:
The classic catch-up fetcher now also hands off to the consolidation fetcher once it reads past [...]
Background
The seal lifecycle is [...]
Significant changes
| Seal | Decision |
|---|---|
| -1 | Born-diskless; Ready(initialFetchOffset), no floor. |
| -2 (PENDING) | Retry: switch not committed yet. |
| >= 0, LEO < seal | Retry: classic prefix not fully replicated locally yet. |
| >= 0, LEO >= seal | Ready(LEO) and set floor to max(seal, logStartOffset). |
| < -2 | Failed. |
The Ready(LEO) branch is the core resume logic: starting from the current LEO covers first
switch, restart/failover, and reassignment without re-consolidating or skipping local data.
Retry states are not self-rescheduling — they rely on a later metadata delta or the catch-up
hand-off. Consolidation fetches use the synthetic endpoint BrokerEndPoint(-1, "diskless", -1).
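The decision table can be read as a single pure function. The following is a minimal sketch under stated assumptions, not the PR's actual ConsolidationReconciler: the object name, the `decide` signature, and carrying the prune floor inside `Ready` are all hypothetical simplifications (in the diff, `Ready` carries only the offset and the floor is set on the Partition).

```scala
// Hypothetical, simplified model of the reconcile decision table.
// Only the state names Ready/Retry/Failed come from the summary; everything
// else is illustrative.
object ReconcileSketch {
  sealed trait StartState
  final case class Ready(fetchOffset: Long, pruneFloor: Option[Long]) extends StartState
  final case class Retry(reason: String) extends StartState
  final case class Failed(reason: String) extends StartState

  val BornDiskless: Long = -1L // no classic prefix ever existed
  val Pending: Long = -2L      // switch requested but seal not committed yet

  def decide(seal: Long, leo: Long, logStartOffset: Long, initialFetchOffset: Long): StartState =
    seal match {
      case BornDiskless     => Ready(initialFetchOffset, None)
      case Pending          => Retry("switch not committed yet")
      case s if s < Pending => Failed(s"unexpected seal value $s")
      // From here on the seal is >= 0.
      case s if leo < s     => Retry("classic prefix not fully replicated locally")
      // Resume from the current LEO; the floor is max(seal, logStartOffset).
      case s                => Ready(leo, Some(math.max(s, logStartOffset)))
    }
}
```

With a seal of 100, a local LEO of 150 and a log start offset of 120 (a resumed partition), the sketch returns `Ready(150, Some(120))`; at first switch (LEO 100, log start 0) it returns `Ready(100, Some(100))`.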
ReplicaManager.scala — truncation + wiring
- maybeTruncateNewlySwitchedPartition(...) runs after makeLeader/makeFollower and before fetchers start. It acts only when the seal was just committed in this delta (sealJustCommitted: previous registration seal < 0, new >= 0):
  - LEO > seal → truncate to seal (drop uncommitted tail);
  - LEO < seal and leader → warn only (a leader can't catch up from a peer; defensive);
  - storage error → mark offline.
- Consolidation-fetcher startup is delegated to the new consolidationReconciler (replacing the old inline peer-BrokerEndPoint resolution).
- New startConsolidationFetchersForCaughtUpClassicPartitions(...) is the entry point for the catch-up hand-off.
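The truncation rules can be sketched as a small decision function. This is an illustration of the behaviour as summarized here, not the code in ReplicaManager: the `Action` type, the `decide` signature, and modelling the previous registration as `Option[Long]` are assumptions, and the storage-error path is left out.

```scala
// Hypothetical model of the truncation guard described in the summary.
object TruncationSketch {
  sealed trait Action
  case object NoOp extends Action
  final case class TruncateTo(offset: Long) extends Action
  case object WarnLeaderBelowSeal extends Action

  // previousSeal is None when the base image has no registration for the partition.
  def decide(previousSeal: Option[Long], newSeal: Long, leo: Long, isLeader: Boolean): Action = {
    // Act only when this delta moved the seal from a negative value to a committed one.
    val sealJustCommitted = previousSeal.exists(_ < 0) && newSeal >= 0
    if (!sealJustCommitted) NoOp
    else if (leo > newSeal) TruncateTo(newSeal)              // drop the uncommitted tail
    else if (leo < newSeal && isLeader) WarnLeaderBelowSeal  // leader cannot catch up from a peer
    else NoOp
  }
}
```

The PENDING→committed case from the tests (LEO 13, seal 10) maps to `TruncateTo(10)`, while a seal that was already committed in the base image maps to `NoOp`.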
Partition.scala — prune floor
lastAppliedDisklessLogStartOffset: Long becomes safeConsolidationPruningFloor: Option[Long].
None makes "no boundary yet" explicit, so the pruner refuses to prune a switched partition until a
floor is set. The floor is monotonic and serves two roles: a gate (don't prune until remote
storage reaches it) and a progress tracker (advances as pruning proceeds).
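A monotonic optional floor of this kind can be sketched as follows. The class and method names are hypothetical stand-ins (the diff shows a `maybeAdvanceConsolidationPruneFloor` call on Partition, but its implementation is not visible here), so treat this as one plausible shape rather than the actual code.

```scala
// Hypothetical holder for a monotonic prune floor; None means "no boundary yet".
object PruneFloorSketch {
  final class FloorHolder {
    @volatile private var floor: Option[Long] = None

    def current: Option[Long] = floor

    // Sets the floor if it is unset, otherwise only ever moves it forward.
    def maybeAdvance(candidate: Long): Unit = synchronized {
      if (floor.forall(_ < candidate)) floor = Some(candidate)
    }
  }
}
```

Using `Option[Long]` rather than a sentinel value means a caller has to handle the unset case explicitly, which is what lets the pruner refuse to act before a floor exists.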
ConsolidatedDisklessLogPruner.scala — gated pruning
PENDING partitions are skipped. Each request goes through safePruneOffset: born-diskless prunes
directly; switched partitions defer to the floor gate (highestRemoteOffset >= floor); successful
prunes advance the floor.
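The gate can be sketched as a function returning an optional offset. Only the name `safePruneOffset` and the condition `highestRemoteOffset >= floor` come from the summary; the parameters, the sentinel constants, and the choice to return the requested offset unchanged when the gate passes are assumptions made for illustration.

```scala
// Hypothetical sketch of the prune gate; the real method lives in the pruner
// and on Partition, and its exact signature is not shown in this thread.
object SafePruneSketch {
  val BornDiskless: Long = -1L
  val Pending: Long = -2L

  def safePruneOffset(seal: Long,
                      floor: Option[Long],
                      highestRemoteOffset: Long,
                      requested: Long): Option[Long] =
    if (seal == Pending) None                      // switch still pending: skip
    else if (seal == BornDiskless) Some(requested) // born-diskless: prune directly
    else floor                                     // switched: no floor means no pruning
      .filter(highestRemoteOffset >= _)            // gate: remote storage reached the floor
      .map(_ => requested)
}
```

A switched partition with floor 100 and highest remote offset 90 yields `None`, so the partition is omitted from the prune request until tiering passes the boundary.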
Fetcher hand-off
ReplicaFetcherThread gains shouldEvictFullySwitchedDisklessPartitions (default true); after
evicting a caught-up classic follower it triggers the consolidation fetcher.
ConsolidationFetcherThread overrides it to false so it keeps fetching the diskless tail instead
of self-evicting at the seal.
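The override pattern can be sketched with two small classes. Only the flag name `shouldEvictFullySwitchedDisklessPartitions` is taken from the summary; the class names, the `onFetched` method, the callback, and the exact eviction condition (`fetchOffset >= seal`) are hypothetical simplifications of the real fetcher threads.

```scala
// Hypothetical model of the eviction flag and the hand-off callback.
object HandOffSketch {
  class ClassicFetcher(onEvicted: String => Unit) {
    protected def shouldEvictFullySwitchedDisklessPartitions: Boolean = true

    // Returns true if the partition was evicted from this fetcher.
    def onFetched(partition: String, fetchOffset: Long, seal: Long): Boolean =
      if (shouldEvictFullySwitchedDisklessPartitions && seal >= 0 && fetchOffset >= seal) {
        onEvicted(partition) // hand off to the consolidation fetcher
        true
      } else false
  }

  // Keeps fetching the diskless tail instead of self-evicting at the seal.
  class ConsolidationFetcher extends ClassicFetcher(_ => ()) {
    override protected def shouldEvictFullySwitchedDisklessPartitions: Boolean = false
  }
}
```

Putting the decision behind an overridable flag keeps the eviction logic in one place while letting the consolidation fetcher opt out.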
End-to-end (first switch, follower)
1. Controller commits the seal.
2. makeFollower → truncation drops any tail past the seal.
3. If LEO < seal, a classic catch-up fetcher is scheduled.
4. On reading past the seal the classic fetcher self-evicts and hands off to consolidation.
5. Reconciler sees LEO >= seal → Ready(LEO) and sets the prune floor.
Restart/failover/reassignment skip steps 2–4: the seal was committed in a prior delta
(sealJustCommitted == false, no truncation), and the reconciler resumes from the current LEO.
Tests
- ConsolidationReconcilerTest (new): every reconcile state (first switch, failover resume, consolidated-past-seal-not-tiered, reassignment rehydrate, born-diskless, PENDING/below-seal retry).
- ConsolidatedDisklessLogPrunerTest: born-consolidated prunes directly; switched partition omitted when floor not reached; PENDING skipped.
- ReplicaFetcherThreadTest: hand-off is (not) called on (non-)eviction.
- ReplicaManagerTest: diskless endpoint wiring; PENDING→committed truncates (13→10); plus two truncation-guard tests added in review: seal already committed in base image is not re-truncated, and a leader below the committed seal stays online and untruncated.
jeqo
left a comment
LGTM overall, solid PR. Just left some minor comments if you agree.
// Defined after stateChangeLogger so the reconciler is constructed with a non-null logger.
private val consolidationReconciler: Option[ConsolidationReconciler] =
  if (config.disklessRemoteStorageConsolidationEnabled) {
    consolidationFetcherManager.flatMap(cfm =>
Should we defensively validate that consolidationFetcherManager and consolidationMetrics are not None, to catch any wiring bug?
We had a similar thing when initializing consolidationFetcherManager itself (but then checking the fetch handler) and there we log a warning and return None. Do you think both could warrant an exception too instead of the warning log? Feels like a wiring bug shouldn't be passed down with just logging.
Yes, failure should be safer here than just logging.
} else if (log.logEndOffset < seal && partition.isLeader) {
  stateChangeLogger.warn(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
    s"classic-to-diskless start offset $seal; cannot catch up from another replica")
}
this state seems worrying -- should we consider marking the partition offline or do any follow-up?
Yea, I can increase this to error and mark it offline.
if (log.logEndOffset > seal) {
  stateChangeLogger.info(s"Truncating switched partition $tp from LEO ${log.logEndOffset} " +
    s"to classic-to-diskless start offset $seal")
  partition.truncateTo(seal, isFuture = false)
Not sure if here, but somewhere it has to be documented that classicToDisklessStartOffset means first diskless offset and not last classic offset, so truncation to this offset is good here.
Well, we have documentation in InitDisklessLogRequest.json, but I'm not sure if you were thinking of something closer 🙂.
I think a short comment can be made here too.
Right, it is there already -- but shouldn't hurt having it here or maybe in PartitionRegistration
When a partition switches to diskless, a replica can still hold uncommitted records above the high watermark. If the switch is quick and consolidation is enabled, the consolidation fetchers would start appending on top of that stale tail.
This commit implements the following:
- Truncate a newly switched partition's local log down to the just-committed seal (classic-to-diskless start offset). This runs after makeLeader/makeFollower and before any fetcher starts, so catch-up and consolidation always initialize from the truncated LEO.
- Add ConsolidationReconciler to decide when a consolidating partition joins the consolidation fetcher, covering both the initial switch and resume after failover, restart, or reassignment. ReplicaManager now delegates consolidation fetcher startup to it.
- Hand off from the classic ReplicaFetcher to consolidation once a follower reaches the seal, via the fetcher's self-eviction path.
- Gate pruning of switched diskless partitions behind a prune floor and skip partitions whose switch is still pending; born-consolidated partitions keep pruning directly.
- Transform the per-partition diskless watermark into a single monotonic safeConsolidationPruningFloor (gate + progress tracker), replacing lastAppliedDisklessLogStartOffset.
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
Some tests incorrectly used an LEO below the seal offset, so they failed after the most recent review change, which marks such partitions as fenced.
val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage =>
  Option(topicImage.partitions().get(tp.partition()))
}
val sealJustCommitted = previousPartition.exists(_.classicToDisklessStartOffset < 0)
if (!sealJustCommitted) return
Unfortunately, this is not enough to catch 100% of cases. There was a similar check gating the init of the diskless log on switch, but I removed it in favor of a safer approach: #603
This is not exactly the same, but it is exposed to similar scenarios, like this one:
Initial state:
partition is classic
broker A local log: LEO = 120
cluster HW / safe seal point = 100
records [100, 120) are uncommitted stale classic tail on A
Then:
broker A is down
controller/leader commits classicToDisklessStartOffset = 100
metadata now says the partition is switched
Later:
broker A restarts, on restart, the delta may be effectively: "empty image -> committed switched image"
it loads metadata image where seal = 100 is already present
truncation is skipped
or this other one:
1. Broker receives/applyDelta for PENDING -> seal=100.
2. makeLeader/makeFollower runs.
3. Before maybeTruncateNewlySwitchedPartition actually truncates LEO 120 -> 100, broker crashes.
4. On restart, metadata already contains seal=100.
5. sealJustCommitted is false.
6. truncation is skipped
Similar to the other comment by Copilot, we might need to track Leader Epochs in a more precise way and have a safe mechanism for truncation
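Both scenarios come down to the gate evaluating to false after a restart. A minimal sketch of the check from the diff above, with the previous registration's seal modelled as a hypothetical `Option[Long]` (`None` standing for a base image that has no entry for the partition):

```scala
// Hypothetical reduction of the sealJustCommitted gate to its Option logic.
object SealGateSketch {
  // Mirrors previousPartition.exists(_.classicToDisklessStartOffset < 0).
  def sealJustCommitted(previousSeal: Option[Long]): Boolean =
    previousSeal.exists(_ < 0)
}
```

`exists` on `None` is false, so an "empty image -> committed switched image" delta skips truncation, and so does a restart where the base image already holds seal = 100. In both cases the stale tail [100, 120) stays in the local log.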
I therefore propose tackling this in another PR, so I will not block this one.
giuseppelillo
left a comment
LGTM for now, we need to address a couple of issues about truncation but it's cleaner to do it in another PR.