fix(inkless:consolidation): prevent ConsolidationFetcherThread crash on topic deletion#627

Merged
viktorsomogyi merged 3 commits into main from jeqo/fix-consolidation-fetcher-crash on Jun 3, 2026

@jeqo (Contributor) commented Jun 2, 2026

When a diskless topic is deleted, ConsolidationFetcherThread dies permanently, which blocks
consolidation for all topics on that broker until the broker is restarted.

ERROR [ConsolidationFetcherThread-0-1]: Error due to
      org.apache.kafka.common.errors.UnknownTopicOrPartitionException
INFO  [ConsolidationFetcherThread-0-1]: Stopped

Root cause

DisklessLeaderEndPoint.buildFetch() calls replicaManager.localLogOrException(tp), which
throws UnknownTopicOrPartitionException for deleted partitions. Only KafkaStorageException
is caught, so the exception escapes doWork() and ShutdownableThread exits permanently.
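To make the failure mode concrete, here is a minimal, self-contained Scala model of a ShutdownableThread-style loop. Everything in it is hypothetical: MiniShutdownableThread, PreFixFetcher and runLoop are illustrative names, and the two exception classes are local stand-ins named after Kafka's, not the real org.apache.kafka.common.errors types. It illustrates the point above: when the per-partition handler catches only KafkaStorageException, a single deleted partition ends the loop, and the healthy partitions stop being fetched too.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Local stand-ins named after Kafka's exceptions (NOT org.apache.kafka.common.errors types).
final class KafkaStorageException(msg: String) extends RuntimeException(msg)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// Hypothetical model of a ShutdownableThread-style loop: doWork() runs repeatedly
// until shutdown, and any exception escaping doWork() ends the loop for good.
abstract class MiniShutdownableThread {
  private val running = new AtomicBoolean(true)
  @volatile var stoppedWithError: Option[Throwable] = None

  def doWork(): Unit
  def initiateShutdown(): Unit = running.set(false)

  // Runs synchronously (easier to test); returns how many doWork() calls completed.
  def runLoop(maxIterations: Int): Int = {
    var completed = 0
    try {
      while (running.get() && completed < maxIterations) {
        doWork()
        completed += 1
      }
    } catch {
      // Corresponds to "Error due to ..." followed by "Stopped": no retry, no recovery.
      case t: Throwable => stoppedWithError = Some(t)
    }
    completed
  }
}

// Hypothetical fetcher whose per-partition handling catches only storage errors,
// like buildFetch before the fix.
final class PreFixFetcher(partitions: Seq[String], deleted: Set[String])
    extends MiniShutdownableThread {
  private def localLogOrException(tp: String): String =
    if (deleted.contains(tp)) throw new UnknownTopicOrPartitionException(s"$tp was deleted")
    else s"log-$tp"

  override def doWork(): Unit =
    partitions.foreach { tp =>
      try { localLogOrException(tp); () }
      catch { case _: KafkaStorageException => () } // UnknownTopicOrPartitionException escapes
    }
}
```

With no deleted partitions, runLoop(3) completes all three iterations; with one deleted partition it completes none and records the UnknownTopicOrPartitionException, mirroring the "Error due to" / "Stopped" log lines.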

The race: BrokerMetadataPublisher calls metadataCache.setImage(newImage) (line 132)
before applyDelta (line 149) calls stopPartitions → removeFetcherForPartitions. In
that window the fetcher still has the partition in partitionStates, but it is already gone from
allPartitions/metadataCache.

The regular ReplicaFetcher never hits this because stopPartitions removes the partition from the
fetcher BEFORE allPartitions.remove; this ordering guarantees the fetcher never sees a
deleted partition.
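The two orderings can be compared with a toy model. This is an assumption-laden sketch, not Kafka code: brokerView stands in for allPartitions/metadataCache, fetcherStates for the fetcher's partitionStates, and the hypothetical deleteAndProbe reports which partitions a fetch cycle running between the two deletion steps would fail to resolve.

```scala
import scala.collection.mutable

// Toy model, not Kafka code: brokerView stands in for allPartitions/metadataCache,
// fetcherStates for the fetcher's partitionStates.
final class DeletionModel(initial: Set[String]) {
  val brokerView: mutable.Set[String] = mutable.Set.empty[String] ++= initial
  val fetcherStates: mutable.Set[String] = mutable.Set.empty[String] ++= initial

  // Partitions a fetch cycle would still try to resolve but the broker no longer knows.
  def unresolvable: Set[String] = fetcherStates.diff(brokerView).toSet
}

object DeletionOrdering {
  // Applies the two deletion steps in the chosen order and returns what a fetch cycle
  // running between them would fail to resolve.
  def deleteAndProbe(tp: String, fetcherFirst: Boolean): Set[String] = {
    val m = new DeletionModel(Set(tp, "other-0"))
    // Step 1
    if (fetcherFirst) m.fetcherStates -= tp // ReplicaFetcher path: stop fetching first
    else m.brokerView -= tp                 // consolidation path: broker view updated first
    val seenInWindow = m.unresolvable       // a fetch cycle lands in this window
    // Step 2
    if (fetcherFirst) m.brokerView -= tp else m.fetcherStates -= tp
    seenInWindow
  }
}
```

deleteAndProbe("t-0", fetcherFirst = true) returns an empty set (the ReplicaFetcher ordering), while fetcherFirst = false returns Set("t-0"), which is the window the consolidation fetcher falls into.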

The fix

Catch UnknownTopicOrPartitionException alongside KafkaStorageException in
DisklessLeaderEndPoint.buildFetch():

} catch {
  case e @ (_: KafkaStorageException | _: UnknownTopicOrPartitionException) =>
    logger.info("Partition {} unavailable during buildFetch: {}", topicPartition, e.getMessage)
    partitionsWithError += topicPartition
}

Partition goes to partitionsWithError → delayed → evicted on next cycle once
removePartitions completes.
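A minimal sketch of the fix's effect, assuming a simplified buildFetch that takes a hypothetical lookup function in place of replicaManager.localLogOrException. BuiltFetch and FixSketch are illustrative names, and the exception classes are local stand-ins. Both exception types are matched with the same alternative pattern as the patch, so a deleted partition lands in partitionsWithError while the remaining partitions are still fetched.

```scala
import scala.collection.mutable

// Local stand-ins named after Kafka's exceptions (NOT org.apache.kafka.common.errors types).
final class KafkaStorageException(msg: String) extends RuntimeException(msg)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// Hypothetical result: what can be fetched now, plus partitions to back off.
final case class BuiltFetch(fetchable: Map[String, Long], partitionsWithError: Set[String])

object FixSketch {
  // `lookup` plays the role of replicaManager.localLogOrException(tp) and yields a fetch offset.
  def buildFetch(partitions: Seq[String], lookup: String => Long): BuiltFetch = {
    val fetchable = mutable.LinkedHashMap.empty[String, Long]
    val partitionsWithError = mutable.Set.empty[String]
    partitions.foreach { tp =>
      try {
        fetchable(tp) = lookup(tp)
      } catch {
        // Same alternative pattern as the patch: either error marks the partition
        // instead of escaping to doWork().
        case e @ (_: KafkaStorageException | _: UnknownTopicOrPartitionException) =>
          println(s"Partition $tp unavailable during buildFetch: ${e.getMessage}")
          partitionsWithError += tp
      }
    }
    BuiltFetch(fetchable.toMap, partitionsWithError.toSet)
  }
}
```

Once removePartitions has run, the deleted partition is no longer in the input list, so the next cycle builds a clean fetch with no errored partitions.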

Test plan

  • DisklessLeaderEndPointTest.testBuildFetchMarksPartitionWithUnknownTopicOrPartitionException
  • DisklessLeaderEndPointTest — full suite passes
  • ConsolidationFetcherThreadTest — full suite passes

@jeqo changed the title from "fix(inkless): prevent ConsolidationFetcherThread crash on topic deletion" to "fix(inkless:consolidation): prevent ConsolidationFetcherThread crash on topic deletion" Jun 2, 2026
@jeqo requested a review from Copilot June 2, 2026 17:31

Copilot AI left a comment

Pull request overview

This PR addresses a reliability issue in inkless consolidation fetching where deleting a diskless topic could permanently crash the ConsolidationFetcherThread, halting consolidation for all topics until broker restart.

Changes:

  • Handle UnknownTopicOrPartitionException in DisklessLeaderEndPoint.buildFetch so deleted partitions are marked as errored instead of crashing the fetcher loop.
  • Make AbstractFetcherThread.addPartitions resilient to per-partition UnknownTopicOrPartitionException so one deleted partition doesn’t abort adding others.
  • Add targeted unit tests and ensure CI executes the relevant test coverage.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.

Summary per file:

  • core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala: Adds coverage to ensure addPartitions skips deleted partitions without crashing.
  • core/src/test/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPointTest.scala: Adds coverage verifying buildFetch marks partitions with UnknownTopicOrPartitionException as errored.
  • core/src/main/scala/kafka/server/AbstractFetcherThread.scala: Adds per-partition exception handling in addPartitions for deleted topics.
  • core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala: Extends exception handling in buildFetch to include UnknownTopicOrPartitionException.
  • .github/workflows/inkless.yml: Expands the CI test selection to include the newly relevant suites.
Comments suppressed due to low confidence (1)

core/src/main/scala/kafka/server/AbstractFetcherThread.scala:547

  • addPartitions now skips partitions that throw UnknownTopicOrPartitionException, but it still returns initialFetchStates.keySet (including skipped partitions). This makes the return value inaccurate for callers that rely on it to know which partitions were actually added/updated. Track and return only the partitions successfully added to partitionStates.
      }

      partitionMapCond.signalAll()
      initialFetchStates.keySet
    } finally partitionMapLock.unlock()
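Copilot's suggestion, tracking only the partitions that were actually added, could look roughly like the following. This is a hypothetical, simplified sketch (PartitionRegistry and lookupLog are made-up names, and the exception is a local stand-in); it is not the real AbstractFetcherThread.addPartitions, and it does not claim the merged change took this shape.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Local stand-in named after Kafka's exception (NOT the org.apache.kafka.common.errors type).
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// Hypothetical, simplified fetcher state. `lookupLog` stands in for a per-partition
// lookup that throws when the partition was deleted concurrently.
final class PartitionRegistry(lookupLog: String => Unit) {
  private val partitionMapLock = new ReentrantLock()
  private val partitionMapCond = partitionMapLock.newCondition()
  val partitionStates: mutable.LinkedHashMap[String, Long] = mutable.LinkedHashMap.empty

  // Adds what it can and returns only the partitions that were actually added,
  // rather than every key of initialFetchStates.
  def addPartitions(initialFetchStates: Map[String, Long]): Set[String] = {
    partitionMapLock.lockInterruptibly()
    try {
      val added = mutable.Set.empty[String]
      initialFetchStates.foreach { case (tp, offset) =>
        try {
          lookupLog(tp)
          partitionStates(tp) = offset
          added += tp
        } catch {
          case _: UnknownTopicOrPartitionException => () // deleted meanwhile: skip it, keep going
        }
      }
      partitionMapCond.signalAll()
      added.toSet
    } finally partitionMapLock.unlock()
  }
}
```

Returning the tracked set keeps the return value consistent with partitionStates, which is the inaccuracy the suppressed comment points out.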


@jeqo requested a review from viktorsomogyi June 2, 2026 17:36
@jeqo marked this pull request as ready for review June 2, 2026 17:36
@jeqo requested a review from Copilot June 3, 2026 10:00

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 6 changed files in this pull request and generated 2 comments.

@jeqo force-pushed the jeqo/fix-consolidation-fetcher-crash branch 2 times, most recently from 0a94d1d to af120eb, June 3, 2026 10:14
@jeqo requested a review from Copilot June 3, 2026 10:27

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 6 changed files in this pull request and generated 2 comments.

Comment thread on core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala (outdated)

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 6 changed files in this pull request and generated 1 comment.

@jeqo force-pushed the jeqo/fix-consolidation-fetcher-crash branch from 29b99ac to d55cc6c, June 3, 2026 13:15
…letion

DisklessLeaderEndPointTest: buildFetch throws UnknownTopicOrPartitionException
for a deleted partition instead of adding it to partitionsWithError.

AbstractFetcherThreadTest: addPartitions propagates exception from a deleted
partition, potentially crashing the caller thread.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jeqo force-pushed the jeqo/fix-consolidation-fetcher-crash branch from d55cc6c to d149d33, June 3, 2026 13:17
@jeqo requested a review from Copilot June 3, 2026 13:17
@jeqo force-pushed the jeqo/fix-consolidation-fetcher-crash branch from d149d33 to de556b6, June 3, 2026 13:20

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

Comment on lines +255 to +256
// UnknownTopicOrPartitionException from localLogOrException when partition is
// deleted between removePartitions acquiring the lock and buildFetch reading state.
Comment on lines 23 to 26
 import kafka.utils.Logging
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
jeqo and others added 2 commits June 3, 2026 16:34
When a diskless topic is deleted, the ConsolidationFetcherThread crashes
permanently in buildFetch():
- localLogOrException() throws UnknownTopicOrPartitionException
- only KafkaStorageException is caught → exception escapes doWork()
- ShutdownableThread exits with no recovery

This kills consolidation for ALL topics on the broker until restart.

The race: BrokerMetadataPublisher sets metadataCache.setImage(newImage)
(removing the topic from the cache) before applyDelta calls
stopPartitions (which removes the partition from the fetcher). In that
window, the fetcher's buildFetch reads a partition still in
partitionStates but gone from allPartitions/metadataCache.

The regular ReplicaFetcher never hits this because stopPartitions removes
from the fetcher BEFORE allPartitions.remove, so the fetcher never sees
a partition that's been deleted from allPartitions. The consolidation
fetcher hits the window because metadata-view staleness creates a gap.

Fix: catch UnknownTopicOrPartitionException alongside KafkaStorageException
in DisklessLeaderEndPoint.buildFetch. The partition is added to
partitionsWithError, delayed, and evicted on the next cycle once
removePartitions completes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…CI filter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jeqo force-pushed the jeqo/fix-consolidation-fetcher-crash branch from de556b6 to 7d10050, June 3, 2026 13:34
@viktorsomogyi merged commit 5e7c7c9 into main Jun 3, 2026
4 checks passed
@viktorsomogyi deleted the jeqo/fix-consolidation-fetcher-crash branch June 3, 2026 14:19
giuseppelillo pushed a commit that referenced this pull request Jun 4, 2026
…on topic deletion (#627)

gqmelo pushed a commit that referenced this pull request Jun 4, 2026
…on topic deletion (#627)


Labels: None yet
Projects: None yet
3 participants