
feat(inkless): KC-156 diskless leader epoch for consolidation truncation #631

Merged
giuseppelillo merged 3 commits into main from svv/ts-unification-leader-epoch on Jun 16, 2026

@viktorsomogyi viktorsomogyi commented Jun 5, 2026

Contributor

Consolidating diskless followers had no way to truncate a stale classic tail past the seal: diskless records are produced with leader epoch 0, so appending them after a classic prefix broke epoch-cache monotonicity and disabled OffsetsForLeaderEpoch divergence truncation.

Introduce a captured, control-plane-owned diskless leader epoch (E_d):

  • Persist E_d in KRaft metadata as a new tagged field (tag 102) on PartitionRegistration; the controller freezes the partition's current leader epoch at the initDisklessLog commit, where it is already greater than every classic-prefix epoch and robust to leader changes between the switch phases. merge() preserves it across later change records.
  • Expose it on the broker via InklessMetadataView/ReplicaManager.
  • Stamp E_d onto materialized batches in ConsolidationFetcherThread so the local log keeps a monotonic epoch lineage (in place, reusing the existing flatten; partitionLeaderEpoch is outside the batch CRC).
  • Implement DisklessLeaderEndPoint.fetchEpochEndOffsets: a queried epoch below E_d returns the seal (truncating a stale tail back to it), while a queried epoch at or above E_d, or any epoch on a born-diskless partition, returns the diskless LEO.

E_d lives only in KRaft metadata; no control-plane schema change is needed since both the stamping and epoch-query paths run on the broker.
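The resolution rule in the last bullet can be modeled in a few lines of Java. This is a minimal sketch, not the actual DisklessLeaderEndPoint.fetchEpochEndOffsets code: the class `EpochEndOffsetResolver`, its parameters, and the follower-side `truncationOffset` helper are hypothetical. The helper assumes the follower truncates to the smaller of the returned end offset and its own log end offset, and the real response also carries a leader epoch that this sketch omits.

```java
/** Hypothetical model of the OffsetsForLeaderEpoch resolution done on behalf of a diskless partition. */
final class EpochEndOffsetResolver {
    // Mirrors PartitionRegistration.NO_DISKLESS_LEADER_EPOCH: no captured E_d (e.g. born-diskless).
    static final int NO_DISKLESS_LEADER_EPOCH = -1;

    private EpochEndOffsetResolver() {}

    /**
     * Returns the end offset for the epoch a follower asks about.
     *
     * @param queriedEpoch         the follower's latest local leader epoch
     * @param disklessLeaderEpoch  the captured E_d, or NO_DISKLESS_LEADER_EPOCH
     * @param sealOffset           offset at which the classic prefix was sealed
     * @param disklessLogEndOffset current diskless log end offset (LEO)
     */
    static long endOffsetFor(int queriedEpoch, int disklessLeaderEpoch,
                             long sealOffset, long disklessLogEndOffset) {
        if (disklessLeaderEpoch == NO_DISKLESS_LEADER_EPOCH) {
            // Born-diskless: there is no classic prefix, so the diskless LEO is authoritative.
            return disklessLogEndOffset;
        }
        if (queriedEpoch < disklessLeaderEpoch) {
            // A classic-era epoch: valid data for it ends at the seal, so a local tail past it is stale.
            return sealOffset;
        }
        // E_d or later: the diskless log end offset is authoritative.
        return disklessLogEndOffset;
    }

    /** Assumed follower rule (hypothetical): truncate to the smaller of the answer and the local LEO. */
    static long truncationOffset(long leaderEndOffset, long localLogEndOffset) {
        return Math.min(leaderEndOffset, localLogEndOffset);
    }
}
```

For example, with E_d = 7, a seal at offset 100 and a diskless LEO of 150, a follower whose stale classic tail (epoch 5) runs to offset 120 gets `endOffsetFor(5, 7, 100, 150) == 100` and truncates back to the seal, while a follower already at epoch 7 gets 150 and keeps its log.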

@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-leader-epoch branch from 5d778a4 to 3381627 June 5, 2026 10:06
@viktorsomogyi viktorsomogyi marked this pull request as ready for review June 5, 2026 10:08
@viktorsomogyi viktorsomogyi requested a review from Copilot June 5, 2026 10:08

Copilot AI left a comment

Contributor

Pull request overview

This PR introduces a “frozen” diskless leader epoch (E_d) to keep leader-epoch lineage monotonic after classic→diskless switching, enabling standard OffsetsForLeaderEpoch divergence truncation to correctly truncate stale classic tails back to the seal.

Changes:

  • Persist a new diskless leader epoch in KRaft metadata via a new tagged field (tag 102) on partition records, and preserve it across merges.
  • Expose the diskless leader epoch on brokers (via InklessMetadataView/ReplicaManager) and stamp it onto materialized diskless batches during consolidation fetch.
  • Implement epoch-aware OffsetsForLeaderEpoch behavior in DisklessLeaderEndPoint (epochs below E_d resolve to the seal; E_d and above resolve to diskless LEO), with comprehensive tests.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.

Summary per file:
  • metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: Adds the disklessLeaderEpoch field, merge/toRecord behavior, and equality/hash/diff support.
  • metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java: Defines tag 102 and encode/decode helpers for the diskless leader epoch.
  • metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: Emits the frozen diskless leader epoch on the initDisklessLog change record and logs it.
  • metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java: Adds unit tests for defaulting, record round-trip, and merge semantics of disklessLeaderEpoch.
  • metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: Verifies that initDisklessLog records and replay carry and apply the frozen diskless epoch.
  • core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala: Exposes getDisklessLeaderEpoch from the KRaft metadata image.
  • core/src/main/scala/kafka/server/ReplicaManager.scala: Adds delegation accessors for the seal and the diskless leader epoch.
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala: Stamps E_d onto fetched diskless batches before append to maintain epoch monotonicity.
  • core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala: Implements epoch-aware fetchEpochEndOffsets behavior using the seal and E_d.
  • core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala: Adds tests for getDisklessLeaderEpoch and additional seal retrieval cases.
  • core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala: Adds a delegation test for ReplicaManager.disklessLeaderEpoch.
  • core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThreadTest.scala: Adds tests for epoch stamping and for truncation of a stale classic tail back to the seal.
  • core/src/test/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPointTest.scala: Adds tests for seal-vs-LEO resolution based on the queried epoch relative to E_d.


Comment thread metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java Outdated
Comment on lines +176 to +178
// Sentinel for a partition that has no captured diskless leader epoch (never switched, born-diskless,
// or switch still pending). A real diskless leader epoch is always >= 0.
public static final int NO_DISKLESS_LEADER_EPOCH = -1;

Contributor

Would it be possible to assign 0 to born-diskless partitions? That way the field would be useful for all diskless partitions, not only for the classic-to-diskless switch.

Contributor Author

I think it would be possible, but I'm not sure what we would gain. How could we use it in the future?
Currently, NO_DISKLESS_LEADER_EPOCH covers the functionality we need, and in this case we can still distinguish born-diskless from switched-to-diskless partitions (seal < 0). I don't really see a good use case for it right now, but let me know if you think otherwise or have a future use case in mind that isn't on my radar.

Contributor

Not a real use case; it's just for overall clarity and coherence.
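The sentinel also underpins the description's statement that merge() preserves E_d across later change records. A minimal sketch of that rule follows, assuming (as with KRaft tagged fields generally) that a change record which omits tag 102 decodes to the field's default, here taken to be NO_DISKLESS_LEADER_EPOCH. `DisklessEpochMerge` and `mergeDisklessLeaderEpoch` are hypothetical names, not the real PartitionRegistration.merge() code.

```java
/** Hypothetical model of how a merged PartitionRegistration keeps its captured E_d. */
final class DisklessEpochMerge {
    // Mirrors PartitionRegistration.NO_DISKLESS_LEADER_EPOCH (assumed to be the tag 102 default).
    static final int NO_DISKLESS_LEADER_EPOCH = -1;

    private DisklessEpochMerge() {}

    /**
     * A change record that does not carry the field (decoded as the sentinel) keeps the current value;
     * one that carries it, such as the initDisklessLog commit, sets it.
     */
    static int mergeDisklessLeaderEpoch(int current, int fromChangeRecord) {
        return fromChangeRecord == NO_DISKLESS_LEADER_EPOCH ? current : fromChangeRecord;
    }
}
```

Under this rule, a later leader change that bumps the partition's leader epoch to 9 without carrying tag 102 leaves a captured E_d of 7 untouched.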

Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment on lines +77 to +97
/**
 * Stamp the captured diskless leader epoch (E_d) onto materialized batches so the local log keeps a
 * monotonic epoch lineage (diskless records are produced with epoch 0, which would otherwise break
 * the LeaderEpochFileCache after a switched partition's higher classic epochs and disable divergence
 * truncation). Born-diskless / not-yet-switched partitions (E_d == NO_DISKLESS_LEADER_EPOCH) are left
 * at epoch 0. Done in place to reuse the append path's single flatten; partitionLeaderEpoch is outside
 * the batch CRC, so no checksum recompute is needed.
 */
private def maybeStampDisklessLeaderEpoch(topicPartition: TopicPartition, partitionData: FetchData): Unit = {
  val disklessLeaderEpoch = replicaMgr.disklessLeaderEpoch(topicPartition)
  if (disklessLeaderEpoch == PartitionRegistration.NO_DISKLESS_LEADER_EPOCH) {
    return
  }
  FetchResponse.recordsOrFail(partitionData) match {
    case records: ConcatenatedRecords =>
      records.batches().forEach(batch => batch.setPartitionLeaderEpoch(disklessLeaderEpoch))
    case records: MemoryRecords =>
      records.batches().forEach(batch => batch.setPartitionLeaderEpoch(disklessLeaderEpoch))
    case _ => // other Records implementations are left unstamped
  }
}
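To illustrate why the snippet above stamps batches, the following models a log as a list of batches and checks that partition leader epochs never decrease along it. It is a sketch under assumptions: `Batch`, `EpochStamping.stamp` and `EpochStamping.isMonotonic` are hypothetical stand-ins for a mutable record batch, maybeStampDisklessLeaderEpoch, and the monotonicity the leader epoch cache expects, and the example epochs (classic 3 and 5, E_d = 7) are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a record batch, keeping only the fields this sketch needs. */
final class Batch {
    final long baseOffset;
    int partitionLeaderEpoch;

    Batch(long baseOffset, int partitionLeaderEpoch) {
        this.baseOffset = baseOffset;
        this.partitionLeaderEpoch = partitionLeaderEpoch;
    }
}

final class EpochStamping {
    // Mirrors PartitionRegistration.NO_DISKLESS_LEADER_EPOCH.
    static final int NO_DISKLESS_LEADER_EPOCH = -1;

    private EpochStamping() {}

    /** Overwrites the epoch in place unless no E_d was captured (born-diskless / not yet switched). */
    static void stamp(List<Batch> batches, int disklessLeaderEpoch) {
        if (disklessLeaderEpoch == NO_DISKLESS_LEADER_EPOCH) {
            return; // leave the batches at the epoch they were produced with (0)
        }
        for (Batch batch : batches) {
            batch.partitionLeaderEpoch = disklessLeaderEpoch;
        }
    }

    /** Returns a new list holding the prefix followed by the appended batches. */
    static List<Batch> append(List<Batch> prefix, List<Batch> tail) {
        List<Batch> log = new ArrayList<>(prefix);
        log.addAll(tail);
        return log;
    }

    /** True if epochs never decrease along the log, the lineage divergence truncation relies on. */
    static boolean isMonotonic(List<Batch> log) {
        int last = Integer.MIN_VALUE;
        for (Batch batch : log) {
            if (batch.partitionLeaderEpoch < last) {
                return false;
            }
            last = batch.partitionLeaderEpoch;
        }
        return true;
    }
}
```

Appending the unstamped diskless batches (epoch 0) after the classic prefix fails the check; stamping them with E_d first yields the lineage 3, 5, 7, 7.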

Contributor

How hard would it be to use the real leader epoch for diskless records as well? Then we wouldn't need this stamping, and E_d would only be relevant for checking whether a record's leader epoch is > E_d during truncation.

@viktorsomogyi viktorsomogyi Jun 5, 2026

Contributor Author

In classic Kafka, the partition leader stamps records with its leader epoch, and followers populate their own epoch cache from what is in the records. The epoch is also assigned at the same time as offsets, i.e. around produce time. In diskless we assign offsets later, which already breaks this flow, because by then a different broker could be the "leader".
Also, in diskless every replica is effectively a follower of the diskless control plane. To stamp the real leader epoch here, we would first have to query it from the leader, or look up the current leader in the metadata. That introduces a race condition: we can't be sure the metadata is up to date, so we could stamp a stale epoch. And even at produce time that leader epoch might not have been current, again because of the late offset assignment in diskless.
All in all, these situations can lead to non-monotonic leader epochs. When we switch back to classic (E_c2), though, we should be able to use the real leader epoch again, because (in theory) we can bump the leader epoch just before switching back. That would restore monotonicity with the ordering E_c < E_d < E_c2, where E_c is the classic epoch before the switch to diskless.
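The ordering E_c < E_d < E_c2 is what the standard epoch lookup relies on. In Kafka's leader epoch cache, the end offset for a requested epoch is the log end offset when that epoch is the latest one, and otherwise the start offset of the first entry with a higher epoch. The following simplified model of that lookup uses a hypothetical class name (`EpochLineage`) and made-up offsets; unlike the real cache it rejects non-increasing epochs outright, and it returns only the offset, not the epoch the real lookup also reports.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of a leader epoch cache: epoch -> start offset. */
final class EpochLineage {
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();

    /** Records that `epoch` starts at `startOffset`; this model only accepts strictly increasing epochs. */
    void assign(int epoch, long startOffset) {
        if (!startOffsets.isEmpty() && epoch <= startOffsets.lastKey()) {
            throw new IllegalArgumentException("non-monotonic epoch " + epoch);
        }
        startOffsets.put(epoch, startOffset);
    }

    /** End offset for `requestedEpoch`: the LEO for the latest epoch, else the next epoch's start offset. */
    long endOffsetFor(int requestedEpoch, long logEndOffset) {
        if (!startOffsets.isEmpty() && requestedEpoch == startOffsets.lastKey()) {
            return logEndOffset;
        }
        Map.Entry<Integer, Long> higher = startOffsets.higherEntry(requestedEpoch);
        // No higher entry and not the latest epoch: the epoch is unknown to this lineage.
        return higher == null ? UNDEFINED_EPOCH_OFFSET : higher.getValue();
    }
}
```

With E_c = 5 starting at 0, E_d = 7 starting at the seal (100) and E_c2 = 9 starting at 200, a query for E_c returns the seal, E_d returns 200, and E_c2 returns the LEO. In this model, appending a batch at epoch 0 after these entries is rejected, which is the kind of break the stamping avoids.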

@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-leader-epoch branch from 0cd083d to ff40ccf June 15, 2026 13:44

@giuseppelillo giuseppelillo left a comment

Contributor

LGTM for the current use case; we can tweak it later if other use cases require it.

@giuseppelillo giuseppelillo merged commit 02395b0 into main Jun 16, 2026
4 checks passed
@giuseppelillo giuseppelillo deleted the svv/ts-unification-leader-epoch branch June 16, 2026 14:46