feat(inkless:consolidation): supplement local log with diskless data on fetch [KC-168] #638
jeqo wants to merge 6 commits into
…al/diskless boundary

Deterministic regression tests for the consolidating-partition fetch path, using a MockTimer-backed delayed-fetch purgatory so timing is asserted without real waits:

- testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: local log < minBytes with diskless data available must respond SYNCHRONOUSLY (merged), never parking in the purgatory. This RED/GREEN test reproduces the degradation: it FAILS on a build without the supplement (request parks and waits maxWaitMs) and PASSES with it.
- testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract: when no diskless data is available, the request correctly still parks and only completes after maxWaitMs elapses.

Adds an optional delayedFetchPurgatory param to the test's createReplicaManager helper to inject a MockTimer-backed purgatory.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pull request overview
This PR improves fetch behavior for consolidating diskless topics at the local↔diskless boundary so that consumers with large minBytes can receive responses promptly by supplementing local-log fetch results with diskless data beyond the local logEndOffset. It also updates the delayed-fetch completion path to run the consolidation supplement and pure-diskless fetches concurrently.
Changes:
- Add a consolidation supplement request builder and a merge routine to concatenate local + diskless records for consolidating partitions.
- Update ReplicaManager.fetchMessages to synchronously supplement consolidating local reads (when appropriate) to avoid parking in delayed fetch.
- Update DelayedFetch.onComplete to fetch consolidation supplements in parallel with pure-diskless fetches, and add/extend tests for these behaviors.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| core/src/main/scala/kafka/server/ReplicaManager.scala | Adds supplement request building + merge logic; integrates inline supplement path into fetchMessages. |
| core/src/main/scala/kafka/server/DelayedFetch.scala | Runs consolidation supplement fetch concurrently with pure-diskless fetch during delayed completion and merges results. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala | Adds unit/integration-style tests validating supplement behavior and merge/budget helpers. |
| core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala | Adds a focused test for the concurrent supplement path in DelayedFetch.onComplete. |
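
The concurrent path summarized above (supplement and pure-diskless fetches running side by side, then merged) can be illustrated with a minimal JDK-only sketch. The class name, the `fetchBoth` and `concat` helpers, and the use of `List<String>` as a stand-in for fetch results are all hypothetical and are not taken from the PR; only `CompletableFuture.thenCombine` is the real API named in the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ConcurrentSupplementSketch {
    // Hypothetical merge step: the real code merges per-partition fetch data,
    // here we simply append one list of payloads to the other.
    static List<String> concat(List<String> first, List<String> second) {
        List<String> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    // Both futures are already running when they are combined, so the combined
    // future completes when the slower of the two completes.
    static CompletableFuture<List<String>> fetchBoth(
            CompletableFuture<List<String>> supplement,
            CompletableFuture<List<String>> pureDiskless) {
        return supplement.thenCombine(pureDiskless, ConcurrentSupplementSketch::concat);
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> supplement =
            CompletableFuture.supplyAsync(() -> List.of("consolidating-partition-data"));
        CompletableFuture<List<String>> pureDiskless =
            CompletableFuture.supplyAsync(() -> List.of("pure-diskless-partition-data"));
        System.out.println(fetchBoth(supplement, pureDiskless).join());
    }
}
```

Because neither future waits for the other to start, the combined latency is bounded by the slower fetch rather than by the sum of the two.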
…on fetch

When a consolidating partition's local log doesn't satisfy minBytes, supplement with a synchronous diskless fetch starting at logEndOffset, then merge local + diskless records via ConcatenatedRecords.concat().

For consumer requests that only span consolidating partitions, the supplement is applied inline and the response is produced immediately without parking in the delayed-fetch purgatory. For mixed requests (consolidating + pure-diskless partitions), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete so latency equals the slower of the two, not their sum. An AtomicBoolean ensures the response callback fires exactly once even if an exception escapes the future composition.

Guards on the inline supplement path:
- !isFromFollower: followers must not receive merged diskless data
- maxWaitMs > 0: non-blocking polls must not be held by a round-trip
- !hasPreferredReadReplica: redirect responses must not be delayed
- disklessFetchInfos.isEmpty: avoids a wasted blocking call when the request will park in DelayedFetch anyway
- inklessSharedState.isDefined checked at collection time

bytesReadable only counts error-free supplement bytes: a supplement response with an error but non-empty records cannot trick the broker into responding prematurely.

The merge preserves localData.abortedTransactions and localData.preferredReadReplica (diskless has no transaction index and no replica selector). Warns when aborted transactions are present since the diskless portion will have no abort markers. Falls back to local-only data on unknown Records types or merge failures.

Exception handling on the inline supplement fetch:
- InterruptedException: re-interrupts the thread and falls back
- TimeoutException: logged specifically for operator visibility
- Other failures: logged and falls back to local-only

On diskless future failure in DelayedFetch.onComplete, per-partition error entries (Errors.forException) are returned so clients see the partition in the response rather than it being silently dropped.

ConcatenatedRecords.concat(prefix, tail) static factory added to encapsulate type dispatch over MemoryRecords/ConcatenatedRecords and avoid materializing supplement backing records into a contiguous buffer. Validates inputs with Objects.requireNonNull.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
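
The exception handling listed in this commit message (re-interrupt on InterruptedException, a dedicated log line for TimeoutException, fall back to local-only data otherwise) follows a common JDK pattern. The sketch below illustrates that pattern only; `supplementOrFallback`, its parameters, and the log messages are hypothetical and do not reproduce the PR's code.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SupplementFallbackSketch {
    // Waits up to timeoutMs for the supplement; returns localOnly on any failure.
    static <T> T supplementOrFallback(CompletableFuture<T> supplement, long timeoutMs, T localOnly) {
        try {
            return supplement.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // get() clears the interrupt flag when it throws; restore it so
            // code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return localOnly;
        } catch (TimeoutException e) {
            // Logged separately so slow supplement fetches are visible to operators.
            System.err.println("supplement fetch timed out after " + timeoutMs + " ms");
            return localOnly;
        } catch (ExecutionException | CancellationException e) {
            System.err.println("supplement fetch failed: " + e);
            return localOnly;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        // Times out after 10 ms and falls back to the local-only value.
        System.out.println(supplementOrFallback(neverCompletes, 10, "local-only"));
    }
}
```

Restoring the interrupt flag matters because the blocking wait runs on a request-handling thread that the caller may be trying to shut down.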
viktorsomogyi left a comment
I think if we can resolve that remaining issue (if you can confirm it's an issue) then we should be good to go with this.
viktorsomogyi left a comment
@jeqo although I think I found some further gaps, I'm approving this so it doesn't block you on the benchmarks and further work. Those gaps, once you have verified them, can be addressed separately.
Thanks for the fix. From my side it's good, but please check whether those Copilot comments are relevant and whether we want to fix them.

@viktorsomogyi thanks! I think all Copilot suggestions are resolved now 👍🏽
Problem

At the local/diskless boundary of a consolidating partition, consumers fetching with large minBytes would park in the delayed-fetch purgatory and wait the full maxWaitMs before getting a response, even when diskless data was already available past the local logEndOffset.

Solution

When the local log read doesn't satisfy minBytes, supplement with a synchronous diskless fetch starting at logEndOffset and merge the results via ConcatenatedRecords.concat(). For requests spanning only consolidating partitions, the supplement is applied inline and the response is produced immediately without entering the purgatory. For mixed requests (consolidating + pure-diskless), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete so latency equals the slower of the two, not their sum.

Changes

Inline supplement in fetchMessages

- Guards: !isFromFollower, maxWaitMs > 0, !hasPreferredReadReplica, !errorReadingData
- inklessSharedState checked at collection time so supplements are never gathered on a non-inkless broker
- bytesReadable only counts error-free supplement bytes: a supplement response with an error cannot trick the broker into responding prematurely
- InterruptedException re-interrupts the thread; TimeoutException logged specifically for operator visibility; other failures fall back to local-only

Concurrent supplement in DelayedFetch.onComplete

- Supplement and pure-diskless fetches combined via CompletableFuture.thenCombine
- AtomicBoolean ensures the response callback fires exactly once
- fetchOffset >= logEndOffset
- On diskless future failure, per-partition error entries (Errors.forException) are returned so clients see the partition in the response rather than it being silently dropped

ConcatenatedRecords.concat(prefix, tail) static factory

- Encapsulates type dispatch over MemoryRecords/ConcatenatedRecords
- Validates inputs with Objects.requireNonNull

buildConsolidationSupplementFetchInfos / mergeConsolidationSupplement

- FileRecords → MemoryRecords (standard readInto/flip idiom)
- Math.min(local, supplement) for logStartOffset
- Preserves localData.abortedTransactions (warns when present since diskless has no transaction index)
- Preserves localData.preferredReadReplica (diskless returns empty for this field)

Tests

- testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: RED/GREEN; fails without the supplement (parks for maxWaitMs), passes with it (immediate response, purgatory stays empty)
- testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract; no diskless data means the request correctly waits
- testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable: supplement with error status must not satisfy minBytes; request still parks
- testFetchConsolidatingSupplementPreservesInterruptedStatus: thread interrupted status is preserved after InterruptedException in supplement fetch
- testFetchMixedConsolidatingPureDisklessAndClassicPartitions: all three partition types in one request, asserts merged consolidating data and two diskless handler calls
- testCompletionWithConsolidationSupplement: focused unit test for the concurrent supplement path in DelayedFetch.onComplete
- testConsolidationSupplementSkippedWhenLocalReadHasError: supplement merge is not applied when local read returns an error
- testConsolidationSupplementNotIssuedWhenLocalReadHasError: supplement fetch is not issued for error-result partitions
- testDisklessFetchFailureReturnsPerPartitionErrors: diskless future failure produces per-partition error entries, not silent omission
- buildConsolidationSupplementFetchInfos: budget deduction, zero-budget drop, missing partitions filtered
- mergeConsolidationSupplement: MemoryRecords merge, FileRecords materialization, unknown-type fallback
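
The bytesReadable rule described above (only error-free supplement bytes count toward minBytes) can be made concrete with a small sketch. Everything here is a hypothetical model of the accounting, not the PR's code: `SupplementResult`, `bytesReadable`, and `canRespondImmediately` are illustrative names, and the real implementation works on Kafka fetch data rather than plain integers.

```java
import java.util.List;

class BytesReadableSketch {
    // Hypothetical per-partition supplement result: an error flag plus the
    // size of whatever records came back with it.
    static final class SupplementResult {
        final boolean hasError;
        final int sizeInBytes;

        SupplementResult(boolean hasError, int sizeInBytes) {
            this.hasError = hasError;
            this.sizeInBytes = sizeInBytes;
        }
    }

    // Local bytes always count; supplement bytes count only when error-free.
    static long bytesReadable(long localBytes, List<SupplementResult> supplements) {
        long total = localBytes;
        for (SupplementResult s : supplements) {
            if (!s.hasError) {
                total += s.sizeInBytes;
            }
        }
        return total;
    }

    // The request can skip the purgatory only if minBytes is satisfied.
    static boolean canRespondImmediately(long localBytes, List<SupplementResult> supplements, int minBytes) {
        return bytesReadable(localBytes, supplements) >= minBytes;
    }

    public static void main(String[] args) {
        List<SupplementResult> withError = List.of(new SupplementResult(true, 900));
        List<SupplementResult> clean = List.of(new SupplementResult(false, 900));
        System.out.println(canRespondImmediately(100, withError, 500)); // false: errored bytes are ignored
        System.out.println(canRespondImmediately(100, clean, 500));     // true: 100 + 900 >= 500
    }
}
```

Under this model, an errored supplement leaves the request below minBytes, so it still parks, which is the behavior testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable asserts.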