
feat(inkless:consolidation): supplement local log with diskless data on fetch [KC-168]#638

Open
jeqo wants to merge 6 commits into main from jeqo/diskless-consolidation-supplement-fetch

Conversation

@jeqo

@jeqo jeqo commented Jun 10, 2026

Contributor

Problem

At the local/diskless boundary of a consolidating partition, consumers fetching with large minBytes would park in the delayed-fetch purgatory and wait the full maxWaitMs before getting a response — even when diskless data was already available past the local logEndOffset.

Solution

When the local log read doesn't satisfy minBytes, supplement with a synchronous diskless fetch starting at logEndOffset and merge the results via ConcatenatedRecords.concat().

For requests spanning only consolidating partitions, the supplement is applied inline and the response is produced immediately without entering the purgatory. For mixed requests (consolidating + pure-diskless), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete so latency equals the slower of the two, not their sum.

Changes

Inline supplement in fetchMessages

  • Fires when consolidating partitions are below minBytes and no pure-diskless partitions are present
  • Guards: !isFromFollower, maxWaitMs > 0, !hasPreferredReadReplica, !errorReadingData
  • inklessSharedState checked at collection time so supplements are never gathered on a non-inkless broker
  • bytesReadable only counts error-free supplement bytes — a supplement response with an error cannot trick the broker into responding prematurely
  • Exception handling: InterruptedException re-interrupts the thread; TimeoutException logged specifically for operator visibility; other failures fall back to local-only
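
The exception-handling bullet above can be sketched as follows. This is a minimal illustration, not the PR's code: the class name `SupplementFetchFallback`, the helper `awaitOrFallback`, and the use of `System.err` in place of the broker's logger are all assumptions made for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a supplement result, falling back to the local-only result.
public class SupplementFetchFallback {
    static <T> T awaitOrFallback(Future<T> supplement, long timeoutMs, T localOnly) {
        try {
            return supplement.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            return localOnly;
        } catch (TimeoutException e) {
            // Reported separately so a slow diskless backend is visible to operators.
            System.err.println("supplement fetch timed out after " + timeoutMs + " ms");
            return localOnly;
        } catch (ExecutionException | RuntimeException e) {
            // Any other failure: serve what the local log already produced.
            System.err.println("supplement fetch failed: " + e);
            return localOnly;
        }
    }
}
```

The point of the first branch is that swallowing an `InterruptedException` without re-interrupting would hide a shutdown or cancellation signal from the request-handler thread.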

Concurrent supplement in DelayedFetch.onComplete

  • Supplement and pure-diskless fetches run in parallel via CompletableFuture.thenCombine
  • AtomicBoolean ensures the response callback fires exactly once
  • Skips partitions with local read errors or fetchOffset >= logEndOffset
  • Skips entirely for follower fetches
  • On diskless future failure, per-partition error entries (Errors.forException) are returned so clients see the partition in the response rather than it being silently dropped
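
A rough sketch of the concurrent path described above, assuming both fetches are exposed as `CompletableFuture`s. The class `ConcurrentSupplementSketch`, the method `completeOnce`, and the string payloads are hypothetical stand-ins; the real code merges per-partition fetch data and builds per-partition error entries rather than a single `"error"` string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ConcurrentSupplementSketch {
    static void completeOnce(CompletableFuture<String> supplement,
                             CompletableFuture<String> diskless,
                             Consumer<String> responseCallback) {
        AtomicBoolean responded = new AtomicBoolean(false);
        // Wrap the callback so that only the first caller gets through.
        Consumer<String> once = response -> {
            if (responded.compareAndSet(false, true)) {
                responseCallback.accept(response);
            }
        };
        try {
            // Both futures are already in flight; thenCombine fires when the slower one finishes.
            supplement
                .thenCombine(diskless, (s, d) -> s + "+" + d)
                .whenComplete((merged, error) ->
                    once.accept(error == null ? merged : "error"));
        } catch (RuntimeException e) {
            // Safety net: if anything escapes the composition, the client still gets a response.
            once.accept("error");
        }
    }
}
```

Because both futures are started before they are combined, the wait is bounded by the slower fetch rather than by the sum of the two.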

ConcatenatedRecords.concat(prefix, tail) static factory

  • Encapsulates type dispatch over MemoryRecords/ConcatenatedRecords
  • Unwraps inner backing records directly — no extra materialization copy
  • Validates inputs with Objects.requireNonNull
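
To illustrate the type dispatch and the "no extra copy" property, here is a simplified sketch. The types `Records`, `Mem`, and `Concatenated` are hypothetical stand-ins defined only for this example; they are not Kafka's `MemoryRecords` or the PR's `ConcatenatedRecords`, whose actual signatures are not shown in this conversation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ConcatSketch {
    interface Records { int sizeInBytes(); }

    // Stand-in for a single in-memory record buffer.
    static final class Mem implements Records {
        final byte[] bytes;
        Mem(byte[] bytes) { this.bytes = bytes; }
        public int sizeInBytes() { return bytes.length; }
    }

    // Stand-in for a view over several buffers, kept as references (no contiguous copy).
    static final class Concatenated implements Records {
        final List<Mem> parts;
        Concatenated(List<Mem> parts) { this.parts = Collections.unmodifiableList(parts); }
        public int sizeInBytes() {
            int total = 0;
            for (Mem part : parts) total += part.sizeInBytes();
            return total;
        }
    }

    static Concatenated concat(Records prefix, Records tail) {
        Objects.requireNonNull(prefix, "prefix");
        Objects.requireNonNull(tail, "tail");
        List<Mem> parts = new ArrayList<>();
        addParts(parts, prefix);
        addParts(parts, tail);
        return new Concatenated(parts);
    }

    // Dispatch on the concrete type; an already-concatenated input is unwrapped, not nested.
    private static void addParts(List<Mem> out, Records records) {
        if (records instanceof Mem) {
            out.add((Mem) records);
        } else if (records instanceof Concatenated) {
            out.addAll(((Concatenated) records).parts);
        } else {
            throw new IllegalArgumentException("unsupported Records type: " + records.getClass());
        }
    }
}
```

Keeping the dispatch inside one factory means callers never need to know which of the two shapes they are holding.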

buildConsolidationSupplementFetchInfos

  • Computes per-partition remaining byte budget (original maxBytes minus local bytes read)
  • Drops partitions with zero budget remaining
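
The budget arithmetic can be sketched like this. It is an illustration only: partitions are keyed by plain strings, and `SupplementBudget.remainingBudgets` is a hypothetical name, not the signature of buildConsolidationSupplementFetchInfos.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SupplementBudget {
    // For each partition that was read locally, compute maxBytes minus local bytes read.
    static Map<String, Integer> remainingBudgets(Map<String, Integer> maxBytes,
                                                 Map<String, Integer> localBytesRead) {
        Map<String, Integer> budgets = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : localBytesRead.entrySet()) {
            Integer max = maxBytes.get(entry.getKey());
            if (max == null) {
                continue; // partition not in the original request: nothing to supplement
            }
            int remaining = max - entry.getValue();
            if (remaining > 0) {
                budgets.put(entry.getKey(), remaining); // zero or negative budget is dropped
            }
        }
        return budgets;
    }
}
```

For example, a partition with maxBytes of 1000 and 400 bytes already read locally would be asked for at most 600 more bytes from diskless, while one that already used its full 1000 would not be fetched at all.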

mergeConsolidationSupplement

  • Materializes local FileRecords → MemoryRecords (standard readInto/flip idiom)
  • Promotes diskless HW/LSO; uses Math.min(local, supplement) for logStartOffset
  • Preserves localData.abortedTransactions (warns when present since diskless has no transaction index)
  • Preserves localData.preferredReadReplica (diskless returns empty for this field)
  • Falls back to local-only data on unknown Records types or merge failures with error log
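
The "readInto/flip idiom" mentioned above is the usual NIO pattern of filling a buffer from a file and then flipping it for reading. A generic sketch with plain `FileChannel` follows; the class `ReadIntoFlip` and method `materialize` are hypothetical, and the PR itself works on Kafka's FileRecords rather than on a raw channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ReadIntoFlip {
    // Copies up to `size` bytes starting at `position` into a heap buffer ready for reading.
    static ByteBuffer materialize(Path file, long position, int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long pos = position;
            while (buffer.hasRemaining()) {
                int read = channel.read(buffer, pos);
                if (read < 0) {
                    break; // end of file reached before the buffer was full
                }
                pos += read;
            }
        }
        // flip(): limit = bytes written, position = 0, so consumers read exactly what was filled.
        buffer.flip();
        return buffer;
    }
}
```

Forgetting the `flip()` is the classic mistake here: the buffer would appear empty (or expose unwritten space) to whoever reads it next.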

Tests

  • testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: RED/GREEN — fails without the supplement (parks for maxWaitMs), passes with it (immediate response, purgatory stays empty)
  • testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract — no diskless data means the request correctly waits
  • testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable: supplement with error status must not satisfy minBytes — request still parks
  • testFetchConsolidatingSupplementPreservesInterruptedStatus: thread interrupted status is preserved after InterruptedException in supplement fetch
  • testFetchMixedConsolidatingPureDisklessAndClassicPartitions: all three partition types in one request, asserts merged consolidating data and two diskless handler calls
  • testCompletionWithConsolidationSupplement: focused unit test for the concurrent supplement path in DelayedFetch.onComplete
  • testConsolidationSupplementSkippedWhenLocalReadHasError: supplement merge is not applied when local read returns an error
  • testConsolidationSupplementNotIssuedWhenLocalReadHasError: supplement fetch is not issued for error-result partitions
  • testDisklessFetchFailureReturnsPerPartitionErrors: diskless future failure produces per-partition error entries, not silent omission
  • Unit tests for buildConsolidationSupplementFetchInfos: budget deduction, zero-budget drop, missing partitions filtered
  • Unit tests for mergeConsolidationSupplement: MemoryRecords merge, FileRecords materialization, unknown-type fallback

…al/diskless boundary

Deterministic regression tests for the consolidating-partition fetch path,
using a MockTimer-backed delayed-fetch purgatory so timing is asserted without
real waits:

- testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable:
  local log < minBytes with diskless data available must respond SYNCHRONOUSLY
  (merged), never parking in the purgatory. This RED/GREEN test reproduces the
  degradation: it FAILS on a build without the supplement (request parks and
  waits maxWaitMs) and PASSES with it.
- testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData:
  bounds the contract — when no diskless data is available the request correctly
  still parks and only completes after maxWaitMs elapses.

Adds an optional delayedFetchPurgatory param to the test's createReplicaManager
helper to inject a MockTimer-backed purgatory.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Copilot AI left a comment

Contributor

Pull request overview

This PR improves fetch behavior for consolidating diskless topics at the local↔diskless boundary so that consumers with large minBytes can receive responses promptly by supplementing local-log fetch results with diskless data beyond the local logEndOffset. It also updates the delayed-fetch completion path to run the consolidation supplement and pure-diskless fetches concurrently.

Changes:

  • Add a consolidation supplement request builder and a merge routine to concatenate local + diskless records for consolidating partitions.
  • Update ReplicaManager.fetchMessages to synchronously supplement consolidating local reads (when appropriate) to avoid parking in delayed fetch.
  • Update DelayedFetch.onComplete to fetch consolidation supplements in parallel with pure-diskless fetches, and add/extend tests for these behaviors.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File | Description
core/src/main/scala/kafka/server/ReplicaManager.scala | Adds supplement request building + merge logic; integrates inline supplement path into fetchMessages.
core/src/main/scala/kafka/server/DelayedFetch.scala | Runs consolidation supplement fetch concurrently with pure-diskless fetch during delayed completion and merges results.
core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala | Adds unit/integration-style tests validating supplement behavior and merge/budget helpers.
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala | Adds a focused test for the concurrent supplement path in DelayedFetch.onComplete.


Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala Outdated
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 29a4228 to 703daf4 Compare June 10, 2026 11:39
@jeqo jeqo requested a review from Copilot June 10, 2026 11:39

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 703daf4 to 7764549 Compare June 10, 2026 12:37
@jeqo jeqo requested a review from Copilot June 10, 2026 12:38

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch 2 times, most recently from dfa297f to ca00f3f Compare June 10, 2026 13:03
@jeqo jeqo requested a review from Copilot June 10, 2026 13:04
@jeqo jeqo marked this pull request as ready for review June 10, 2026 13:07

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from ca00f3f to 250606d Compare June 10, 2026 13:25
@jeqo jeqo requested a review from Copilot June 10, 2026 13:25

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 250606d to 1b63092 Compare June 10, 2026 16:46
@jeqo jeqo requested a review from Copilot June 10, 2026 16:47

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 1b63092 to d4e0688 Compare June 10, 2026 17:10
@jeqo jeqo requested a review from Copilot June 10, 2026 17:11
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from b27fd9c to f2d9890 Compare June 10, 2026 19:48
@jeqo jeqo requested a review from Copilot June 10, 2026 19:49

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from f2d9890 to 5aa471f Compare June 10, 2026 20:02
@jeqo jeqo requested a review from Copilot June 10, 2026 20:02

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 5aa471f to 93d3069 Compare June 10, 2026 20:34
@jeqo jeqo requested a review from Copilot June 10, 2026 20:35

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/DelayedFetch.scala Outdated
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 93d3069 to f363671 Compare June 10, 2026 20:52
@jeqo jeqo requested a review from Copilot June 10, 2026 20:53

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.

@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from f363671 to 88fd802 Compare June 10, 2026 21:15
…on fetch

When a consolidating partition's local log doesn't satisfy minBytes,
supplement with a synchronous diskless fetch starting at logEndOffset,
then merge local + diskless records via ConcatenatedRecords.concat().
For consumer requests that only span consolidating partitions, the
supplement is applied inline and the response is produced immediately
without parking in the delayed-fetch purgatory.

For mixed requests (consolidating + pure-diskless partitions), the
supplement and the diskless fetch run concurrently in
DelayedFetch.onComplete so latency equals the slower of the two,
not their sum. An AtomicBoolean ensures the response callback fires
exactly once even if an exception escapes the future composition.

Guards on the inline supplement path:
- !isFromFollower: followers must not receive merged diskless data
- maxWaitMs > 0: non-blocking polls must not be held by a round-trip
- !hasPreferredReadReplica: redirect responses must not be delayed
- disklessFetchInfos.isEmpty: avoids a wasted blocking call when the
  request will park in DelayedFetch anyway
- inklessSharedState.isDefined checked at collection time

bytesReadable only counts error-free supplement bytes — a supplement
response with an error but non-empty records cannot trick the broker
into responding prematurely.

The merge preserves localData.abortedTransactions and
localData.preferredReadReplica (diskless has no transaction index and
no replica selector). Warns when aborted transactions are present since
the diskless portion will have no abort markers. Falls back to
local-only data on unknown Records types or merge failures.

Exception handling on the inline supplement fetch:
- InterruptedException: re-interrupts the thread and falls back
- TimeoutException: logged specifically for operator visibility
- Other failures: logged and falls back to local-only

On diskless future failure in DelayedFetch.onComplete, per-partition
error entries (Errors.forException) are returned so clients see the
partition in the response rather than it being silently dropped.

ConcatenatedRecords.concat(prefix, tail) static factory added to
encapsulate type dispatch over MemoryRecords/ConcatenatedRecords and
avoid materializing supplement backing records into a contiguous buffer.
Validates inputs with Objects.requireNonNull.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from 88fd802 to 7299d77 Compare June 10, 2026 21:15
if (readResult != null && readResult.error == Errors.NONE) {
  replicaManager.getPartitionOrError(tp.topicPartition).foreach { partition =>
    partition.log.foreach { log =>
      if (status.startOffsetMetadata.messageOffset < log.logEndOffset)

@viktorsomogyi viktorsomogyi Jun 11, 2026

Contributor

If I'm interpreting this right, it might not be enough. When a classic read lands on an older segment that has fewer than minBytes remaining, the broker forces completion instead of reading on into the next segment. I think the current supplement read breaks this and continues from the log end instead, unless I'm missing something.
Suppose a log is [0,5000): segment 0 is [0,2000), segment 1 is [2000,4000), segment 2 is [4000,5000). minBytes is 1MB and we're reading at 1995. Reading [1995,2000) may yield 500KB, so we would still need to read [2000,2005) from the next segment. In this case the broker forces completion, so the consumer has to issue another request (which "breaks" minBytes). The next classic fetch then continues from 2000.
As I read it, in the classic case, if there isn't enough data to return, we force a diskless fetch from [localLogEndOffset, fetchEnd), which seems incorrect.
Let me know if I'm missing something.

Contributor Author

Good catch! I think I may have unintentionally changed how classic partitions are handled. This also helped me catch issues in the transition from local logs to diskless. Looking into how to fix this.

Contributor Author

I have added a consolidating-partitions map to DelayedFetch, so classic (non-consolidating, non-diskless), diskless, and consolidating partitions are separated here and are easier to control. PTAL

@jeqo jeqo force-pushed the jeqo/diskless-consolidation-supplement-fetch branch from df2de81 to 7cf2044 Compare June 12, 2026 06:56
@jeqo jeqo requested a review from viktorsomogyi June 12, 2026 08:04

@viktorsomogyi viktorsomogyi left a comment

Contributor

I think if we can resolve that remaining issue (if you can confirm it's an issue) then we should be good to go with this.

Comment on lines +2347 to +2353
if (consolidatingLocalFetchSupplements.nonEmpty &&
    disklessFetchInfos.isEmpty &&
    !params.isFromFollower && // safeguard: followers must not receive diskless records merged into local-log data
    params.maxWaitMs > 0 && // safeguard: non-blocking polls must not be held by a diskless round-trip
    !hasPreferredReadReplica &&
    bytesReadable < params.minBytes &&
    !errorReadingData) {

Contributor

I think this path may have the same problem: we don't check here whether the local data has been exhausted, only that bytesReadable < params.minBytes, and then we switch to diskless.

Contributor Author

This should be solved by the buildConsolidationSupplementFetchInfos fix, which takes lastBatch().nextOffset() as the supplement start offset. aa6be12#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2023-R2031

testBuildConsolidationSupplementFetchInfosStartsAtLastBatchNextOffset should also validate this scenario.

Contributor

OK, your change does seem to cover the gap I raised in the previous review, but I found a different gap this time (it actually consists of two parts), and I had to dig a bit deeper to find it.

I generated a test case (it should go in ReplicaManagerInklessTest) that shows what I have in mind. Essentially, when the diskless part doesn't cover the whole local range, there still seems to be a gap. Suppose the local range is [0,1000) and segments end at every 500th offset (500, 1000, and so on). The seal offset (classicToDisklessStartOffset) is at 1000 because the topic was just switched. Reading at 480, as in the test, and falling below minBytes then results in a supplement read that is served from offset 1000. I think we should check somewhere whether we are still below the diskless range before supplementing (supplementStartOffset >= seal).

As for the second part: the read from diskless is mocked to return from 1000 specifically, but if you look at the PG control plane, it never checks the lower bound, so it never returns an error. Therefore the first part never gets exposed. Also, since 500 passes the checks in the find_batches routine, it returns the first record without a second thought. So this is partly on the diskless control plane: it never expects that its log might not start at 0 AND that you can fetch an offset above 0 but still below its log start.

This is the first check where we should really check log_start_offset instead of 0.

And this is the second one where it slips through and returns the 1st batch.

  // REPRODUCES the "sub-seal gap" silent-data-loss bug.
  //
  // Setup of a consolidating partition:
  //   - classic-to-diskless seal  = 1000  (offsets [1000, ...) live in object storage / diskless)
  //   - local classic prefix      = [0, 1000) and spans multiple local log segments
  //   - local logEndOffset (LEO)  = 1000
  //   - localLogStartOffset       = 0     (nothing tiered, so the tiered-range guard is a no-op)
  //
  // A consumer fetches at offset 480, which lands in an *earlier* local segment. LocalLog.read
  // serves from a single segment, so the local read returns only [480, 500) (a segment boundary
  // well below the seal). Because that is smaller than minBytes, the consolidating path triggers
  // a diskless supplement, computed by buildConsolidationSupplementFetchInfos as
  // lastBatch().nextOffset() == 500.
  //
  // But 500 is BELOW the seal (1000): the diskless control plane holds no batches for offsets
  // below the seal, so a FindBatch at 500 returns the first available batch — at the seal, 1000.
  // (The fixed FetchHandler stub returns records starting at 1000 regardless of the requested
  // offset, faithfully modelling that tailSet/"first batch at-or-after" control-plane behavior.)
  //
  // The merge therefore stitches local [480, 500) directly onto diskless [1000, ...), and the
  // committed range [500, 1000) is silently dropped from the consumer-visible response.
  @Test
  def testFetchConsolidatingSupplementBelowSealSilentlySkipsClassicPrefix(): Unit = {
    val seal = 1000L
    val localLeo = 1000L

    // Local single-segment read: offsets [480, 500). 20 records in one batch -> nextOffset == 500.
    val localRecordList = (0 until 20).map(i => new SimpleRecord(i.toLong, s"local-$i".getBytes())).toArray
    val localMemRecords = MemoryRecords.withRecords(
      2.toByte, 480L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, localRecordList: _*
    )
    var localFileRecords: FileRecords = null

    // Diskless serves from the seal: offsets [1000, 1005). Models the control plane returning the
    // first batch at-or-after the requested (sub-seal) offset of 500.
    val supplementRecordList = (0 until 5).map(i => new SimpleRecord(i.toLong, s"diskless-$i".getBytes())).toArray
    val supplementRecords = MemoryRecords.withRecords(
      2.toByte, seal, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, supplementRecordList: _*
    )
    val disklessResponse = Map(disklessTopicPartition ->
      new FetchPartitionData(
        Errors.NONE,
        localLeo, 0L,
        supplementRecords,
        Optional.empty(), OptionalLong.of(localLeo), Optional.empty(), OptionalInt.empty(), false)
    )

    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
    val cp = mock(classOf[ControlPlane])
    val replicaManager = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(cp),
      disklessManagedReplicasEnabled = true,
      disklessRemoteStorageConsolidationEnabled = true,
      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
    ))
    try {
      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
        .thenReturn(seal)
      stubConsolidatingPartitionWithLocalLeo(replicaManager, localLeo = localLeo)

      localFileRecords = memoryRecordsToFileRecords(localMemRecords)
      doReturn(Seq(disklessTopicPartition ->
        new LogReadResult(
          new FetchDataInfo(new LogOffsetMetadata(480L, 0L, 0), localFileRecords),
          Optional.empty(), localLeo, 0L, localLeo, 0L, 0L, OptionalLong.empty(), Errors.NONE
        ))
      ).when(replicaManager).readFromLog(any(), any(), any(), any())

      val fetchParams = new FetchParams(
        FetchRequest.ORDINARY_CONSUMER_ID, -1L,
        1000L,
        localMemRecords.sizeInBytes + 1, // minBytes > local-only size to trigger the supplement
        1024 * 1024,
        FetchIsolation.HIGH_WATERMARK, Optional.empty()
      )
      val fetchInfos = Seq(
        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 480L, 0L, 1024 * 1024, Optional.empty())
      )

      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
        responseData = response.toMap
      }
      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)

      waitForFetchResponse(responseData)
      val result = responseData(disklessTopicPartition)
      assertEquals(Errors.NONE, result.error)

      val offsets = result.records.records().asScala.map(_.offset()).toList
      val gaps = offsets.sliding(2).collect { case Seq(a, b) if b != a + 1 => (a, b) }.toList
      assertTrue(gaps.isEmpty,
        s"Consumer-visible records must be contiguous, but found offset gap(s): $gaps. " +
          s"The supplement started at offset 500 (below the seal $seal); diskless served from the " +
          s"seal, so the committed range [500, $seal) was silently skipped. Offsets=$offsets")
    } finally {
      replicaManager.shutdown(checkpointHW = false)
      fetchHandlerCtor.close()
      if (localFileRecords != null) localFileRecords.close()
    }
  }
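
The check suggested in this comment (supplementStartOffset >= seal) could look roughly like the sketch below. It is an illustration of the reviewer's proposal only, not the fix that landed in the PR; `SupplementStartOffset` and `supplementStart` are hypothetical names.

```java
import java.util.OptionalLong;

public class SupplementStartOffset {
    // Returns the offset at which a diskless supplement may start, or empty when the local
    // read stopped below the seal and the next offsets still live in the local log.
    static OptionalLong supplementStart(long lastBatchNextOffset, long sealOffset) {
        if (lastBatchNextOffset >= sealOffset) {
            return OptionalLong.of(lastBatchNextOffset);
        }
        return OptionalLong.empty();
    }
}
```

With the numbers from the test above, a local read ending at nextOffset 500 against a seal of 1000 yields no supplement, so the consumer's next fetch continues from 500 in the local log instead of jumping to 1000.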

Contributor Author

Ah, good catch! I wasn't focusing on switched topics, so I missed this scenario.
This is actually very relevant, so I'll work on the fix and have a follow-up for the PG validation.

@jeqo jeqo requested a review from viktorsomogyi June 12, 2026 17:07
viktorsomogyi
viktorsomogyi previously approved these changes Jun 17, 2026

@viktorsomogyi viktorsomogyi left a comment

Contributor

@jeqo although I think I found some further gaps, I'm approving this so it doesn't block you on the benchmarks and further work. Those gaps, if you can verify them, can be addressed separately.


3 participants