fix(inkless:consolidation): prevent ConsolidationFetcherThread crash on topic deletion #627
Merged
Pull request overview
This PR addresses a reliability issue in inkless consolidation fetching where deleting a diskless topic could permanently crash the ConsolidationFetcherThread, halting consolidation for all topics until broker restart.
Changes:
- Handle UnknownTopicOrPartitionException in DisklessLeaderEndPoint.buildFetch so deleted partitions are marked as errored instead of crashing the fetcher loop.
- Make AbstractFetcherThread.addPartitions resilient to per-partition UnknownTopicOrPartitionException so one deleted partition doesn't abort adding others.
- Add targeted unit tests and ensure CI executes the relevant test coverage.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala | Adds coverage to ensure addPartitions skips deleted partitions without crashing. |
| core/src/test/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPointTest.scala | Adds coverage verifying buildFetch marks partitions with UnknownTopicOrPartitionException as errored. |
| core/src/main/scala/kafka/server/AbstractFetcherThread.scala | Adds per-partition exception handling in addPartitions for deleted topics. |
| core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala | Extends exception handling in buildFetch to include UnknownTopicOrPartitionException. |
| .github/workflows/inkless.yml | Expands the CI test selection to include the newly relevant suites. |
Comments suppressed due to low confidence (1)
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:547
addPartitions now skips partitions that throw UnknownTopicOrPartitionException, but it still returns initialFetchStates.keySet (including skipped partitions). This makes the return value inaccurate for callers that rely on it to know which partitions were actually added/updated. Track and return only the partitions successfully added to partitionStates.
}
partitionMapCond.signalAll()
initialFetchStates.keySet
} finally partitionMapLock.unlock()
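The suggestion in the suppressed comment can be illustrated with a small, self-contained sketch. It does not use the real AbstractFetcherThread code: TopicPartition and UnknownTopicOrPartitionException are simplified stand-ins, AddPartitionsSketch and initState are hypothetical names, and locking and fetch-state details are left out.

```scala
import scala.collection.mutable

// Stand-ins for the Kafka types, so the sketch compiles on its own.
final case class TopicPartition(topic: String, partition: Int)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

object AddPartitionsSketch {
  // Hypothetical fetcher state: partition -> initial fetch offset.
  val partitionStates = mutable.LinkedHashMap.empty[TopicPartition, Long]

  // Hypothetical per-partition initialisation that fails for deleted topics.
  def initState(tp: TopicPartition, offset: Long, deleted: Set[String]): Long =
    if (deleted.contains(tp.topic))
      throw new UnknownTopicOrPartitionException(s"$tp no longer exists")
    else offset

  // Returns only the partitions that actually ended up in partitionStates.
  def addPartitions(initial: Map[TopicPartition, Long],
                    deleted: Set[String]): Set[TopicPartition] = {
    val added = Set.newBuilder[TopicPartition]
    initial.foreach { case (tp, offset) =>
      try {
        // initState is evaluated first, so a failure leaves the map untouched.
        partitionStates.put(tp, initState(tp, offset, deleted))
        added += tp
      } catch {
        // Skip the deleted partition; the remaining ones are still added.
        case _: UnknownTopicOrPartitionException => ()
      }
    }
    added.result()
  }
}
```

With this shape, a caller that inspects the return value sees only the partitions that were registered, rather than every key of the input map.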
0a94d1d to af120eb
29b99ac to d55cc6c
…letion
DisklessLeaderEndPointTest: buildFetch throws UnknownTopicOrPartitionException for a deleted partition instead of adding it to partitionsWithError.
AbstractFetcherThreadTest: addPartitions propagates exception from a deleted partition, potentially crashing the caller thread.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
d55cc6c to d149d33
d149d33 to de556b6
Comment on lines +255 to +256
// UnknownTopicOrPartitionException from localLogOrException when partition is
// deleted between removePartitions acquiring the lock and buildFetch reading state.
Comment on lines 23 to 26
  import kafka.utils.Logging
  import org.apache.kafka.common.Uuid
- import org.apache.kafka.common.errors.KafkaStorageException
+ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
  import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
When a diskless topic is deleted, the ConsolidationFetcherThread crashes permanently in buildFetch():
- localLogOrException() throws UnknownTopicOrPartitionException
- only KafkaStorageException is caught → exception escapes doWork()
- ShutdownableThread exits with no recovery
This kills consolidation for ALL topics on the broker until restart.
The race: BrokerMetadataPublisher sets metadataCache.setImage(newImage) (removing the topic from the cache) before applyDelta calls stopPartitions (which removes the partition from the fetcher). In that window, the fetcher's buildFetch reads a partition still in partitionStates but gone from allPartitions/metadataCache.
The regular ReplicaFetcher never hits this because stopPartitions removes from the fetcher BEFORE allPartitions.remove, so the fetcher never sees a partition that's been deleted from allPartitions. The consolidation fetcher hits the window because metadata-view staleness creates a gap.
Fix: catch UnknownTopicOrPartitionException alongside KafkaStorageException in DisklessLeaderEndPoint.buildFetch. The partition is added to partitionsWithError, delayed, and evicted on the next cycle once removePartitions completes.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…CI filter
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
de556b6 to 7d10050
viktorsomogyi approved these changes Jun 3, 2026
giuseppelillo pushed a commit that referenced this pull request Jun 4, 2026
…on topic deletion (#627)
gqmelo pushed a commit that referenced this pull request Jun 4, 2026
…on topic deletion (#627)
When a diskless topic is deleted, ConsolidationFetcherThread dies permanently, blocking consolidation for all topics on that broker until restart.
Root cause
DisklessLeaderEndPoint.buildFetch() calls replicaManager.localLogOrException(tp), which throws UnknownTopicOrPartitionException for deleted partitions. Only KafkaStorageException is caught, so the exception escapes doWork() and ShutdownableThread exits permanently.
The race: BrokerMetadataPublisher sets metadataCache.setImage(newImage) (line 132) before applyDelta (line 149) calls stopPartitions → removeFetcherForPartitions. In that window the fetcher still has the partition in partitionStates, but it's gone from allPartitions/metadataCache.
The regular ReplicaFetcher never hits this because stopPartitions removes from the fetcher BEFORE allPartitions.remove; proper ordering prevents the fetcher from ever seeing a deleted partition.
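The ordering argument above can be modelled with a toy sketch. This is not broker code: Broker, known, fetcherPartitions, and fetchFailsBetween are hypothetical names, partitions are plain strings, and the two deletion steps run sequentially so that the window between them is deterministic rather than a real concurrent race.

```scala
import scala.collection.mutable

object DeletionOrderSketch {
  // Stand-in for the Kafka exception type.
  final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

  // Toy broker state: partitions visible in the metadata view, and
  // partitions the fetcher still tracks.
  final class Broker(initial: Set[String]) {
    val known: mutable.Set[String] = mutable.Set.from(initial)
    val fetcherPartitions: mutable.Set[String] = mutable.Set.from(initial)

    // Stand-in for localLogOrException: fails once the partition is unknown.
    def localLogOrException(tp: String): String =
      if (known.contains(tp)) s"log-$tp"
      else throw new UnknownTopicOrPartitionException(tp)

    // Stand-in for one buildFetch pass over the tracked partitions.
    def buildFetch(): List[String] =
      fetcherPartitions.toList.sorted.map(localLogOrException)
  }

  // Performs only the first deletion step, then runs one buildFetch pass
  // inside the resulting window and reports whether that pass threw.
  def fetchFailsBetween(removeFromFetcherFirst: Boolean): Boolean = {
    val broker = new Broker(Set("t-0", "u-0"))
    if (removeFromFetcherFirst) broker.fetcherPartitions -= "t-0"
    else broker.known -= "t-0"
    try { broker.buildFetch(); false }
    catch { case _: UnknownTopicOrPartitionException => true }
  }
}
```

In this model, removing the partition from the fetcher first leaves nothing for the fetch pass to trip over, while updating the metadata view first exposes the window described above.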
The fix
Catch UnknownTopicOrPartitionException alongside KafkaStorageException in DisklessLeaderEndPoint.buildFetch(). The partition goes to partitionsWithError → delayed → evicted on the next cycle once removePartitions completes.
Test plan
- DisklessLeaderEndPointTest.testBuildFetchMarksPartitionWithUnknownTopicOrPartitionException
- DisklessLeaderEndPointTest: full suite passes
- ConsolidationFetcherThreadTest: full suite passes
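As a rough illustration of the fix described above, the following sketch catches both exception types per partition and records the failing partition instead of letting the exception escape. It is not the actual DisklessLeaderEndPoint code: the exception classes are local stand-ins, and FetchPlan and the logEndOffset parameter are hypothetical names introduced for this example.

```scala
import scala.collection.mutable

object BuildFetchSketch {
  // Stand-ins for the Kafka exception types.
  final class KafkaStorageException(msg: String) extends RuntimeException(msg)
  final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

  // Hypothetical result type: partitions to fetch plus partitions that failed.
  final case class FetchPlan(toFetch: Map[String, Long], partitionsWithError: Set[String])

  // logEndOffset is a hypothetical stand-in for the local log lookup;
  // it may throw either exception.
  def buildFetch(partitions: Seq[String], logEndOffset: String => Long): FetchPlan = {
    val toFetch = mutable.LinkedHashMap.empty[String, Long]
    val partitionsWithError = mutable.Set.empty[String]
    partitions.foreach { tp =>
      try {
        toFetch(tp) = logEndOffset(tp)
      } catch {
        // Both failures are per-partition: record the partition and keep going
        // so the caller's loop does not die.
        case _: KafkaStorageException | _: UnknownTopicOrPartitionException =>
          partitionsWithError += tp
      }
    }
    FetchPlan(toFetch.toMap, partitionsWithError.toSet)
  }
}
```

A deleted partition then shows up in partitionsWithError while the remaining partitions are still included in the fetch.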