feat(inkless:consolidation): supplement local log with diskless data on fetch [KC-168] #638
jeqo wants to merge 6 commits into
…al/diskless boundary

Deterministic regression tests for the consolidating-partition fetch path, using a MockTimer-backed delayed-fetch purgatory so timing is asserted without real waits:

- testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: when the local log holds less than minBytes and diskless data is available, the request must respond synchronously (with merged data) and never park in the purgatory. This RED/GREEN test reproduces the degradation: it fails on a build without the supplement (the request parks and waits maxWaitMs) and passes with it.
- testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract. When no diskless data is available, the request correctly still parks and completes only after maxWaitMs elapses.

Adds an optional delayedFetchPurgatory param to the test's createReplicaManager helper to inject a MockTimer-backed purgatory.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
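To make the RED/GREEN contract above concrete, here is a toy Scala model of a manually advanced clock and a fetch that either responds synchronously or parks until maxWaitMs. `ToyMockTimer` and `ToyFetcher` are hypothetical stand-ins, not Kafka's `MockTimer` or delayed-fetch purgatory; the sketch only shows why a mock clock lets a test assert "responded at t=0" versus "responded only at t=maxWaitMs" without sleeping.

```scala
import scala.collection.mutable

// Hypothetical toy clock: time only moves when the test calls advance().
final class ToyMockTimer {
  private var nowMs = 0L
  private val pending = mutable.ArrayBuffer.empty[(Long, () => Unit)]

  def time: Long = nowMs

  // Register a task to run once the clock reaches now + delayMs.
  def schedule(delayMs: Long)(task: () => Unit): Unit =
    pending += ((nowMs + delayMs, task))

  // Advance the clock and run every task whose deadline has been reached.
  def advance(ms: Long): Unit = {
    nowMs += ms
    val (due, notDue) = pending.partition(_._1 <= nowMs)
    pending.clear()
    pending ++= notDue
    due.foreach(_._2())
  }

  def parkedCount: Int = pending.size
}

// Hypothetical toy fetch: respond immediately when local + supplement bytes
// reach minBytes, otherwise park until maxWaitMs elapses on the mock clock.
final class ToyFetcher(timer: ToyMockTimer) {
  var respondedAtMs: Option[Long] = None

  def fetch(localBytes: Int, supplementBytes: Int, minBytes: Int, maxWaitMs: Long): Unit =
    if (localBytes + supplementBytes >= minBytes) respondedAtMs = Some(timer.time)
    else timer.schedule(maxWaitMs) { () => respondedAtMs = Some(timer.time) }
}
```

With diskless data available the toy request completes at t=0 and nothing is parked; without it, the request completes only when the clock is advanced to maxWaitMs.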
Pull request overview
This PR improves fetch behavior for consolidating diskless topics at the local↔diskless boundary so that consumers with large minBytes can receive responses promptly by supplementing local-log fetch results with diskless data beyond the local logEndOffset. It also updates the delayed-fetch completion path to run the consolidation supplement and pure-diskless fetches concurrently.
Changes:
- Add a consolidation supplement request builder and a merge routine to concatenate local + diskless records for consolidating partitions.
- Update `ReplicaManager.fetchMessages` to synchronously supplement consolidating local reads (when appropriate) to avoid parking in delayed fetch.
- Update `DelayedFetch.onComplete` to fetch consolidation supplements in parallel with pure-diskless fetches, and add/extend tests for these behaviors.
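The concatenation step can be pictured with a toy model of the type dispatch behind a `ConcatenatedRecords.concat(prefix, tail)`-style factory: a prefix that is already concatenated is flattened into a list of parts rather than nested or copied into one contiguous buffer. The `ToyRecords` hierarchy below is hypothetical and only tracks offsets; it is not Kafka's `Records` API.

```scala
import java.util.Objects

// Hypothetical toy model; real Kafka Records carry bytes, not offset vectors.
object ToyConcatModel {
  sealed trait ToyRecords { def offsets: Vector[Long] }
  final case class ToyMemoryRecords(offsets: Vector[Long]) extends ToyRecords
  final case class ToyConcatenated(parts: Vector[ToyMemoryRecords]) extends ToyRecords {
    def offsets: Vector[Long] = parts.flatMap(_.offsets)
  }

  // Dispatch on the concrete type of each side and keep a flat list of parts,
  // so concatenating onto an already-concatenated prefix does not nest.
  def concat(prefix: ToyRecords, tail: ToyRecords): ToyRecords = {
    Objects.requireNonNull(prefix, "prefix")
    Objects.requireNonNull(tail, "tail")
    ToyConcatenated(parts(prefix) ++ parts(tail))
  }

  private def parts(r: ToyRecords): Vector[ToyMemoryRecords] = r match {
    case m: ToyMemoryRecords => Vector(m)
    case c: ToyConcatenated  => c.parts
  }
}
```

Keeping parts as references rather than copying them is what lets a real implementation avoid materializing the supplement into a new buffer.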
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| core/src/main/scala/kafka/server/ReplicaManager.scala | Adds supplement request building + merge logic; integrates inline supplement path into fetchMessages. |
| core/src/main/scala/kafka/server/DelayedFetch.scala | Runs consolidation supplement fetch concurrently with pure-diskless fetch during delayed completion and merges results. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala | Adds unit/integration-style tests validating supplement behavior and merge/budget helpers. |
| core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala | Adds a focused test for the concurrent supplement path in DelayedFetch.onComplete. |
…on fetch

When a consolidating partition's local log doesn't satisfy minBytes, supplement it with a synchronous diskless fetch starting at logEndOffset, then merge the local + diskless records via ConcatenatedRecords.concat().

For consumer requests that span only consolidating partitions, the supplement is applied inline and the response is produced immediately without parking in the delayed-fetch purgatory. For mixed requests (consolidating + pure-diskless partitions), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete, so latency equals the slower of the two, not their sum. An AtomicBoolean ensures the response callback fires exactly once, even if an exception escapes the future composition.

Guards on the inline supplement path:
- !isFromFollower: followers must not receive merged diskless data
- maxWaitMs > 0: non-blocking polls must not be held by a round-trip
- !hasPreferredReadReplica: redirect responses must not be delayed
- disklessFetchInfos.isEmpty: avoids a wasted blocking call when the request will park in DelayedFetch anyway
- inklessSharedState.isDefined: checked at collection time

bytesReadable only counts error-free supplement bytes, so a supplement response with an error but non-empty records cannot trick the broker into responding prematurely.

The merge preserves localData.abortedTransactions and localData.preferredReadReplica (diskless has no transaction index and no replica selector). It warns when aborted transactions are present, since the diskless portion will have no abort markers. It falls back to local-only data on unknown Records types or merge failures.
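The merge rules in the last paragraph can be summarized as a small pure function. This is a sketch under assumptions: `PartitionResult` is a hypothetical simplification of the per-partition fetch result, records are reduced to offsets, and treating an error on either side as "fall back to local-only" is an assumed reading of the fallback behavior.

```scala
// Hypothetical simplification of a per-partition fetch result.
object ToyMerge {
  final case class PartitionResult(
      error: Option[String],
      logStartOffset: Long,
      offsets: Vector[Long],
      abortedTransactions: Option[List[Long]],
      preferredReadReplica: Option[Int])

  def merge(local: PartitionResult, supplement: PartitionResult): PartitionResult =
    // Assumption: any error on either side means local-only data is returned.
    if (local.error.isDefined || supplement.error.isDefined) local
    else {
      if (local.abortedTransactions.exists(_.nonEmpty))
        println("WARN: aborted transactions present; the diskless portion has no abort markers")
      // copy() keeps abortedTransactions and preferredReadReplica from `local`,
      // since the diskless side has no transaction index and no replica selector.
      local.copy(
        logStartOffset = math.min(local.logStartOffset, supplement.logStartOffset),
        offsets = local.offsets ++ supplement.offsets)
    }
}
```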
Exception handling on the inline supplement fetch:
- InterruptedException: re-interrupts the thread and falls back
- TimeoutException: logged specifically for operator visibility
- Other failures: logged, then falls back to local-only

On diskless future failure in DelayedFetch.onComplete, per-partition error entries (Errors.forException) are returned so clients see the partition in the response rather than having it silently dropped.

A ConcatenatedRecords.concat(prefix, tail) static factory is added to encapsulate type dispatch over MemoryRecords/ConcatenatedRecords and to avoid materializing supplement backing records into a contiguous buffer. It validates inputs with Objects.requireNonNull.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
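The exactly-once completion described in this commit can be sketched with the JDK's `CompletableFuture.thenCombine` and `AtomicBoolean.compareAndSet`. `ConcurrentCompletion.complete` and its string results are hypothetical; the point is that the combined stage fires only after both futures finish (so latency tracks the slower fetch), and the flag lets exactly one code path invoke the callback, including a catch block for exceptions that escape the composition.

```scala
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.{BiConsumer, BiFunction}

// Hypothetical sketch; real code would combine per-partition fetch results.
object ConcurrentCompletion {
  def complete(
      supplement: CompletableFuture[String],
      diskless: CompletableFuture[String],
      callback: String => Unit): Unit = {
    val responded = new AtomicBoolean(false)
    // Only the first caller flips the flag, so the callback runs at most once.
    def respondOnce(result: String): Unit =
      if (responded.compareAndSet(false, true)) callback(result)

    val combine = new BiFunction[String, String, String] {
      override def apply(s: String, d: String): String = s + "+" + d
    }
    val onDone = new BiConsumer[String, Throwable] {
      override def accept(merged: String, err: Throwable): Unit =
        if (err == null) respondOnce(merged)
        else respondOnce("error:" + err.getClass.getSimpleName)
    }
    try {
      // The combined stage completes only once both inputs have completed.
      supplement.thenCombine[String, String](diskless, combine).whenComplete(onDone)
      ()
    } catch {
      case e: Exception => respondOnce("error:" + e.getClass.getSimpleName)
    }
  }
}
```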
```scala
if (readResult != null && readResult.error == Errors.NONE) {
  replicaManager.getPartitionOrError(tp.topicPartition).foreach { partition =>
    partition.log.foreach { log =>
      if (status.startOffsetMetadata.messageOffset < log.logEndOffset)
```
If I'm interpreting it right, this might not be enough. When a classic read happens on an older segment that has fewer than minBytes remaining, it forces completion instead of reading from the next segment. I think the current supplementation read breaks this and continues from the log end instead, unless I'm missing something.
Suppose a log is [0,5000): segment 0 is [0,2000), segment 1 is [2000,4000), segment 2 is [4000,5000). Now minBytes is 1MB and we're reading at 1995. Reading [1995,2000) may return only 500KB, so we would still need to read [2000,2005) from the next segment. In this case the broker would force completion and thus force the consumer to issue another request (and "break" minBytes). The next classic fetch then continues from 2000.
As I read it, in the classic case, if there isn't enough data to return, we force a diskless fetch from [localLogEndOffset, fetchEnd), which seems incorrect.
Let me know if I'm missing something.
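The reviewer's example can be checked with a toy model: a local read is served from the single segment containing the fetch offset, so a read at 1995 stops at the segment boundary 2000, and a supplement that starts at the local logEndOffset (5000) would skip [2000, 5000). The segment layout comes from the comment above; `localRead` and `skippedIfSupplementStartsAtLeo` are hypothetical helpers, not broker code.

```scala
// Hypothetical toy model of the reviewer's segment layout.
object SegmentBoundaryExample {
  // Each segment is [baseOffset, endOffset).
  val segments: Vector[(Long, Long)] = Vector((0L, 2000L), (2000L, 4000L), (4000L, 5000L))
  val logEndOffset: Long = 5000L

  // A local read stops at the end of the segment containing fetchOffset,
  // even when fewer than minBytes were returned.
  def localRead(fetchOffset: Long): (Long, Long) = {
    val segmentEnd = segments
      .collectFirst { case (base, end) if fetchOffset >= base && fetchOffset < end => end }
      .getOrElse(throw new IllegalArgumentException(s"offset $fetchOffset is out of range"))
    (fetchOffset, segmentEnd)
  }

  // Range that would be skipped if the supplement began at logEndOffset
  // instead of where the local read actually stopped.
  def skippedIfSupplementStartsAtLeo(fetchOffset: Long): Option[(Long, Long)] = {
    val (_, readEnd) = localRead(fetchOffset)
    if (readEnd < logEndOffset) Some((readEnd, logEndOffset)) else None
  }
}
```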
Good catch! I think I may have unintentionally changed how classic partitions are handled. This also helped me catch issues in the transition from local logs to diskless. Looking into how to fix this.
I have added a consolidating partitions map to DelayedFetch so that classic (non-consolidating, non-diskless), diskless, and consolidating partitions are handled separately here and are easier to control. PTAL
viktorsomogyi left a comment
I think if we can resolve that remaining issue (if you can confirm it's an issue) then we should be good to go with this.
```scala
if (consolidatingLocalFetchSupplements.nonEmpty &&
    disklessFetchInfos.isEmpty &&
    !params.isFromFollower && // safeguard: followers must not receive diskless records merged into local-log data
    params.maxWaitMs > 0 && // safeguard: non-blocking polls must not be held by a diskless round-trip
    !hasPreferredReadReplica &&
    bytesReadable < params.minBytes &&
    !errorReadingData) {
```
I think this path may have the same problem, because we're not checking here whether the local data has been exhausted, only that bytesReadable < params.minBytes, before switching to diskless.
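The guard in the excerpt, together with the rule that only error-free supplement bytes count toward minBytes, can be sketched as two pure functions. `Request`, `SupplementResult` and both helpers are hypothetical simplifications; as the comment points out, nothing in this condition checks whether the local read stopped at a segment boundary.

```scala
// Hypothetical simplification of the inline-supplement decision.
object InlineSupplementGuard {
  final case class SupplementResult(error: Option[String], sizeInBytes: Int)

  final case class Request(
      isFromFollower: Boolean,
      maxWaitMs: Long,
      minBytes: Int,
      hasPreferredReadReplica: Boolean,
      hasDisklessFetchInfos: Boolean)

  // Mirrors the conjunction in the excerpt above, one term per guard.
  def shouldSupplementInline(req: Request, hasConsolidatingPartitions: Boolean,
                             localBytesReadable: Int, errorReadingData: Boolean): Boolean =
    hasConsolidatingPartitions &&
      !req.hasDisklessFetchInfos &&
      !req.isFromFollower &&
      req.maxWaitMs > 0 &&
      !req.hasPreferredReadReplica &&
      localBytesReadable < req.minBytes &&
      !errorReadingData

  // Supplement responses carrying an error contribute nothing toward minBytes.
  def bytesReadable(localBytes: Int, supplements: Seq[SupplementResult]): Int =
    localBytes + supplements.filter(_.error.isEmpty).map(_.sizeInBytes).sum
}
```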
The buildConsolidationSupplementFetchInfos fix should solve that by taking lastBatch().nextOffset() as the supplement start offset: aa6be12#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2023-R2031
testBuildConsolidationSupplementFetchInfosStartsAtLastBatchNextOffset should also validate this scenario.
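A minimal sketch of the fix's idea, assuming a local read can be summarized by its batches: the supplement starts at the last returned batch's `nextOffset`, not at the local logEndOffset. `Batch` and `supplementStartOffset` are hypothetical, and falling back to the fetch offset for an empty local read is an assumption.

```scala
// Hypothetical toy model of choosing the supplement start offset.
object SupplementStart {
  final case class Batch(baseOffset: Long, recordCount: Int) {
    def nextOffset: Long = baseOffset + recordCount
  }

  // Continue exactly where the local read stopped; assumption: an empty
  // local read continues from the original fetch offset.
  def supplementStartOffset(localBatches: Seq[Batch], fetchOffset: Long): Long =
    localBatches.lastOption.map(_.nextOffset).getOrElse(fetchOffset)
}
```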
OK, your change does seem to cover the gap I raised in the previous review, but I discovered a different gap this time (well, it actually consists of two parts), and I had to dig a bit deeper to find it.
I generated a test case (it should go in ReplicaManagerInklessTest) that showcases what I'm thinking of. Essentially, when the diskless part doesn't cover the whole local range, there still seems to be a gap. Suppose the local range is [0,1000) and segments end at every 500th offset (500, 1000, and so on), and the seal offset (classicToDisklessStartOffset) is at 1000 because the topic was just switched. Then reading at 480, as in the test, and falling below minBytes will result in a supplement read from offset 1000. I think we should check somewhere whether we're still below the diskless range before supplementing (supplementStartOffset >= seal).
As for the second part, you can see that the diskless read is mocked to return from 1000 specifically, but if you look at the PG control plane, it never checks the lower bound, so it never returns an error. Therefore the first part is never exposed. Also, since 500 passes the checks in the find_batches routine, it returns the first record without a second thought. So this is partly on the diskless control plane, because it never expects that its log doesn't start at 0 AND that you can fetch an offset higher than 0 but still below its log start.
This is the first place where we should really check log_start_offset instead of 0.
And this is the second, where it slips through and returns the first batch.
```scala
// REPRODUCES the "sub-seal gap" silent-data-loss bug.
//
// Setup of a consolidating partition:
//   - classic-to-diskless seal = 1000 (offsets [1000, ...) live in object storage / diskless)
//   - local classic prefix = [0, 1000) and spans multiple local log segments
//   - local logEndOffset (LEO) = 1000
//   - localLogStartOffset = 0 (nothing tiered, so the tiered-range guard is a no-op)
//
// A consumer fetches at offset 480, which lands in an *earlier* local segment. LocalLog.read
// serves from a single segment, so the local read returns only [480, 500) (a segment boundary
// well below the seal). Because that is smaller than minBytes, the consolidating path triggers
// a diskless supplement, computed by buildConsolidationSupplementFetchInfos as
// lastBatch().nextOffset() == 500.
//
// But 500 is BELOW the seal (1000): the diskless control plane holds no batches for offsets
// below the seal, so a FindBatch at 500 returns the first available batch — at the seal, 1000.
// (The fixed FetchHandler stub returns records starting at 1000 regardless of the requested
// offset, faithfully modelling that tailSet/"first batch at-or-after" control-plane behavior.)
//
// The merge therefore stitches local [480, 500) directly onto diskless [1000, ...), and the
// committed range [500, 1000) is silently dropped from the consumer-visible response.
@Test
def testFetchConsolidatingSupplementBelowSealSilentlySkipsClassicPrefix(): Unit = {
  val seal = 1000L
  val localLeo = 1000L
  // Local single-segment read: offsets [480, 500). 20 records in one batch -> nextOffset == 500.
  val localRecordList = (0 until 20).map(i => new SimpleRecord(i.toLong, s"local-$i".getBytes())).toArray
  val localMemRecords = MemoryRecords.withRecords(
    2.toByte, 480L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, localRecordList: _*
  )
  var localFileRecords: FileRecords = null
  // Diskless serves from the seal: offsets [1000, 1005). Models the control plane returning the
  // first batch at-or-after the requested (sub-seal) offset of 500.
  val supplementRecordList = (0 until 5).map(i => new SimpleRecord(i.toLong, s"diskless-$i".getBytes())).toArray
  val supplementRecords = MemoryRecords.withRecords(
    2.toByte, seal, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, supplementRecordList: _*
  )
  val disklessResponse = Map(disklessTopicPartition ->
    new FetchPartitionData(
      Errors.NONE,
      localLeo, 0L,
      supplementRecords,
      Optional.empty(), OptionalLong.of(localLeo), Optional.empty(), OptionalInt.empty(), false)
  )
  val fetchHandlerCtor = mockFetchHandler(disklessResponse)
  val cp = mock(classOf[ControlPlane])
  val replicaManager = spy(createReplicaManager(
    List(disklessTopicPartition.topic()),
    controlPlane = Some(cp),
    disklessManagedReplicasEnabled = true,
    disklessRemoteStorageConsolidationEnabled = true,
    consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
  ))
  try {
    when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(seal)
    stubConsolidatingPartitionWithLocalLeo(replicaManager, localLeo = localLeo)
    localFileRecords = memoryRecordsToFileRecords(localMemRecords)
    doReturn(Seq(disklessTopicPartition ->
      new LogReadResult(
        new FetchDataInfo(new LogOffsetMetadata(480L, 0L, 0), localFileRecords),
        Optional.empty(), localLeo, 0L, localLeo, 0L, 0L, OptionalLong.empty(), Errors.NONE
      ))
    ).when(replicaManager).readFromLog(any(), any(), any(), any())
    val fetchParams = new FetchParams(
      FetchRequest.ORDINARY_CONSUMER_ID, -1L,
      1000L,
      localMemRecords.sizeInBytes + 1, // minBytes > local-only size to trigger the supplement
      1024 * 1024,
      FetchIsolation.HIGH_WATERMARK, Optional.empty()
    )
    val fetchInfos = Seq(
      disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 480L, 0L, 1024 * 1024, Optional.empty())
    )
    @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
    val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
      responseData = response.toMap
    }
    replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
    waitForFetchResponse(responseData)
    val result = responseData(disklessTopicPartition)
    assertEquals(Errors.NONE, result.error)
    val offsets = result.records.records().asScala.map(_.offset()).toList
    val gaps = offsets.sliding(2).collect { case Seq(a, b) if b != a + 1 => (a, b) }.toList
    assertTrue(gaps.isEmpty,
      s"Consumer-visible records must be contiguous, but found offset gap(s): $gaps. " +
      s"The supplement started at offset 500 (below the seal $seal); diskless served from the " +
      s"seal, so the committed range [500, $seal) was silently skipped. Offsets=$offsets")
  } finally {
    replicaManager.shutdown(checkpointHW = false)
    fetchHandlerCtor.close()
    if (localFileRecords != null) localFileRecords.close()
  }
}
```
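The two parts of the sub-seal gap can be modeled in a few lines: a batch lookup that returns the first batch at or after the requested offset with no lower-bound check (as described for the PG `find_batches` routine), a checked variant that rejects offsets below the diskless log start, and the broker-side `supplementStartOffset >= seal` guard suggested in the review. All names are hypothetical, and the `OFFSET_OUT_OF_RANGE` result is an assumed choice of error, not the control plane's actual behavior.

```scala
// Hypothetical toy model of the sub-seal gap; not the real control plane.
object SubSealGap {
  final case class Batch(baseOffset: Long, lastOffset: Long)

  // No lower-bound check: any offset below the first batch "finds" that batch.
  def findBatchUnchecked(batches: Seq[Batch], offset: Long): Option[Batch] =
    batches.find(_.lastOffset >= offset)

  // Assumed fix: reject offsets below the diskless log start (the seal).
  def findBatchChecked(batches: Seq[Batch], logStartOffset: Long,
                       offset: Long): Either[String, Option[Batch]] =
    if (offset < logStartOffset) Left("OFFSET_OUT_OF_RANGE")
    else Right(batches.find(_.lastOffset >= offset))

  // Broker-side guard from the review: only supplement at or above the seal.
  def mayUseDisklessSupplement(supplementStartOffset: Long, seal: Long): Boolean =
    supplementStartOffset >= seal
}
```

With the test's numbers (seal 1000, supplement start 500), the unchecked lookup silently returns the batch at 1000, while either the checked lookup or the broker-side guard would surface the gap instead of stitching [480, 500) onto [1000, ...).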
Ah, good catch! I wasn't focusing on switched topics, so I missed this scenario.
This is very relevant, so I'll work on the fix and open a follow-up for the PG validation.
viktorsomogyi left a comment
@jeqo although I think I found some further gaps, I'm approving this so it doesn't block you on the benchmarks and further work. If you can verify those gaps, they can be addressed separately.
Problem
At the local/diskless boundary of a consolidating partition, consumers fetching with large minBytes would park in the delayed-fetch purgatory and wait the full maxWaitMs before getting a response — even when diskless data was already available past the local logEndOffset.
Solution
When the local log read doesn't satisfy minBytes, supplement it with a synchronous diskless fetch starting at logEndOffset and merge the results via `ConcatenatedRecords.concat()`. For requests spanning only consolidating partitions, the supplement is applied inline and the response is produced immediately without entering the purgatory. For mixed requests (consolidating + pure-diskless), the supplement and the diskless fetch run concurrently in `DelayedFetch.onComplete`, so latency equals the slower of the two, not their sum.
Changes
Inline supplement in `fetchMessages`
- Guards: `!isFromFollower`, `maxWaitMs > 0`, `!hasPreferredReadReplica`, `!errorReadingData`
- `inklessSharedState` checked at collection time, so supplements are never gathered on a non-inkless broker
- `bytesReadable` only counts error-free supplement bytes: a supplement response with an error cannot trick the broker into responding prematurely
- `InterruptedException` re-interrupts the thread; `TimeoutException` is logged specifically for operator visibility; other failures fall back to local-only

Concurrent supplement in `DelayedFetch.onComplete`
- Supplement and pure-diskless fetches combined via `CompletableFuture.thenCombine`
- `AtomicBoolean` ensures the response callback fires exactly once
- `fetchOffset >= logEndOffset`
- On diskless future failure, per-partition error entries (`Errors.forException`) are returned so clients see the partition in the response rather than having it silently dropped

`ConcatenatedRecords.concat(prefix, tail)` static factory
- Type dispatch over `MemoryRecords`/`ConcatenatedRecords`
- Inputs validated with `Objects.requireNonNull`

`buildConsolidationSupplementFetchInfos`

`mergeConsolidationSupplement`
- `FileRecords` → `MemoryRecords` (standard `readInto`/`flip` idiom)
- `Math.min(local, supplement)` for logStartOffset
- Preserves `localData.abortedTransactions` (warns when present, since diskless has no transaction index)
- Preserves `localData.preferredReadReplica` (diskless returns empty for this field)

Tests
- `testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable`: RED/GREEN; fails without the supplement (parks for maxWaitMs), passes with it (immediate response, purgatory stays empty)
- `testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData`: bounds the contract; no diskless data means the request correctly waits
- `testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable`: a supplement with error status must not satisfy minBytes; the request still parks
- `testFetchConsolidatingSupplementPreservesInterruptedStatus`: the thread's interrupted status is preserved after an InterruptedException in the supplement fetch
- `testFetchMixedConsolidatingPureDisklessAndClassicPartitions`: all three partition types in one request; asserts merged consolidating data and two diskless handler calls
- `testCompletionWithConsolidationSupplement`: focused unit test for the concurrent supplement path in `DelayedFetch.onComplete`
- `testConsolidationSupplementSkippedWhenLocalReadHasError`: the supplement merge is not applied when the local read returns an error
- `testConsolidationSupplementNotIssuedWhenLocalReadHasError`: the supplement fetch is not issued for error-result partitions
- `testDisklessFetchFailureReturnsPerPartitionErrors`: a diskless future failure produces per-partition error entries, not silent omission
- `buildConsolidationSupplementFetchInfos`: budget deduction, zero-budget drop, missing partitions filtered
- `mergeConsolidationSupplement`: MemoryRecords merge, FileRecords materialization, unknown-type fallback
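One possible reading of the `buildConsolidationSupplementFetchInfos` budget rules named in the test list (budget deduction, zero-budget drop, missing partitions filtered) is sketched below. The semantics are assumptions: the supplement gets the partition's maxBytes minus the bytes already read locally, starts at the local read's next offset, and partitions without a local read are skipped. `SupplementBudget` and `LocalRead` are hypothetical names.

```scala
// Hypothetical sketch; real fetch infos carry topic IDs, epochs, etc.
object SupplementBudget {
  final case class LocalRead(nextFetchOffset: Long, bytesRead: Int)

  // Returns partition -> (supplement start offset, remaining byte budget).
  def buildSupplementFetchInfos(
      requestedMaxBytes: Map[String, Int],
      localReads: Map[String, LocalRead]): Map[String, (Long, Int)] =
    requestedMaxBytes.toSeq.flatMap { case (partition, maxBytes) =>
      // Assumption: partitions missing from the local results are filtered out.
      localReads.get(partition).flatMap { read =>
        val remaining = maxBytes - read.bytesRead // budget deduction
        if (remaining > 0) Some(partition -> ((read.nextFetchOffset, remaining)))
        else None // zero-budget drop
      }
    }.toMap
}
```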