feat(inkless): KC-72 Reconcile stale records after diskless switch #612

Merged
giuseppelillo merged 5 commits into main from svv/ts-unification-reconcile Jun 3, 2026

@viktorsomogyi viktorsomogyi commented May 26, 2026

Contributor

When a partition switches to diskless, a replica can still hold uncommitted records above the high watermark. If the switch is quick and consolidation is enabled, the consolidation fetchers would start appending on top of that stale tail. This commit implements the following:

  • Truncate a newly switched partition's local log down to the just-committed seal (classic-to-diskless start offset). This runs after makeLeader/makeFollower and before any fetcher starts, so catch-up and consolidation always initialize from the truncated LEO.
  • Add ConsolidationReconciler to decide when a consolidating partition joins the consolidation fetcher, covering both the initial switch and resume after failover, restart, or reassignment. ReplicaManager now delegates consolidation fetcher startup to it.
  • Hand off from the classic ReplicaFetcher to consolidation once a follower reaches the seal, via the fetcher's self-eviction path.
  • Gate pruning of switched diskless partitions behind a prune floor and skip partitions whose switch is still pending; born-consolidated partitions keep pruning directly.
  • Transform the per-partition diskless watermark into a single monotonic safeConsolidationPruningFloor (gate + progress tracker), replacing lastAppliedDisklessLogStartOffset.

Copilot AI left a comment

Contributor

Pull request overview

This PR introduces a “consolidation reconciliation” step to prevent consolidating diskless fetchers from appending onto stale local log data that may exist above the high watermark after a classic→diskless switch.

Changes:

  • Added ConsolidationReconciler to reconcile/truncate local logs (when needed) before starting consolidation fetchers on leader/follower transitions.
  • Introduced a per-partition “safe pruning floor” used to gate diskless-log pruning and avoid unsafe prune requests.
  • Expanded/adjusted unit tests around consolidating diskless fetch behavior and pruning eligibility.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Files and descriptions:

  • core/src/main/scala/kafka/server/ReplicaManager.scala: Wires the new reconciler into leader/follower transitions before starting consolidation fetchers (and includes an unrelated FileMerger disablement).
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala: New reconciliation logic to determine the fetch start offset and optionally truncate local logs based on control-plane log info.
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPruner.scala: Tightens prune eligibility by filtering switch-pending partitions and requiring a "safe prune offset" from Partition.
  • core/src/main/scala/kafka/cluster/Partition.scala: Adds safeConsolidationPruningFloor with getters/setters to gate pruning.
  • core/src/main/scala/io/aiven/inkless/consolidation/ReconciliationException.scala: Adds a reconciler-specific exception type.
  • core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala: Removes an outdated TODO comment.
  • core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: Adds/adjusts tests to ensure unified-log reads vs diskless path selection for consolidating diskless fetches.
  • core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationReconcilerTest.scala: New unit tests for reconciliation/truncation decisions and failure handling.
  • core/src/test/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPrunerTest.scala: Updates tests for the new prune "safe floor" behavior and switch-pending filtering.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala Outdated
Comment thread core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala Outdated
Comment thread core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala Outdated
Comment thread core/src/main/scala/io/aiven/inkless/consolidation/ReconciliationException.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch from 30d1d15 to 82d0fa7 Compare May 26, 2026 14:01
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-delete-tiered-diskless branch from 331715d to 5f47cc2 Compare May 27, 2026 14:57
Base automatically changed from svv/ts-unification-delete-tiered-diskless to main May 27, 2026 19:20
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch 2 times, most recently from d09af83 to 51dadb3 Compare May 29, 2026 14:04
@viktorsomogyi viktorsomogyi changed the title feat(inkless): KC-72 Reconcile stale commits in old follower feat(inkless): KC-72 Reconcile stale records after diskless switch May 29, 2026
@viktorsomogyi viktorsomogyi requested a review from Copilot May 29, 2026 14:05

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment on lines +117 to +131
// LEO >= seal. This covers both the initial switch (LEO == seal, nothing consolidated
// yet) and resuming an already-progressed partition after a restart, leadership
// failover, or reassignment (the local log either kept its consolidated frontier or
// was rehydrated from tiered storage). In every case we resume from the current local
// LEO so we never re-consolidate or skip data the local log already holds.
//
// The prune floor is the higher of the seal and the current log start offset:
// - at first switch logStartOffset is still the classic prefix start, so the floor is
// the seal, which blocks pruning the diskless region until consolidation has tiered
// past the boundary;
// - on resume logStartOffset has advanced past the seal as consolidated segments were
// tiered and deleted, so it reflects real pruning progress.
val pruneFloor = math.max(seal, log.logStartOffset)
partition.maybeAdvanceConsolidationPruneFloor(pruneFloor)
ConsolidationStartState.Ready(log.logEndOffset)

@viktorsomogyi viktorsomogyi May 29, 2026

Contributor Author

This one is valid, but it would likely require us to track the leader epoch or persist the consolidation frontier. I will defer it to a follow-up PR.

@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch 2 times, most recently from 58dfd23 to a366d0d Compare May 29, 2026 16:06
@viktorsomogyi viktorsomogyi marked this pull request as ready for review May 29, 2026 16:06
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch from a366d0d to 3d5ca60 Compare June 1, 2026 07:49
viktorsomogyi commented Jun 1, 2026

Contributor Author

Generated a document with AI that could serve as a companion for the review.

TL;DR

When a classic topic is switched to a diskless ("consolidating") topic, the leader seals its
local log at a committed offset (the seal, classicToDisklessStartOffset). After that, the
diskless control plane owns the log and consolidation fetchers continue it from the seal onward.

This PR closes three gaps in that handover:

  1. Stale tail records — a replica may hold uncommitted records beyond the committed seal.
    They are now truncated before any fetcher starts, so consolidation begins from a clean boundary.
  2. Resuming consolidation correctly — a new ConsolidationReconciler decides where consolidation
    fetching should (re)start across first-switch, restart, failover and reassignment.
  3. Safe pruning — diskless pruning is gated behind a monotonic prune floor so the consolidated
    region is never pruned before tiered (remote) storage has caught up to the seal.

The classic catch-up fetcher now also hands off to the consolidation fetcher once it reads past
the seal.

Background

The seal lifecycle is -1 (NO_CLASSIC_TO_DISKLESS_START_OFFSET, never switched) → -2
(CLASSIC_TO_DISKLESS_SWITCH_PENDING) → committed (seal >= 0). The broker driving the switch
seals at its own HW == LEO and is frozen before it can append further.
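The seal lifecycle above can be modelled as a small decoder over the raw classicToDisklessStartOffset value. This is a minimal sketch, assuming the lifecycle is strictly ordered -1 → -2 → committed as stated; SealState, decodeSeal and isForwardTransition are hypothetical names, not code from the PR.

```scala
// Hypothetical model of the seal lifecycle; only the sentinel values come from the text.
sealed trait SealState
case object NeverSwitched extends SealState              // -1: NO_CLASSIC_TO_DISKLESS_START_OFFSET
case object SwitchPending extends SealState              // -2: CLASSIC_TO_DISKLESS_SWITCH_PENDING
final case class Sealed(offset: Long) extends SealState  // >= 0: committed seal
final case class Invalid(raw: Long) extends SealState    // < -2: no defined meaning

def decodeSeal(raw: Long): SealState = raw match {
  case -1L         => NeverSwitched
  case -2L         => SwitchPending
  case o if o >= 0 => Sealed(o)
  case o           => Invalid(o)
}

// Assumed ordering: -1 -> -2 -> committed; anything else is not a forward step.
def isForwardTransition(from: SealState, to: SealState): Boolean = (from, to) match {
  case (NeverSwitched, SwitchPending) => true
  case (SwitchPending, Sealed(_))     => true
  case _                              => false
}
```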

Significant changes

ConsolidationReconciler.scala (new) — where consolidation resumes

Decides the start offset and initial prune floor per partition, via a small state machine keyed on
the seal:

Seal → decision:

  • -1: Born-diskless; Ready(initialFetchOffset), no floor.
  • -2 (PENDING): Retry (switch not committed yet).
  • >= 0, LEO < seal: Retry (classic prefix not fully replicated locally yet).
  • >= 0, LEO >= seal: Ready(LEO) and set floor to max(seal, logStartOffset).
  • < -2: Failed.

The Ready(LEO) branch is the core resume logic: starting from the current LEO covers first
switch, restart/failover, and reassignment without re-consolidating or skipping local data.
Retry states are not self-rescheduling — they rely on a later metadata delta or the catch-up
hand-off. Consolidation fetches use the synthetic endpoint BrokerEndPoint(-1, "diskless", -1).
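The decision table can be read as a pure function of the seal and the local log offsets. The following is a minimal sketch under stated assumptions: the reconcile signature is hypothetical, and unlike the PR (which advances the floor on the Partition as a side effect) it returns the prune floor inside Ready to keep the logic pure.

```scala
// Hypothetical sketch of the reconcile decision table; Ready/Retry/Failed echo
// the text, but this signature is an assumption, not the PR's actual API.
sealed trait StartState
// The PR sets the floor on the Partition; here it is returned with the offset.
final case class Ready(fetchOffset: Long, pruneFloor: Option[Long]) extends StartState
case object Retry extends StartState
final case class Failed(reason: String) extends StartState

def reconcile(seal: Long, logEndOffset: Long, logStartOffset: Long,
              initialFetchOffset: Long): StartState =
  if (seal == -1L) Ready(initialFetchOffset, None)               // born-diskless, no floor
  else if (seal == -2L) Retry                                    // PENDING: not committed yet
  else if (seal < -2L) Failed(s"invalid seal $seal")             // undefined sentinel
  else if (logEndOffset < seal) Retry                            // classic prefix still catching up
  else Ready(logEndOffset, Some(math.max(seal, logStartOffset))) // resume from the local LEO
```

For example, reconcile(10L, 10L, 0L, 0L) models a first switch (floor = seal = 10), while reconcile(10L, 25L, 18L, 0L) models a resume after consolidated segments were tiered and deleted (floor = logStartOffset = 18).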

ReplicaManager.scala — truncation + wiring

  • maybeTruncateNewlySwitchedPartition(...) runs after makeLeader/makeFollower and before
    fetchers start. It acts only when the seal was just committed in this delta
    (sealJustCommitted: previous registration seal < 0, new >= 0):
    • LEO > seal → truncate to seal (drop uncommitted tail);
    • LEO < seal and leader → warn only (a leader can't catch up from a peer; defensive);
    • storage error → mark offline.
  • Consolidation-fetcher startup is delegated to the new consolidationReconciler (replacing the
    old inline peer-BrokerEndPoint resolution).
  • New startConsolidationFetchersForCaughtUpClassicPartitions(...) is the entry point for the
    catch-up hand-off.
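The truncation guard in maybeTruncateNewlySwitchedPartition reduces to a decision over the previous seal, the new seal, the LEO and leadership. This is a minimal sketch with hypothetical names (TruncationAction, truncationAction). Following the review outcome later in this thread, it models the leader-below-seal case as marking the partition offline rather than only warning; the storage-error path is not modelled.

```scala
// Hypothetical sketch of the truncation decision; not the PR's actual code.
sealed trait TruncationAction
case object NoOp extends TruncationAction
final case class TruncateTo(offset: Long) extends TruncationAction
case object MarkOffline extends TruncationAction

def truncationAction(previousSeal: Option[Long], newSeal: Long,
                     logEndOffset: Long, isLeader: Boolean): TruncationAction = {
  // Act only when this delta commits the seal: previous seal < 0, new seal >= 0.
  val sealJustCommitted = previousSeal.exists(_ < 0) && newSeal >= 0
  if (!sealJustCommitted) NoOp
  else if (logEndOffset > newSeal) TruncateTo(newSeal)      // drop the uncommitted tail
  else if (logEndOffset < newSeal && isLeader) MarkOffline  // a leader cannot catch up from a peer
  else NoOp // LEO == seal, or a follower below the seal that the catch-up fetcher will fill
}
```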

Partition.scala — prune floor

lastAppliedDisklessLogStartOffset: Long becomes safeConsolidationPruningFloor: Option[Long].
None makes "no boundary yet" explicit, so the pruner refuses to prune a switched partition until a
floor is set. The floor is monotonic and serves two roles: a gate (don't prune until remote
storage reaches it) and a progress tracker (advances as pruning proceeds).
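A monotonic Option[Long] floor can be sketched as a compare-and-set field that only moves forward. This is an illustrative stand-in, assuming an AtomicReference is an acceptable holder; PruneFloor and maybeAdvance are hypothetical names (the PR's Partition method is maybeAdvanceConsolidationPruneFloor, whose internals are not shown here).

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for safeConsolidationPruningFloor: None means "no boundary yet".
final class PruneFloor {
  private val floor = new AtomicReference[Option[Long]](None)

  def get: Option[Long] = floor.get()

  // Advances only forwards; returns the floor in effect after the call.
  def maybeAdvance(candidate: Long): Option[Long] =
    floor.updateAndGet(current => current match {
      case Some(f) if f >= candidate => current // never move backwards
      case _                         => Some(candidate)
    })
}
```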

ConsolidatedDisklessLogPruner.scala — gated pruning

PENDING partitions are skipped. Each request goes through safePruneOffset: born-diskless prunes
directly; switched partitions defer to the floor gate (highestRemoteOffset >= floor); successful
prunes advance the floor.
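The pruner's gate can be sketched as a function that either returns the offset to prune to or refuses. This is a minimal sketch assuming the seal sentinels from the Background section; safePruneOffset's signature and floorAfterPrune are hypothetical, not the PR's API.

```scala
// Hypothetical sketch of the pruning gate; only the sentinels come from the text.
def safePruneOffset(seal: Long, requested: Long, floor: Option[Long],
                    highestRemoteOffset: Long): Option[Long] =
  if (seal == -1L) Some(requested) // born-diskless: prune directly
  else if (seal < 0L) None         // PENDING (or invalid): skip
  else floor match {
    // Switched: prune only once remote storage has reached the floor.
    case Some(f) if highestRemoteOffset >= f => Some(requested)
    case _ => None                 // no floor yet, or remote storage is behind it
  }

// After a successful prune of a switched partition, the floor only moves forward.
def floorAfterPrune(floor: Option[Long], prunedTo: Long): Option[Long] =
  Some(floor.fold(prunedTo)(f => math.max(f, prunedTo)))
```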

Fetcher hand-off

ReplicaFetcherThread gains shouldEvictFullySwitchedDisklessPartitions (default true); after
evicting a caught-up classic follower it triggers the consolidation fetcher.
ConsolidationFetcherThread overrides it to false so it keeps fetching the diskless tail instead
of self-evicting at the seal.
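The flag-plus-override pattern can be sketched with two small subclasses. Only the flag name shouldEvictFullySwitchedDisklessPartitions comes from the text; FetcherSketch, afterFetch, onEvicted and the handOff callback are hypothetical stand-ins for ReplicaFetcherThread and ConsolidationFetcherThread.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the self-eviction hand-off; not the real fetcher classes.
abstract class FetcherSketch {
  // Classic fetchers self-evict partitions that have reached the seal.
  def shouldEvictFullySwitchedDisklessPartitions: Boolean = true

  // Hook invoked after a partition is evicted; no-op by default.
  protected def onEvicted(tp: String): Unit = ()

  // Returns true if `tp` was evicted because its next fetch offset reached the seal.
  def afterFetch(tp: String, nextFetchOffset: Long, seal: Option[Long]): Boolean = seal match {
    case Some(s) if shouldEvictFullySwitchedDisklessPartitions && nextFetchOffset >= s =>
      onEvicted(tp)
      true
    case _ => false
  }
}

// Classic fetcher stand-in: hands the partition to consolidation after eviction.
final class ClassicFetcher(handOff: String => Unit) extends FetcherSketch {
  override protected def onEvicted(tp: String): Unit = handOff(tp)
}

// Consolidation fetcher stand-in: never self-evicts, keeps fetching the diskless tail.
final class ConsolidationFetcher extends FetcherSketch {
  override def shouldEvictFullySwitchedDisklessPartitions: Boolean = false
}
```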

End-to-end (first switch, follower)

  1. Controller commits the seal.
  2. makeFollower → truncation drops any tail past the seal.
  3. If LEO < seal, a classic catch-up fetcher is scheduled.
  4. On reading past the seal the classic fetcher self-evicts and hands off to consolidation.
  5. Reconciler sees LEO >= seal → Ready(LEO) and sets the prune floor.

Restart/failover/reassignment skip steps 2–4: the seal was committed in a prior delta
(sealJustCommitted == false, no truncation), and the reconciler resumes from the current LEO.
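The two paths can be traced end to end with a toy function that returns the consolidation fetch start offset and the prune floor. This is an illustrative simplification, not the PR's code: resumePoint is hypothetical, and classic catch-up is collapsed into a single step that brings a lagging follower up to the seal.

```scala
// Toy trace of the first-switch follower flow and the restart path (hypothetical).
// Returns (consolidation fetch start offset, prune floor).
def resumePoint(seal: Long, logEndOffset: Long, logStartOffset: Long,
                sealJustCommitted: Boolean): (Long, Long) = {
  // Step 2: truncate the tail past the seal, only if this delta committed the seal.
  val afterTruncation =
    if (sealJustCommitted && logEndOffset > seal) seal else logEndOffset
  // Steps 3-4: a follower below the seal catches up via the classic fetcher,
  // then hands off once it reaches the seal.
  val afterCatchUp = math.max(afterTruncation, seal)
  // Step 5: the reconciler resumes from the local LEO; floor = max(seal, logStartOffset).
  (afterCatchUp, math.max(seal, logStartOffset))
}
```

With seal 10, a follower at LEO 13 is truncated and resumes at 10; a follower at LEO 7 catches up and also resumes at 10; a restarted replica at LEO 25 with logStartOffset 18 resumes at 25 with floor 18.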

Tests

  • ConsolidationReconcilerTest (new) — every reconcile state (first switch, failover resume,
    consolidated-past-seal-not-tiered, reassignment rehydrate, born-diskless, PENDING/below-seal retry).
  • ConsolidatedDisklessLogPrunerTest — born-consolidated prunes directly; switched partition omitted
    when floor not reached; PENDING skipped.
  • ReplicaFetcherThreadTest — hand-off is (not) called on (non-)eviction.
  • ReplicaManagerTest — diskless endpoint wiring; PENDING→committed truncates (13→10); plus two
    truncation-guard tests added in review: seal already committed in base image is not
    re-truncated, and a leader below the committed seal stays online and untruncated.

@jeqo jeqo left a comment

Contributor

LGTM overall, solid PR. Just left some minor comments if you agree.

Comment thread core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Comment thread core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationReconciler.scala Outdated
// Defined after stateChangeLogger so the reconciler is constructed with a non-null logger.
private val consolidationReconciler: Option[ConsolidationReconciler] =
if (config.disklessRemoteStorageConsolidationEnabled) {
consolidationFetcherManager.flatMap(cfm =>

Contributor

Should we defensively validate that consolidationFetcherManager and consolidationMetrics are not None, to catch any wiring bug?

Contributor Author

We have a similar check when initializing consolidationFetcherManager itself (there it checks the fetch handler), and in that case we log a warning and return None. Do you think both cases warrant an exception instead of a warning log? It feels like a wiring bug shouldn't be let through with just a log line.

Contributor

Yes, failure should be safer here than just logging.
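The fail-fast wiring agreed on here can be sketched as a builder that throws instead of logging when consolidation is enabled but a dependency is missing. FetcherManager, Metrics, Reconciler and buildReconciler are hypothetical placeholders for the real ReplicaManager fields; the exception type is also an assumption.

```scala
// Hypothetical placeholders for the real dependencies.
final class FetcherManager
final class Metrics
final class Reconciler(val fetcherManager: FetcherManager, val metrics: Metrics)

// Fail fast on a wiring bug instead of silently returning None.
def buildReconciler(enabled: Boolean, fetcherManager: Option[FetcherManager],
                    metrics: Option[Metrics]): Option[Reconciler] =
  if (!enabled) None
  else (fetcherManager, metrics) match {
    case (Some(fm), Some(m)) => Some(new Reconciler(fm, m))
    case _ => throw new IllegalStateException(
      s"Consolidation enabled but wiring is incomplete: " +
        s"fetcherManager=${fetcherManager.isDefined}, metrics=${metrics.isDefined}")
  }
```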

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment on lines +3119 to +3122
} else if (log.logEndOffset < seal && partition.isLeader) {
stateChangeLogger.warn(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
s"classic-to-diskless start offset $seal; cannot catch up from another replica")
}

Contributor

This state seems worrying; should we consider marking the partition offline, or some other follow-up?

Contributor Author

Yeah, I can raise this to an error and mark the partition offline.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
if (log.logEndOffset > seal) {
stateChangeLogger.info(s"Truncating switched partition $tp from LEO ${log.logEndOffset} " +
s"to classic-to-diskless start offset $seal")
partition.truncateTo(seal, isFuture = false)

Contributor

Not sure whether it belongs here, but it should be documented somewhere that classicToDisklessStartOffset means the first diskless offset and not the last classic offset, which is why truncating to this offset is correct here.

Contributor Author

Well, we have documentation in InitDisklessLogRequest.json, but I'm not sure if you were thinking of something closer 🙂.
I think a short comment can be added here too.

Contributor

Right, it is there already, but it shouldn't hurt to have it here too, or maybe in PartitionRegistration.
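Why truncating to the seal is correct follows from the seal being the first diskless offset: a log truncated to offset x keeps offsets strictly below x, so the last classic record sits at seal - 1 and the next appended (first diskless) record lands exactly at the seal. A toy model, assuming a local log is just the half-open range [logStartOffset, logEndOffset); LocalLog is hypothetical. The numbers mirror the 13→10 truncation test.

```scala
// Toy local log as the half-open offset range [logStartOffset, logEndOffset).
final case class LocalLog(logStartOffset: Long, logEndOffset: Long) {
  // Keeps only offsets strictly below `offset`, like a log truncation.
  def truncateTo(offset: Long): LocalLog = copy(logEndOffset = math.min(logEndOffset, offset))
  def offsets: Seq[Long] = logStartOffset until logEndOffset
}

// Seal = 10 is the first diskless offset; records 10, 11, 12 are the stale tail.
val truncated = LocalLog(logStartOffset = 0L, logEndOffset = 13L).truncateTo(10L)
```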

@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch from 3d5ca60 to 0b32be6 Compare June 1, 2026 13:27
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-reconcile branch from 2db4c0b to 2ada2ff Compare June 1, 2026 13:46
Some tests were incorrectly using a LEO below the seal offset, and
therefore failed after the most recent review change, which marks such
partitions fenced.

@jeqo jeqo left a comment

Contributor

Thanks for addressing the suggestions! LGTM, just a minor comment suggestion -- should be good to approve from my side.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment on lines +3100 to +3104
val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage =>
Option(topicImage.partitions().get(tp.partition()))
}
val sealJustCommitted = previousPartition.exists(_.classicToDisklessStartOffset < 0)
if (!sealJustCommitted) return

Contributor

This is unfortunately not enough to catch 100% of cases. There was a similar check gating the init diskless log for switching, but I removed it in favor of a safer approach: #603
This is not exactly the same, but it suffers from a similar scenario, like this one:

Initial state:
  partition is classic
  broker A local log: LEO = 120
  cluster HW / safe seal point = 100
  records [100, 120) are uncommitted stale classic tail on A
Then:
  broker A is down
  controller/leader commits classicToDisklessStartOffset = 100
  metadata now says the partition is switched
Later:
  broker A restarts; on restart, the delta may effectively be "empty image -> committed switched image"
  it loads a metadata image where seal = 100 is already present
  truncation is skipped

or this other one:

1. Broker receives/applyDelta for PENDING -> seal=100.
2. makeLeader/makeFollower runs.
3. Before maybeTruncateNewlySwitchedPartition actually truncates LEO 120 -> 100, the broker crashes.
4. On restart, metadata already contains seal=100.
5. sealJustCommitted is false.
6. truncation is skipped

Similar to the other comment by Copilot, we might need to track leader epochs more precisely and have a safe mechanism for truncation.
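Both scenarios can be reproduced against a model of the quoted sealJustCommitted check: the previous registration is either absent (restart over an empty image) or already carries seal = 100 (crash before truncation), so the check returns false either way. The sketch also shows why simply dropping the delta check is not a fix: a state-only rule "truncate whenever LEO > seal" sees the same (seal = 100, LEO = 120) for a stale classic tail and for records legitimately consolidated past the seal, which is presumably why the thread points toward leader epochs. PartitionReg and naiveNeedsTruncation are hypothetical names.

```scala
// Hypothetical stand-in for the partition registration in the previous image.
final case class PartitionReg(classicToDisklessStartOffset: Long)

// Mirrors the quoted logic: true only if the previous image had a negative seal.
def sealJustCommitted(previous: Option[PartitionReg]): Boolean =
  previous.exists(_.classicToDisklessStartOffset < 0)

// A naive, delta-independent alternative: truncate whenever LEO > seal.
def naiveNeedsTruncation(seal: Long, logEndOffset: Long): Boolean =
  seal >= 0 && logEndOffset > seal

// Scenario 1: A was down while seal = 100 was committed; the delta on restart is
// applied over an empty image, so there is no previous partition.
val restartedAfterDowntime: Option[PartitionReg] = None
// Scenario 2: crash between applying PENDING -> 100 and truncating; on restart
// the base image already carries seal = 100.
val restartedAfterCrash: Option[PartitionReg] = Some(PartitionReg(100L))
```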

Contributor

I therefore propose tackling this in another PR, so I won't block this one.

Contributor Author

Yes, I started working on that (KC-156). In short, the idea is to record the bumped leader epoch (done in #626), store it as the diskless leader epoch, and then use it for recovery. This way Kafka will be able to correctly identify that it should truncate to offset 100.

giuseppelillo previously approved these changes Jun 3, 2026

@giuseppelillo giuseppelillo left a comment

Contributor

LGTM for now, we need to address a couple of issues about truncation but it's cleaner to do it in another PR.

Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>

@giuseppelillo giuseppelillo left a comment

Contributor

LGTM

@giuseppelillo giuseppelillo merged commit f82bb06 into main Jun 3, 2026
5 checks passed
@giuseppelillo giuseppelillo deleted the svv/ts-unification-reconcile branch June 3, 2026 13:36
giuseppelillo pushed a commit that referenced this pull request Jun 4, 2026
* feat(inkless): KC-72 Reconcile stale records after diskless switch

When a partition switches to diskless, a replica can still hold
uncommitted records above the high watermark. If the switch is quick and
consolidation is enabled, the consolidation fetchers would start
appending on top of that stale tail. This commit implements the following:

- Truncate a newly switched partition's local log down to the
  just-committed seal (classic-to-diskless start offset). This runs after
  makeLeader/makeFollower and before any fetcher starts, so catch-up and
  consolidation always initialize from the truncated LEO.
- Add ConsolidationReconciler to decide when a consolidating partition
  joins the consolidation fetcher, covering both the initial switch and
  resume after failover, restart, or reassignment. ReplicaManager now
  delegates consolidation fetcher startup to it.
- Hand off from the classic ReplicaFetcher to consolidation once a
  follower reaches the seal, via the fetcher's self-eviction path.
- Gate pruning of switched diskless partitions behind a prune floor and
  skip partitions whose switch is still pending; born-consolidated
  partitions keep pruning directly.
- Transform the per-partition diskless watermark into a single monotonic
  safeConsolidationPruningFloor (gate + progress tracker), replacing
  lastAppliedDisklessLogStartOffset.

* Apply suggestions from code review

Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>

* Address review comments

* Fix failing tests

Some tests were incorrectly using a LEO below the seal offset, and
therefore failed after the most recent review change, which marks such
partitions fenced.

* Update core/src/main/scala/kafka/server/ReplicaManager.scala

Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>

---------

Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
gqmelo pushed a commit that referenced this pull request Jun 4, 2026

4 participants