feat(inkless:consolidation): supplement local log with diskless data on fetch [KC-168] #638

Open: jeqo wants to merge 6 commits into main from jeqo/diskless-consolidation-supplement-fetch

jeqo commented Jun 10, 2026

Problem

At the local/diskless boundary of a consolidating partition, consumers fetching with a large minBytes would park in the delayed-fetch purgatory and wait the full maxWaitMs before getting a response, even when diskless data was already available past the local logEndOffset.

Solution

When the local log read doesn't satisfy minBytes, supplement it with a synchronous diskless fetch starting at logEndOffset and merge the two results via ConcatenatedRecords.concat().

For requests spanning only consolidating partitions, the supplement is applied inline and the response is produced immediately without entering the purgatory. For mixed requests (consolidating + pure-diskless), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete so latency equals the slower of the two, not their sum.

Changes

Inline supplement in fetchMessages

  • Fires when consolidating partitions are below minBytes and no pure-diskless partitions are present
  • Guards: !isFromFollower, maxWaitMs > 0, !hasPreferredReadReplica, !errorReadingData
  • inklessSharedState is checked at collection time, so supplements are never gathered on a non-inkless broker
  • bytesReadable counts only error-free supplement bytes, so a supplement response carrying an error cannot trick the broker into responding prematurely
  • Exception handling: InterruptedException re-interrupts the thread; TimeoutException logged specifically for operator visibility; other failures fall back to local-only
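The decision the inline path makes can be sketched as two small predicates. This is a minimal Java sketch under stated assumptions: the real logic is Scala inside ReplicaManager.fetchMessages, and `InlineSupplementGuard`, `SupplementResult`, and the boolean parameters are hypothetical stand-ins for the broker's own state. It shows the guards listed above and why a supplement with an error contributes zero bytes toward minBytes.

```java
import java.util.List;

// Hypothetical stand-ins: the real checks are Scala code in
// ReplicaManager.fetchMessages and use Kafka's own types.
final class InlineSupplementGuard {

    /** Outcome of one partition's supplement fetch (hypothetical shape). */
    record SupplementResult(String partition, boolean hasError, int sizeInBytes) {}

    /** True only when every guard allows the inline supplement. */
    static boolean shouldSupplementInline(boolean isFromFollower,
                                          long maxWaitMs,
                                          boolean hasPreferredReadReplica,
                                          boolean errorReadingData,
                                          boolean hasPureDisklessPartitions,
                                          boolean inklessEnabled) {
        return inklessEnabled
            && !isFromFollower
            && maxWaitMs > 0
            && !hasPreferredReadReplica
            && !errorReadingData
            && !hasPureDisklessPartitions;
    }

    /** Local bytes plus supplement bytes, skipping any supplement that carries an error. */
    static long bytesReadable(long localBytes, List<SupplementResult> supplements) {
        long total = localBytes;
        for (SupplementResult s : supplements) {
            if (!s.hasError()) {
                total += s.sizeInBytes();
            }
        }
        return total;
    }

    /** The request can be answered immediately once readable bytes reach minBytes. */
    static boolean satisfiesMinBytes(long localBytes, List<SupplementResult> supplements, int minBytes) {
        return bytesReadable(localBytes, supplements) >= minBytes;
    }
}
```

Note that an errored supplement with 5000 bytes of records still leaves bytesReadable unchanged, which is exactly what keeps such a request parked.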

Concurrent supplement in DelayedFetch.onComplete

  • Supplement and pure-diskless fetches run in parallel via CompletableFuture.thenCombine
  • AtomicBoolean ensures the response callback fires exactly once
  • Skips partitions with local read errors or fetchOffset >= logEndOffset
  • Skips entirely for follower fetches
  • On diskless future failure, per-partition error entries (Errors.forException) are returned so clients see the partition in the response rather than it being silently dropped
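A rough sketch of the concurrent completion path, assuming partition results can be modeled as `Map<String, String>` and that `"ERROR:<exception>"` stands in for an `Errors.forException` entry (both hypothetical). `ConcurrentCompletion.complete` is an illustrative name, not the PR's method. It combines the two in-flight futures with `thenCombine`, turns a diskless failure into per-partition errors, and uses an `AtomicBoolean` so the callback runs at most once even if merging or the callback itself throws.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model: a partition result is a String, and "ERROR:<name>"
// stands in for an Errors.forException entry.
final class ConcurrentCompletion {

    static void complete(CompletableFuture<Map<String, String>> supplementFuture,
                         CompletableFuture<Map<String, String>> disklessFuture,
                         Iterable<String> disklessPartitions,
                         Consumer<Map<String, String>> responseCallback) {
        AtomicBoolean responded = new AtomicBoolean(false);
        // Only the first caller to flip the flag gets to invoke the callback.
        Consumer<Map<String, String>> respondOnce = result -> {
            if (responded.compareAndSet(false, true)) {
                responseCallback.accept(result);
            }
        };

        // A failed supplement just means "serve local data only".
        CompletableFuture<Map<String, String>> supplementOrEmpty =
            supplementFuture.exceptionally(ex -> Map.of());

        // A failed diskless fetch yields an error entry per partition instead of omitting them.
        CompletableFuture<Map<String, String>> disklessOrErrors = disklessFuture.exceptionally(ex -> {
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            Map<String, String> errors = new HashMap<>();
            for (String tp : disklessPartitions) {
                errors.put(tp, "ERROR:" + cause.getClass().getSimpleName());
            }
            return errors;
        });

        // Both fetches are already in flight; thenCombine fires when the slower one finishes.
        supplementOrEmpty
            .thenCombine(disklessOrErrors, ConcurrentCompletion::merge)
            .thenAccept(respondOnce)
            .exceptionally(ex -> {
                // Merge or callback threw: send a fallback response unless one already started.
                respondOnce.accept(Map.of());
                return null;
            });
    }

    private static Map<String, String> merge(Map<String, String> supplement, Map<String, String> diskless) {
        Map<String, String> merged = new HashMap<>(supplement);
        merged.putAll(diskless);
        return merged;
    }
}
```

Because the flag is flipped before the callback runs, a callback that throws does not trigger the fallback response a second time.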

ConcatenatedRecords.concat(prefix, tail) static factory

  • Encapsulates type dispatch over MemoryRecords/ConcatenatedRecords
  • Unwraps inner backing records directly — no extra materialization copy
  • Validates inputs with Objects.requireNonNull
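The type dispatch in the factory can be illustrated with a simplified model. `SimpleRecords`, `MemoryChunk`, and `ConcatenatedChunks` are hypothetical stand-ins, not Kafka's `Records`, `MemoryRecords`, or `ConcatenatedRecords`; the sketch only shows the idea of validating both arguments and flattening an existing concatenation so backing buffers are reused rather than copied.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical simplified types; not Kafka's Records hierarchy.
interface SimpleRecords {
    int sizeInBytes();
}

record MemoryChunk(byte[] bytes) implements SimpleRecords {
    public int sizeInBytes() { return bytes.length; }
}

final class ConcatenatedChunks implements SimpleRecords {
    private final List<MemoryChunk> parts;

    private ConcatenatedChunks(List<MemoryChunk> parts) {
        this.parts = Collections.unmodifiableList(parts);
    }

    List<MemoryChunk> parts() { return parts; }

    @Override
    public int sizeInBytes() {
        int total = 0;
        for (MemoryChunk p : parts) total += p.sizeInBytes();
        return total;
    }

    /** Joins prefix and tail, unwrapping nested concatenations instead of copying bytes. */
    static ConcatenatedChunks concat(SimpleRecords prefix, SimpleRecords tail) {
        Objects.requireNonNull(prefix, "prefix");
        Objects.requireNonNull(tail, "tail");
        List<MemoryChunk> out = new ArrayList<>();
        appendParts(prefix, out);
        appendParts(tail, out);
        return new ConcatenatedChunks(out);
    }

    private static void appendParts(SimpleRecords records, List<MemoryChunk> out) {
        if (records instanceof MemoryChunk m) {
            out.add(m);            // reuse the backing buffer as-is
        } else if (records instanceof ConcatenatedChunks c) {
            out.addAll(c.parts);   // flatten: no nested wrappers, no byte copy
        } else {
            throw new IllegalArgumentException("Unsupported records type: " + records.getClass());
        }
    }
}
```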

buildConsolidationSupplementFetchInfos

  • Computes per-partition remaining byte budget (original maxBytes minus local bytes read)
  • Drops partitions with zero budget remaining
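A minimal sketch of the budget step, assuming each partition's requested maxBytes, local bytes read, and local logEndOffset are available as maps keyed by a partition name; `SupplementBudget` and `FetchInfo` are hypothetical names. Partitions without a local result are filtered out, and a remaining budget of zero (or below zero, if the local read returned more than maxBytes) causes the partition to be dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; FetchInfo stands in for the real per-partition fetch data.
final class SupplementBudget {

    record FetchInfo(long fetchOffset, int maxBytes) {}

    /** Supplement starts at the local logEndOffset with the byte budget the local read left unused. */
    static Map<String, FetchInfo> buildSupplementFetchInfos(Map<String, Integer> requestedMaxBytes,
                                                            Map<String, Integer> localBytesRead,
                                                            Map<String, Long> localLogEndOffset) {
        Map<String, FetchInfo> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : requestedMaxBytes.entrySet()) {
            String tp = e.getKey();
            Integer read = localBytesRead.get(tp);
            Long leo = localLogEndOffset.get(tp);
            if (read == null || leo == null) {
                continue; // no local result for this partition: nothing to supplement
            }
            int remaining = e.getValue() - read;
            if (remaining <= 0) {
                continue; // local read already used the whole budget
            }
            out.put(tp, new FetchInfo(leo, remaining));
        }
        return out;
    }
}
```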

mergeConsolidationSupplement

  • Materializes local FileRecords → MemoryRecords (standard readInto/flip idiom)
  • Promotes diskless HW/LSO; uses Math.min(local, supplement) for logStartOffset
  • Preserves localData.abortedTransactions (warns when present since diskless has no transaction index)
  • Preserves localData.preferredReadReplica (diskless returns empty for this field)
  • Falls back to local-only data on unknown Records types or merge failures with error log
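How the partition-level metadata could be combined is sketched below. `PartitionData` is a hypothetical simplification of the real per-partition fetch result: records are reduced to a byte count, aborted transactions to a list of producer IDs, and `java.util.logging` replaces the broker's logger. FileRecords materialization and the fallback paths are omitted.

```java
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical simplified model of merging local and supplement partition data.
final class SupplementMerge {
    private static final Logger LOG = Logger.getLogger(SupplementMerge.class.getName());

    record PartitionData(long highWatermark,
                         long lastStableOffset,
                         long logStartOffset,
                         List<Long> abortedTransactionProducerIds,
                         Optional<Integer> preferredReadReplica,
                         int recordsSizeInBytes) {}

    static PartitionData merge(PartitionData local, PartitionData supplement) {
        if (!local.abortedTransactionProducerIds().isEmpty()) {
            // Diskless has no transaction index, so the supplemented tail carries no abort markers.
            LOG.warning("Merging diskless supplement into a read that has aborted transactions");
        }
        return new PartitionData(
            supplement.highWatermark(),                                    // promote diskless HW
            supplement.lastStableOffset(),                                 // promote diskless LSO
            Math.min(local.logStartOffset(), supplement.logStartOffset()), // earliest available offset
            local.abortedTransactionProducerIds(),                         // keep local aborted txns
            local.preferredReadReplica(),                                  // diskless returns empty here
            local.recordsSizeInBytes() + supplement.recordsSizeInBytes()); // local prefix + diskless tail
    }
}
```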

Tests

  • testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: RED/GREEN — fails without the supplement (parks for maxWaitMs), passes with it (immediate response, purgatory stays empty)
  • testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract — no diskless data means the request correctly waits
  • testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable: supplement with error status must not satisfy minBytes — request still parks
  • testFetchConsolidatingSupplementPreservesInterruptedStatus: thread interrupted status is preserved after InterruptedException in supplement fetch
  • testFetchMixedConsolidatingPureDisklessAndClassicPartitions: all three partition types in one request, asserts merged consolidating data and two diskless handler calls
  • testCompletionWithConsolidationSupplement: focused unit test for the concurrent supplement path in DelayedFetch.onComplete
  • testConsolidationSupplementSkippedWhenLocalReadHasError: supplement merge is not applied when local read returns an error
  • testConsolidationSupplementNotIssuedWhenLocalReadHasError: supplement fetch is not issued for error-result partitions
  • testDisklessFetchFailureReturnsPerPartitionErrors: diskless future failure produces per-partition error entries, not silent omission
  • Unit tests for buildConsolidationSupplementFetchInfos: budget deduction, zero-budget drop, missing partitions filtered
  • Unit tests for mergeConsolidationSupplement: MemoryRecords merge, FileRecords materialization, unknown-type fallback

…al/diskless boundary

Deterministic regression tests for the consolidating-partition fetch path,
using a MockTimer-backed delayed-fetch purgatory so timing is asserted without
real waits:

- testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable:
  local log < minBytes with diskless data available must respond SYNCHRONOUSLY
  (merged), never parking in the purgatory. This RED/GREEN test reproduces the
  degradation: it FAILS on a build without the supplement (request parks and
  waits maxWaitMs) and PASSES with it.
- testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData:
  bounds the contract — when no diskless data is available the request correctly
  still parks and only completes after maxWaitMs elapses.

Adds an optional delayedFetchPurgatory param to the test's createReplicaManager
helper to inject a MockTimer-backed purgatory.
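The idea behind a MockTimer-backed purgatory can be shown with a tiny manual timer: time advances only when the test says so, so a "parks until maxWaitMs" assertion needs no sleeping. `ManualTimer` is a hypothetical stand-in written for illustration; it does not reproduce the API of the MockTimer used by the actual tests.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical manual timer: time only moves when the test calls advance().
final class ManualTimer {
    private record Task(long deadlineMs, Runnable action) {}

    private final List<Task> tasks = new ArrayList<>();
    private long nowMs = 0;

    long nowMs() { return nowMs; }

    void schedule(long delayMs, Runnable action) {
        tasks.add(new Task(nowMs + delayMs, action));
    }

    int pending() { return tasks.size(); }

    /** Moves time forward and runs every task whose deadline has been reached. */
    void advance(long deltaMs) {
        nowMs += deltaMs;
        List<Task> due = new ArrayList<>();
        for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
            Task t = it.next();
            if (t.deadlineMs() <= nowMs) {
                due.add(t);
                it.remove();
            }
        }
        due.forEach(t -> t.action().run());
    }
}
```

A parked fetch can then be modeled as a task scheduled at maxWaitMs: advancing by maxWaitMs - 1 leaves it pending, and one more millisecond completes it.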

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

This PR improves fetch behavior for consolidating diskless topics at the local↔diskless boundary so that consumers with large minBytes can receive responses promptly by supplementing local-log fetch results with diskless data beyond the local logEndOffset. It also updates the delayed-fetch completion path to run the consolidation supplement and pure-diskless fetches concurrently.

Changes:

  • Add a consolidation supplement request builder and a merge routine to concatenate local + diskless records for consolidating partitions.
  • Update ReplicaManager.fetchMessages to synchronously supplement consolidating local reads (when appropriate) to avoid parking in delayed fetch.
  • Update DelayedFetch.onComplete to fetch consolidation supplements in parallel with pure-diskless fetches, and add/extend tests for these behaviors.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File | Description
core/src/main/scala/kafka/server/ReplicaManager.scala | Adds supplement request building + merge logic; integrates inline supplement path into fetchMessages.
core/src/main/scala/kafka/server/DelayedFetch.scala | Runs consolidation supplement fetch concurrently with pure-diskless fetch during delayed completion and merges results.
core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala | Adds unit/integration-style tests validating supplement behavior and merge/budget helpers.
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala | Adds a focused test for the concurrent supplement path in DelayedFetch.onComplete.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 29a4228 to 703daf4 on June 10, 2026 11:39
jeqo requested a review from Copilot on June 10, 2026 11:39

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 703daf4 to 7764549 on June 10, 2026 12:37
jeqo requested a review from Copilot on June 10, 2026 12:38

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch 2 times, most recently from dfa297f to ca00f3f on June 10, 2026 13:03
jeqo requested a review from Copilot on June 10, 2026 13:04
jeqo marked this pull request as ready for review on June 10, 2026 13:07

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from ca00f3f to 250606d on June 10, 2026 13:25
jeqo requested a review from Copilot on June 10, 2026 13:25

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 250606d to 1b63092 on June 10, 2026 16:46
jeqo requested a review from Copilot on June 10, 2026 16:47

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 1b63092 to d4e0688 on June 10, 2026 17:10
jeqo requested a review from Copilot on June 10, 2026 17:11

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from f2d9890 to 5aa471f on June 10, 2026 20:02
jeqo requested a review from Copilot on June 10, 2026 20:02

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 5aa471f to 93d3069 on June 10, 2026 20:34
jeqo requested a review from Copilot on June 10, 2026 20:35

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 93d3069 to f363671 on June 10, 2026 20:52
jeqo requested a review from Copilot on June 10, 2026 20:53

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.

jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from f363671 to 88fd802 on June 10, 2026 21:15
…on fetch

When a consolidating partition's local log doesn't satisfy minBytes,
supplement with a synchronous diskless fetch starting at logEndOffset,
then merge local + diskless records via ConcatenatedRecords.concat().
For consumer requests that only span consolidating partitions, the
supplement is applied inline and the response is produced immediately
without parking in the delayed-fetch purgatory.

For mixed requests (consolidating + pure-diskless partitions), the
supplement and the diskless fetch run concurrently in
DelayedFetch.onComplete so latency equals the slower of the two,
not their sum. An AtomicBoolean ensures the response callback fires
exactly once even if an exception escapes the future composition.

Guards on the inline supplement path:
- !isFromFollower: followers must not receive merged diskless data
- maxWaitMs > 0: non-blocking polls must not be held by a round-trip
- !hasPreferredReadReplica: redirect responses must not be delayed
- disklessFetchInfos.isEmpty: avoids a wasted blocking call when the
  request will park in DelayedFetch anyway
- inklessSharedState.isDefined checked at collection time

bytesReadable only counts error-free supplement bytes — a supplement
response with an error but non-empty records cannot trick the broker
into responding prematurely.

The merge preserves localData.abortedTransactions and
localData.preferredReadReplica (diskless has no transaction index and
no replica selector). Warns when aborted transactions are present since
the diskless portion will have no abort markers. Falls back to
local-only data on unknown Records types or merge failures.

Exception handling on the inline supplement fetch:
- InterruptedException: re-interrupts the thread and falls back
- TimeoutException: logged specifically for operator visibility
- Other failures: logged, then falls back to local-only
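The exception-handling policy above maps onto a bounded blocking wait roughly like this. The sketch assumes the supplement arrives as a `CompletableFuture` and that an empty map means "local-only"; `InlineSupplementFetch`, `awaitSupplement`, and the `timeoutMs` parameter are hypothetical, since the source does not say how the real timeout is chosen.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: wait at most timeoutMs for the supplement and fall back
// to an empty supplement (local-only response) on any failure.
final class InlineSupplementFetch {
    private static final Logger LOG = Logger.getLogger(InlineSupplementFetch.class.getName());

    static Map<String, String> awaitSupplement(CompletableFuture<Map<String, String>> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for callers up the stack
            return Map.of();
        } catch (TimeoutException e) {
            LOG.warning("Consolidation supplement fetch timed out after " + timeoutMs + " ms; serving local data only");
            return Map.of();
        } catch (ExecutionException | RuntimeException e) {
            LOG.log(Level.WARNING, "Consolidation supplement fetch failed; serving local data only", e);
            return Map.of();
        }
    }
}
```

Catching `InterruptedException` clears the thread's interrupt flag, which is why the handler sets it again before returning.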

On diskless future failure in DelayedFetch.onComplete, per-partition
error entries (Errors.forException) are returned so clients see the
partition in the response rather than it being silently dropped.

ConcatenatedRecords.concat(prefix, tail) static factory added to
encapsulate type dispatch over MemoryRecords/ConcatenatedRecords and
avoid materializing supplement backing records into a contiguous buffer.
Validates inputs with Objects.requireNonNull.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 88fd802 to 7299d77 on June 10, 2026 21:15
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from df2de81 to 7cf2044 on June 12, 2026 06:56
jeqo requested a review from viktorsomogyi on June 12, 2026 08:04

viktorsomogyi left a comment

I think if we can resolve that remaining issue (if you can confirm it's an issue) then we should be good to go with this.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
jeqo requested a review from viktorsomogyi on June 12, 2026 17:07
viktorsomogyi previously approved these changes Jun 17, 2026

viktorsomogyi left a comment

@jeqo although I think I found some further gaps, I approve this so it doesn't block you on the benchmarks and further work. Once you've verified those gaps, they can be addressed separately.

@viktorsomogyi

Thanks for the fix. From my side it's good, but please check whether those Copilot comments are relevant and whether we want to fix them.

jeqo commented Jun 22, 2026

@viktorsomogyi thanks! I think all copilot suggestions are resolved now 👍🏽

Labels

None yet

Projects

None yet

3 participants