
feat(inkless): add lag metrics for consolidated topic partitions #590

Merged: EelisK merged 2 commits into main from EelisK/POD-2397 on May 13, 2026

EelisK (Member) commented May 11, 2026

Introduce per-partition metrics to monitor the consolidation process:

  • inkless.remote.consolidation.lag: diskless log end offset minus remote log end offset
  • inkless.remote.consolidation.local.lag: diskless log end offset minus local log end offset
  • inkless.remote.consolidation.deletable.messages: messages consolidated to tiered storage that can be removed
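The two lag gauges are plain differences between end offsets. A minimal sketch, assuming a hypothetical PartitionOffsets snapshot that holds the three end offsets for one partition; the names and the clamping at zero are illustrative assumptions, not the PR's code. The deletable-messages gauge is not modeled because its exact formula is not given here.

```scala
// Hypothetical per-partition snapshot; field names are illustrative, not taken from the PR.
final case class PartitionOffsets(
    disklessLogEndOffset: Long,
    localLogEndOffset: Long,
    remoteLogEndOffset: Long
)

object ConsolidationLag {
  // inkless.remote.consolidation.lag: diskless LEO minus remote LEO.
  // Clamping at zero is an assumption so an inconsistent snapshot never reports negative lag.
  def remoteLag(o: PartitionOffsets): Long =
    math.max(0L, o.disklessLogEndOffset - o.remoteLogEndOffset)

  // inkless.remote.consolidation.local.lag: diskless LEO minus local LEO.
  def localLag(o: PartitionOffsets): Long =
    math.max(0L, o.disklessLogEndOffset - o.localLogEndOffset)
}
```

For example, with a diskless LEO of 1000, a local LEO of 950 and a remote LEO of 800, the sketch reports a remote lag of 200 and a local lag of 50.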

Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces per-partition JMX metrics to track diskless→tiered-storage consolidation progress (lag relative to the local log and to the remote tier, plus the “deletable” backlog), wiring them into the consolidation fetcher lifecycle so metrics are registered and unregistered as partitions start and stop consolidating.

Changes:

  • Add ConsolidationMetrics to register per-partition gauges and update them from the consolidation fetcher thread.
  • Wire metrics lifecycle into ReplicaManager partition start/stop flows and close metrics on broker shutdown.
  • Expose UnifiedLog.highestOffsetInRemoteStorage() publicly so consolidation code can compute remote-tier lag.
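The lifecycle described in these changes (register gauges when a partition starts consolidating, update them from the fetcher thread, unregister them when it stops, close everything on shutdown) can be sketched without a metrics library. This is a minimal sketch, assuming a hypothetical ConsolidationMetricsSketch backed by a ConcurrentHashMap keyed by a topic-partition string; it is not the PR's ConsolidationMetrics API and it omits JMX registration.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for one partition's gauge values.
final class PartitionGauges {
  val lag = new AtomicLong(0L)      // backs inkless.remote.consolidation.lag
  val localLag = new AtomicLong(0L) // backs inkless.remote.consolidation.local.lag
}

final class ConsolidationMetricsSketch extends AutoCloseable {
  // Keyed by a "topic-partition" string; a real implementation would key by TopicPartition.
  private val gauges = new ConcurrentHashMap[String, PartitionGauges]()

  // Called when a partition starts consolidating (e.g. on a leader/follower transition).
  def register(partition: String): Unit =
    gauges.putIfAbsent(partition, new PartitionGauges)

  // Called from the fetcher thread after it processes fetched partition data.
  def update(partition: String, lag: Long, localLag: Long): Unit = {
    val g = gauges.get(partition)
    if (g != null) { // ignore late updates for partitions that were already unregistered
      g.lag.set(lag)
      g.localLag.set(localLag)
    }
  }

  // Called when a partition stops consolidating.
  def unregister(partition: String): Unit =
    gauges.remove(partition)

  def registered: Set[String] = gauges.keySet.asScala.toSet

  def lagOf(partition: String): Option[Long] =
    Option(gauges.get(partition)).map(_.lag.get)

  // Called on broker shutdown: drop every remaining gauge.
  override def close(): Unit = gauges.clear()
}
```

Dropping updates for partitions that are no longer registered is one way to keep a late fetcher-thread update from resurrecting a gauge after the partition has stopped consolidating; whether the real class does this is an assumption.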

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Summary per file:

  • storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: Makes highestOffsetInRemoteStorage() publicly accessible for consolidation lag calculations.
  • core/src/main/scala/kafka/server/ReplicaManager.scala: Creates and owns ConsolidationMetrics, registers/unregisters per-partition gauges during leader/follower transitions, and closes metrics on shutdown.
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationMetrics.scala: New metrics helper that manages per-partition gauges (lag, local lag, deletable).
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala: Updates consolidation metrics after processing fetched partition data.
  • core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala: Passes optional metrics into fetcher threads.
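Remote-tier lag is defined against the remote log end offset, while the exposed method reports the highest offset already in remote storage, which is inclusive. A minimal sketch of the conversion, assuming the method returns -1 when nothing has been tiered yet and that the remote LEO is that offset plus one; remoteLogEndOffset is a hypothetical helper, not part of UnifiedLog.

```scala
object RemoteOffsets {
  // Assumed sentinel for "nothing tiered yet" returned by highestOffsetInRemoteStorage().
  val NoRemoteData: Long = -1L

  // Hypothetical helper: the highest remote offset is inclusive, so the remote
  // log end offset (the next offset to be tiered) is one past it.
  // None signals that no data has reached the remote tier yet.
  def remoteLogEndOffset(highestOffsetInRemoteStorage: Long): Option[Long] =
    if (highestOffsetInRemoteStorage <= NoRemoteData) None
    else Some(highestOffsetInRemoteStorage + 1)
}
```

Returning an Option forces the caller to decide explicitly how to report remote lag for a partition that has not tiered anything yet, instead of silently subtracting a sentinel value.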


EelisK force-pushed the EelisK/POD-2397 branch from 3f16d28 to c9f9194 on May 12, 2026 08:42
EelisK marked this pull request as ready for review on May 12, 2026 08:52

viktorsomogyi (Contributor) left a comment

Would you mind creating a few tests on the ConsolidationFetcherThread level?

On core/src/main/scala/kafka/server/ReplicaManager.scala:

    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
    consolidationFetcherManager.foreach(_.removeFetcherForPartitions(
      partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic))))
    val consolidatingPartitionsToStop = partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic))

viktorsomogyi (Contributor) commented May 12, 2026

Please also add the && config.disklessRemoteStorageConsolidationEnabled check so the filter takes into account whether the feature is enabled.

EelisK (Member, Author) replied

Added the check outside the filter, since it is not a property of individual partitions:

    val consolidatingPartitionsToStop =
      if (config.disklessRemoteStorageConsolidationEnabled)
        partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic))
      else Set[TopicPartition]()
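The gating in the reply above can be exercised in isolation. A minimal sketch, assuming hypothetical stand-ins: a local TopicPartition case class instead of Kafka's class, a MetadataView trait in place of the inkless metadata view, and the config flag passed in as a plain Boolean.

```scala
// Hypothetical stand-ins; not the real Kafka or inkless types.
final case class TopicPartition(topic: String, partition: Int)

trait MetadataView {
  def isConsolidatingDisklessTopic(topic: String): Boolean
}

object StopConsolidation {
  // The feature flag is checked once, outside the filter, because it does not
  // depend on any individual partition.
  def partitionsToStop(
      consolidationEnabled: Boolean,
      partitions: Set[TopicPartition],
      view: MetadataView
  ): Set[TopicPartition] =
    if (consolidationEnabled)
      partitions.filter(p => view.isConsolidatingDisklessTopic(p.topic))
    else Set.empty[TopicPartition]
}
```

With the flag disabled the metadata view is never consulted, so no consolidation fetchers or metrics are touched for any partition.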

EelisK force-pushed the EelisK/POD-2397 branch from c9f9194 to 4803af2 on May 13, 2026 08:01

EelisK (Member, Author) commented May 13, 2026

@viktorsomogyi Thanks for the review!

> Would you mind creating a few tests on the ConsolidationFetcherThread level?

Added a few tests 👍

viktorsomogyi (Contributor) left a comment

Thanks for the updates, LGTM. We can merge once CI is green.

EelisK merged commit 803abc2 into main on May 13, 2026 (5 checks passed)
EelisK deleted the EelisK/POD-2397 branch on May 13, 2026 08:30
giuseppelillo pushed a commit that referenced this pull request May 15, 2026

3 participants