feat(inkless): POD-2392 Implement consolidating partition tracking #567
This commit implements ConsolidationFetcherManager and ConsolidationFetcherThread in ReplicaManager to track diskless partitions that are being consolidated into classic Kafka logs.
135edb8 to d257e1c
Pull request overview
Implements initial plumbing for tracking and fetching data for “consolidating” diskless partitions (diskless topics with remote storage enabled), wiring a new ConsolidationFetcherManager/ConsolidationFetcherThread into ReplicaManager leadership transitions.
Changes:
- Added ConsolidationFetcherManager, ConsolidationFetcherThread, and a DisklessLeaderEndPoint implementation for consolidation fetcher threads.
- Extended MetadataView/InklessMetadataView with isConsolidatingDisklessTopic and updated ReplicaManager to start/stop consolidation fetchers during leadership changes and shutdown.
- Added ConcatenatedRecords#toMemoryRecords() plus unit tests, and updated RAT exclusions for the new inkless core Scala package.
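As a rough illustration of what materializing concatenated batches into a single in-memory buffer involves, here is a minimal sketch using only java.nio. The class ConcatenatedBuffers and its methods are hypothetical stand-ins, not the actual ConcatenatedRecords or MemoryRecords API, and the PR's implementation may differ.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical stand-in for illustration; not the PR's ConcatenatedRecords class. */
final class ConcatenatedBuffers {
    private final List<ByteBuffer> parts;

    ConcatenatedBuffers(List<ByteBuffer> parts) {
        this.parts = List.copyOf(parts);
    }

    /** Total number of readable bytes across all parts. */
    int sizeInBytes() {
        int total = 0;
        for (ByteBuffer part : parts) {
            total += part.remaining();
        }
        return total;
    }

    /** Copies every part, in order, into one new contiguous buffer. */
    ByteBuffer toSingleBuffer() {
        ByteBuffer out = ByteBuffer.allocate(sizeInBytes());
        for (ByteBuffer part : parts) {
            // duplicate() leaves the position and limit of the source part untouched
            out.put(part.duplicate());
        }
        out.flip(); // switch from writing to reading
        return out;
    }
}
```

The copy costs one allocation proportional to the total size, which is the usual trade-off for handing a single contiguous buffer to code that expects one.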
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| storage/inkless/src/test/java/io/aiven/inkless/consume/ConcatenatedRecordsTest.java | Adds unit tests for ConcatenatedRecords#toMemoryRecords() behavior. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java | Adds isConsolidatingDisklessTopic API. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java | Adds toMemoryRecords() to materialize concatenated batches into a single MemoryRecords. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Wires consolidation fetcher manager into partition stop/offline/shutdown and leader/follower transitions. |
| core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala | Implements isConsolidatingDisklessTopic. |
| core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala | Introduces a LeaderEndPoint for consolidation fetchers (currently stubbed). |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala | Adds toMemoryRecords override to handle ConcatenatedRecords. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala | Adds a fetcher manager to run consolidation fetchers. |
| build.gradle | Excludes core/src/main/scala/io/aiven/inkless/** from RAT checks. |
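The tracking described for ReplicaManager can be pictured as a small registry of partitions that is updated on state transitions and cleared on shutdown. The sketch below is an assumption-laden illustration: ConsolidatingPartitionTracker, its method names, and the use of plain strings for partitions are all hypothetical, and which leadership transition triggers which call is not taken from the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker for illustration; not the PR's ConsolidationFetcherManager. */
final class ConsolidatingPartitionTracker {
    // partition name -> offset from which fetching would start (assumed shape)
    private final Map<String, Long> tracked = new ConcurrentHashMap<>();

    /** Starts tracking the given partitions; already tracked ones keep their offset. */
    void startTracking(Map<String, Long> partitionsWithStartOffsets) {
        for (Map.Entry<String, Long> e : partitionsWithStartOffsets.entrySet()) {
            tracked.putIfAbsent(e.getKey(), e.getValue());
        }
    }

    /** Stops tracking the given partitions; unknown partitions are ignored. */
    void stopTracking(Set<String> partitions) {
        for (String partition : partitions) {
            tracked.remove(partition);
        }
    }

    /** Drops all tracked partitions, as a shutdown path would. */
    void shutdown() {
        tracked.clear();
    }

    boolean isTracked(String partition) {
        return tracked.containsKey(partition);
    }

    int trackedCount() {
        return tracked.size();
    }
}
```

Making stop and shutdown tolerant of partitions that were never tracked keeps the calls safe to issue from several code paths, such as partition stop, offline handling, and broker shutdown.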
    if (isConsolidatingDisklessTopic) {
      partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, topicId = partition.topicId, targetLogDirectoryId = partitionAssignedDirectoryId)
      consolidatingDisklessPartitionsToStartFetching.put(tp, partition)
    }
This could be done in its own else if condition to improve readability, even though it requires duplicating the code to get partitionAssignedDirectoryId.
Yeah, it can be separated if you think that's better for readability. I considered that too, but I wanted the least deviation from the classic code, so it seemed safer and easier to hook into this branch.
If we duplicate this part of the code, we risk missing changes to this branch during rebases.
Pull request overview
Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* feat(inkless): POD-2392 Implement consolidating partition tracking

  This commit implements ConsolidationFetcherManager and ConsolidationFetcherThread in ReplicaManager to track diskless partitions that are being consolidated into classic Kafka logs.

* Address copilot review comments
* Use full leader set to remove from fetchers
* Update core/src/main/scala/kafka/server/ReplicaManager.scala

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Address copilot review comments

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>