Skip to content

feat(inkless): POD-2392 Implement consolidating partition tracking#567

Merged
giuseppelillo merged 5 commits into
mainfrom
svv/ts-unification-fetchers
Apr 22, 2026
Merged

feat(inkless): POD-2392 Implement consolidating partition tracking#567
giuseppelillo merged 5 commits into
mainfrom
svv/ts-unification-fetchers

Conversation

@viktorsomogyi

Copy link
Copy Markdown
Contributor

This commit implements ConsolidationFetcherManager and ConsolidationFetcherThread in ReplicaManager to track diskless partitions that are being consolidated into classic Kafka logs.

This commit implements ConsolidationFetcherManager and ConsolidationFetcherThread
in ReplicaManager to track diskless partitions that are being
consolidated into classic Kafka logs.
@viktorsomogyi viktorsomogyi force-pushed the svv/ts-unification-fetchers branch from 135edb8 to d257e1c Compare April 9, 2026 12:58
@viktorsomogyi viktorsomogyi marked this pull request as ready for review April 10, 2026 08:14
@giuseppelillo giuseppelillo requested a review from Copilot April 10, 2026 13:17

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements initial plumbing for tracking and fetching data for “consolidating” diskless partitions (diskless topics with remote storage enabled), wiring a new ConsolidationFetcherManager/ConsolidationFetcherThread into ReplicaManager leadership transitions.

Changes:

  • Added ConsolidationFetcherManager, ConsolidationFetcherThread, and a DisklessLeaderEndPoint implementation for consolidation fetcher threads.
  • Extended MetadataView/InklessMetadataView with isConsolidatingDisklessTopic and updated ReplicaManager to start/stop consolidation fetchers during leadership changes and shutdown.
  • Added ConcatenatedRecords#toMemoryRecords() plus unit tests, and updated RAT exclusions for the new inkless core Scala package.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
storage/inkless/src/test/java/io/aiven/inkless/consume/ConcatenatedRecordsTest.java Adds unit tests for ConcatenatedRecords#toMemoryRecords() behavior.
storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java Adds isConsolidatingDisklessTopic API.
storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java Adds toMemoryRecords() to materialize concatenated batches into a single MemoryRecords.
core/src/main/scala/kafka/server/ReplicaManager.scala Wires consolidation fetcher manager into partition stop/offline/shutdown and leader/follower transitions.
core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala Implements isConsolidatingDisklessTopic.
core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala Introduces a LeaderEndPoint for consolidation fetchers (currently stubbed).
core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala Adds toMemoryRecords override to handle ConcatenatedRecords.
core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala Adds a fetcher manager to run consolidation fetchers.
build.gradle Excludes core/src/main/scala/io/aiven/inkless/** from RAT checks.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread build.gradle
Comment on lines +2859 to +2862
if (isConsolidatingDisklessTopic) {
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, topicId = partition.topicId, targetLogDirectoryId = partitionAssignedDirectoryId)
consolidatingDisklessPartitionsToStartFetching.put(tp, partition)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be done in its own else if condition to improve readability, even though it requires duplicating the code to get partitionAssignedDirectoryId.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it can be separated if you think that's better for readability. I also considered that, however I wanted the least amount of deviation from the classic code, so it seemed safer and easier to hook into this branch.
If we duplicate this code part, it carries the risk that we might miss changes in this branch in case of rebases.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
viktorsomogyi and others added 2 commits April 22, 2026 09:50
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@giuseppelillo giuseppelillo merged commit 6bca3e6 into main Apr 22, 2026
4 checks passed
@giuseppelillo giuseppelillo deleted the svv/ts-unification-fetchers branch April 22, 2026 08:33
giuseppelillo pushed a commit that referenced this pull request May 6, 2026
)

* feat(inkless): POD-2392 Implement consolidating partition tracking

This commit implements ConsolidationFetcherManager and ConsolidationFetcherThread
in ReplicaManager to track diskless partitions that are being
consolidated into classic Kafka logs.

* Address copilot review comments

* Use full leader set to remove from fetchers

* Update core/src/main/scala/kafka/server/ReplicaManager.scala

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Address copilot review comments

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants