feat(inkless): KC-156 diskless leader epoch for consolidation truncation #631
Pull request overview
This PR introduces a “frozen” diskless leader epoch (E_d) to keep leader-epoch lineage monotonic after classic→diskless switching, enabling standard OffsetsForLeaderEpoch divergence truncation to correctly truncate stale classic tails back to the seal.
Changes:
- Persist a new diskless leader epoch in KRaft metadata via a new tagged field (tag 102) on partition records, and preserve it across merges.
- Expose the diskless leader epoch on brokers (via InklessMetadataView/ReplicaManager) and stamp it onto materialized diskless batches during consolidation fetch.
- Implement epoch-aware OffsetsForLeaderEpoch behavior in DisklessLeaderEndPoint (epochs below E_d resolve to the seal; E_d and above resolve to diskless LEO), with comprehensive tests.
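The resolution rule in the last bullet can be read as a small pure function. The following is a minimal sketch in Java, not the PR's Scala implementation: the class, method, and parameter names are hypothetical, and the seal and the diskless LEO are modeled as plain offsets. Only the sentinel value -1 and the rule itself come from the PR text.

```java
// Hypothetical sketch of the epoch-aware end-offset rule described above.
final class DisklessEpochResolutionSketch {
    // Same sentinel value as in the PR: no diskless leader epoch captured.
    static final int NO_DISKLESS_LEADER_EPOCH = -1;

    /**
     * Returns the end offset to report for a queried leader epoch.
     * Assumption: sealOffset and disklessLogEndOffset are supplied by the caller.
     */
    static long resolveEndOffset(int queriedEpoch,
                                 int disklessLeaderEpoch,
                                 long sealOffset,
                                 long disklessLogEndOffset) {
        if (disklessLeaderEpoch == NO_DISKLESS_LEADER_EPOCH) {
            // Born-diskless or not yet switched: there is no classic tail to cut.
            return disklessLogEndOffset;
        }
        // Epochs below E_d belong to the classic prefix, so they end at the seal.
        // E_d and above resolve to the diskless log end offset.
        return queriedEpoch < disklessLeaderEpoch ? sealOffset : disklessLogEndOffset;
    }
}
```

With E_d = 5, a seal at offset 100, and a diskless LEO of 250, a follower asking about epoch 3 would be told 100 and truncate its stale tail back to the seal, while a follower asking about epoch 5 would be told 250.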
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java | Adds disklessLeaderEpoch field, merge/toRecord behavior, and equality/hash/diff support. |
| metadata/src/main/java/org/apache/kafka/metadata/InitDisklessLogFields.java | Defines tag 102 and encode/decode helpers for the diskless leader epoch. |
| metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java | Emits the frozen diskless leader epoch on the initDisklessLog change record and logs it. |
| metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java | Adds unit tests for defaulting, record round-trip, and merge semantics of disklessLeaderEpoch. |
| metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java | Verifies initDisklessLog records/replay carry and apply the frozen diskless epoch. |
| core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala | Exposes getDisklessLeaderEpoch from the KRaft metadata image. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Adds delegation accessors for seal and diskless leader epoch. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala | Stamps E_d onto fetched diskless batches prior to append to maintain epoch monotonicity. |
| core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala | Implements epoch-aware fetchEpochEndOffsets behavior using seal + E_d. |
| core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala | Adds tests for getDisklessLeaderEpoch and additional seal retrieval cases. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala | Adds delegation test for ReplicaManager.disklessLeaderEpoch. |
| core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThreadTest.scala | Adds tests for epoch stamping and for truncation of a stale classic tail back to the seal. |
| core/src/test/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPointTest.scala | Adds tests for seal-vs-LEO resolution based on queried epoch relative to E_d. |
// Sentinel for a partition that has no captured diskless leader epoch (never switched, born-diskless,
// or switch still pending). A real diskless leader epoch is always >= 0.
public static final int NO_DISKLESS_LEADER_EPOCH = -1;
Would it be possible to assign 0 for born-diskless partitions? That way the field would be useful not only for the classic-to-diskless switch but for all diskless partitions.
I think it would be possible, but I'm not sure what we would gain. How could we use it in the future?
Currently, using NO_DISKLESS_LEADER_EPOCH covers the functionality we need, and it also lets us differentiate between born-diskless and switched-to-diskless partitions in this case (seal < 0). I'm afraid I don't see a good use case for it right now, but let me know if you think otherwise or have a future use case in mind that is not on my radar.
Not a real use case, it's just for overall clarity and coherence
/**
 * Stamp the captured diskless leader epoch (E_d) onto materialized batches so the local log keeps a
 * monotonic epoch lineage (diskless records are produced with epoch 0, which would otherwise break
 * the LeaderEpochFileCache after a switched partition's higher classic epochs and disable divergence
 * truncation). Born-diskless / not-yet-switched partitions (E_d == NO_DISKLESS_LEADER_EPOCH) are left
 * at epoch 0. Done in place to reuse the append path's single flatten; partitionLeaderEpoch is outside
 * the batch CRC, so no checksum recompute is needed.
 */
private def maybeStampDisklessLeaderEpoch(topicPartition: TopicPartition, partitionData: FetchData): Unit = {
  val disklessLeaderEpoch = replicaMgr.disklessLeaderEpoch(topicPartition)
  if (disklessLeaderEpoch == PartitionRegistration.NO_DISKLESS_LEADER_EPOCH) {
    return
  }
  FetchResponse.recordsOrFail(partitionData) match {
    case records: ConcatenatedRecords =>
      records.batches().forEach(batch => batch.setPartitionLeaderEpoch(disklessLeaderEpoch))
    case records: MemoryRecords =>
      records.batches().forEach(batch => batch.setPartitionLeaderEpoch(disklessLeaderEpoch))
    case _ =>
  }
}
How hard would it be to use the real leader epoch for diskless records as well? Then we wouldn't need this stamping, and E_d would only matter for truncation, to check whether a record's leader epoch is greater than E_d.
In classic Kafka, the partition leader stamps the records with the epoch, and the followers populate their own epoch cache from what is in the records. What's more, the epoch is assigned together with the offsets, so at produce time. In diskless we assign the offsets later, which already breaks this flow, because by that time someone else could be the "leader".
Also, in diskless every replica is basically a follower of the diskless control plane. So if we wanted to stamp the real leader epoch here, we would first have to query it from the leader, or query the metadata to find the current leader. This introduces a race condition: we can't be sure that the metadata is up to date, so we could stamp something stale. And even at produce time that leader epoch might not have been up to date, because of the late offset assignment in diskless.
All in all, these situations may cause non-monotonic leader epochs. When we switch back to classic (E_c2), however, we should be able to use the real leader epoch, because we can again do a leader epoch bump just before switching back (in theory). That would reinstate monotonicity, as the order would be E_c < E_d < E_c2 (where E_c is the classic epoch before the switch to diskless).
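To make the ordering argument concrete, here is a toy check in Java, assuming a log is reduced to the sequence of its batch leader epochs. It does not model Kafka's LeaderEpochFileCache; the class and method names are hypothetical and the epoch values are made up for illustration.

```java
import java.util.Arrays;

// Hypothetical toy model: a log is just the list of its batch leader epochs.
final class EpochLineageSketch {
    /** True when leader epochs never decrease in log order. */
    static boolean isMonotonic(int[] epochsInLogOrder) {
        for (int i = 1; i < epochsInLogOrder.length; i++) {
            if (epochsInLogOrder[i] < epochsInLogOrder[i - 1]) {
                return false;
            }
        }
        return true;
    }

    /** Returns a copy where every batch from firstDisklessIndex on carries stampedEpoch. */
    static int[] stampFrom(int[] epochs, int firstDisklessIndex, int stampedEpoch) {
        int[] stamped = Arrays.copyOf(epochs, epochs.length);
        for (int i = firstDisklessIndex; i < stamped.length; i++) {
            stamped[i] = stampedEpoch;
        }
        return stamped;
    }
}
```

For example, a classic prefix with epochs 2, 2, 4 followed by two diskless batches produced with epoch 0 gives 2, 2, 4, 0, 0, which is not monotonic. Stamping E_d = 5 onto the diskless batches gives 2, 2, 4, 5, 5, and a later classic batch with E_c2 = 6 keeps the order E_c < E_d < E_c2.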
…on truncation

Consolidating diskless followers had no way to truncate a stale classic tail past the seal: diskless records are produced with leader epoch 0, so appending them after a classic prefix broke epoch-cache monotonicity and disabled OffsetsForLeaderEpoch divergence truncation.

Introduce a frozen, control-plane-owned diskless leader epoch (E_d):
- Persist E_d in KRaft metadata as a new tagged field (tag 102) on PartitionRegistration; the controller freezes the partition's current leader epoch at the initDisklessLog commit, where it is already greater than every classic-prefix epoch and robust to leader changes between the switch phases. merge() preserves it across later change records.
- Expose it on the broker via InklessMetadataView/ReplicaManager.
- Stamp E_d onto materialized batches in ConsolidationFetcherThread so the local log keeps a monotonic epoch lineage (in place, reusing the existing flatten; partitionLeaderEpoch is outside the batch CRC).
- Implement DisklessLeaderEndPoint.fetchEpochEndOffsets: a queried epoch below E_d returns the seal (truncating a stale tail back to it), while E_d / born-diskless returns the diskless LEO.

E_d lives only in KRaft metadata; no control-plane schema change is needed since both the stamping and epoch-query paths run on the broker.
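As a hedged illustration of why in-place stamping needs no checksum recompute: in Kafka's v2 record batch format the partition leader epoch sits at byte offset 12 of the batch header, while the CRC-32C covers the bytes from the attributes field (offset 21) to the end of the batch. The sketch below uses a synthetic buffer rather than a real batch, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Hypothetical sketch on a synthetic buffer; offsets follow the v2 batch header layout.
final class LeaderEpochStampSketch {
    static final int PARTITION_LEADER_EPOCH_OFFSET = 12; // 4-byte field
    static final int ATTRIBUTES_OFFSET = 21;             // first byte covered by the CRC

    /** CRC-32C over the region a v2 batch checksum covers: attributes to end. */
    static long checksum(ByteBuffer batch) {
        ByteBuffer covered = batch.duplicate(); // independent position, shared bytes
        covered.position(ATTRIBUTES_OFFSET);
        CRC32C crc = new CRC32C();
        crc.update(covered);
        return crc.getValue();
    }

    /** Overwrites the leader epoch in place using an absolute put. */
    static void stampEpoch(ByteBuffer batch, int epoch) {
        batch.putInt(PARTITION_LEADER_EPOCH_OFFSET, epoch);
    }

    /** Builds an 80-byte stand-in for a batch with arbitrary contents and epoch 0. */
    static ByteBuffer sampleBatch() {
        ByteBuffer batch = ByteBuffer.allocate(80);
        for (int i = 0; i < batch.capacity(); i++) {
            batch.put(i, (byte) (i * 31));
        }
        batch.putInt(PARTITION_LEADER_EPOCH_OFFSET, 0);
        return batch;
    }
}
```

Calling `checksum` before and after `stampEpoch` on the same buffer yields the same value, because the four epoch bytes lie before the checksummed region.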
giuseppelillo left a comment
LGTM for the current use case, we can tweak it later if necessary for other use cases