
feat(metadata:inkless): Log metrics for Inkless #511

Draft
viktorsomogyi wants to merge 1 commit into main from svv/diskless-log-metrics

Conversation

@viktorsomogyi

Add JMX metrics for diskless (Inkless) partitions that mirror the classic kafka.log:type=Log metrics (LogStartOffset, LogEndOffset, Size). Values are sourced from the Control Plane and tagged with topic and partition so existing JMX/Prometheus tooling and dashboards work unchanged.

  • Add InklessLogMetrics: batch getLogInfo, cache per partition, register/remove gauges for diskless partitions using kafka.log/Log and LogMetricNames (LOG_START_OFFSET, LOG_END_OFFSET, SIZE); a registration sketch follows this list.
  • Wire into ReplicaManager: create from SharedState, schedule refresh every 30s, close before SharedState on shutdown.
  • Add InklessLogMetricsTest (gauges reflect getLogInfo, partition removal drops gauges, close removes all).
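
For illustration, a minimal sketch of the per-partition gauge registration described in the first bullet. It reuses names that appear in the review snippets further down (metricsGroup, cache, LogInfoSnapshot, tags) and should be read as a sketch, not the exact implementation:

    // Register the classic kafka.log:type=Log gauges for one diskless partition,
    // tagged with topic and partition. Values are read from a cache that the
    // periodic Control Plane refresh keeps up to date.
    final Map<String, String> tags = Map.of(
        "topic", tp.topic(),
        "partition", String.valueOf(tp.partition()));
    metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () -> {
        final LogInfoSnapshot s = cache.get(tp);
        return s != null ? s.logStartOffset : GetLogInfoResponse.INVALID_OFFSET;
    }, tags);
    // LOG_END_OFFSET and SIZE gauges follow the same pattern, falling back to
    // GetLogInfoResponse.INVALID_OFFSET / INVALID_BYTE_SIZE when nothing is cached.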

Copilot AI (Contributor) left a comment


Pull request overview

Adds JMX metrics exposure for Inkless (diskless) partitions so existing Kafka kafka.log:type=Log dashboards can track start/end offsets and size for diskless partitions via Control Plane–sourced values.
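
As a quick illustration of the "existing dashboards work unchanged" point, a diskless partition's gauge can be read through the same MBean coordinates as a classic log metric. The topic name and partition below are made up for the example, and the lookup assumes it runs inside the broker JVM (or against its remote JMX endpoint):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DisklessLogMetricLookup {
        public static void main(String[] args) throws Exception {
            // Same naming scheme as classic log metrics:
            // kafka.log:type=Log,name=<metric>,topic=<topic>,partition=<partition>
            final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            final ObjectName logEndOffset = new ObjectName(
                "kafka.log:type=Log,name=LogEndOffset,topic=payments,partition=0");
            // Yammer gauges expose their current value via the "Value" attribute.
            System.out.println(server.getAttribute(logEndOffset, "Value"));
        }
    }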

Changes:

  • Introduces InklessLogMetrics to register/remove kafka.log:type=Log gauges (start offset, end offset, size) for diskless partitions and refresh them periodically.
  • Wires periodic refresh and shutdown cleanup into ReplicaManager.
  • Adds unit tests validating gauge registration, updates, and cleanup.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File | Description
storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java | New runnable/closeable component that fetches log info from the Control Plane and exposes it via Kafka/Yammer metrics.
storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java | Tests gauge creation, value propagation, partition removal cleanup, and close cleanup.
core/src/main/scala/kafka/server/ReplicaManager.scala | Instantiates InklessLogMetrics, schedules refresh every 30s, and closes it on shutdown.


// There are internal delays in case of errors or absence of work items, no need for extra delays here.
scheduler.schedule("inkless-file-merger", () => inklessFileMerger.foreach(_.run()), sharedState.config().fileMergerInterval().toMillis, sharedState.config().fileMergerInterval().toMillis)

scheduler.schedule("inkless-log-metrics", () => inklessLogMetrics.foreach(_.run()), config.logInitialTaskDelayMs, 30000L)

Copilot AI Feb 16, 2026


InklessLogMetrics is refreshed via a hard-coded 30000L interval. Since other Inkless background tasks take their intervals from config (fileCleanerInterval, fileMergerInterval), consider making this refresh interval configurable (or at least define it as a named constant/Duration) to avoid magic numbers and to allow tuning in production if the Control Plane load or desired freshness changes.
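
A minimal sketch of the named-constant variant from this comment; the constant name and the class skeleton around it are illustrative only and exist just to show where such a constant could live, not code from this PR:

    import java.time.Duration;

    public final class InklessLogMetrics implements Runnable, AutoCloseable {
        // How often the Control Plane is polled to refresh the cached log info;
        // the Scala wiring would pass InklessLogMetrics.REFRESH_INTERVAL.toMillis()
        // instead of the bare 30000L.
        public static final Duration REFRESH_INTERVAL = Duration.ofSeconds(30);

        @Override public void run() { /* refresh logic elided */ }
        @Override public void close() { /* gauge removal elided */ }
    }

Promoting it further into a broker config (alongside fileCleanerInterval/fileMergerInterval) would then allow tuning without a rebuild.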

Comment on lines +84 to +105
final List<TopicIdPartition> currentOrdered = new ArrayList<>(currentSet);
final List<GetLogInfoRequest> requests = new ArrayList<>();
for (final TopicIdPartition tp : currentOrdered) {
    requests.add(new GetLogInfoRequest(tp.topicId(), tp.partition()));
}
final List<GetLogInfoResponse> responses = controlPlane.getLogInfo(requests);

final Map<TopicIdPartition, LogInfoSnapshot> newCache = new ConcurrentHashMap<>();
for (int i = 0; i < currentOrdered.size(); i++) {
    final TopicIdPartition tp = currentOrdered.get(i);
    final GetLogInfoResponse r = responses.get(i);
    final long logStartOffset = r.errors() != Errors.NONE
        ? GetLogInfoResponse.INVALID_OFFSET
        : r.logStartOffset();
    final long highWatermark = r.errors() != Errors.NONE
        ? GetLogInfoResponse.INVALID_OFFSET
        : r.highWatermark();
    final long byteSize = r.errors() != Errors.NONE
        ? GetLogInfoResponse.INVALID_BYTE_SIZE
        : r.byteSize();
    newCache.put(tp, new LogInfoSnapshot(logStartOffset, highWatermark, byteSize));
}

Copilot AI Feb 16, 2026


getLogInfo responses are consumed positionally (responses.get(i)), but ControlPlane#getLogInfo does not document (or enforce) an in-order response contract the way findBatches does. The Postgres implementation builds a VALUES table without an explicit ordering, which can legally return rows in a different order, leading to metrics being attributed to the wrong partition. Consider changing the API to return keyed results (include topicId/partition in the response or return a Map<TopicIdPartition, ...>), or add an explicit request index and guarantee ordering end-to-end; at minimum, validate responses.size() before indexing.
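
A sketch of the minimal guard, using only the API already visible in the snippet above; the keyed, Map-returning variant mentioned in this comment would be the fuller fix:

    // Defensive check before consuming responses positionally: a count mismatch
    // should fail loudly rather than silently attribute values to the wrong partition.
    final List<GetLogInfoResponse> responses = controlPlane.getLogInfo(requests);
    if (responses.size() != requests.size()) {
        // A keyed API (e.g. Map<TopicIdPartition, GetLogInfoResponse>) would remove
        // the ordering assumption entirely; until then, refuse to guess.
        throw new IllegalStateException("getLogInfo returned " + responses.size()
            + " responses for " + requests.size() + " requests");
    }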

Comment on lines +135 to +138
metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () -> {
    final LogInfoSnapshot s = cache.get(tp);
    return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET;
}, tags);

Copilot AI Feb 16, 2026


LogMetricNames.LOG_END_OFFSET in Kafka’s UnifiedLog is logEndOffset (LEO), but this gauge is currently wired to highWatermark. If Inkless only has a high watermark available, exposing it under the LogEndOffset name will make existing dashboards/alerts misleading. Either source/report the actual log end offset from the Control Plane, or expose high watermark under a distinct metric name/tag so semantics stay consistent with classic kafka.log:type=Log metrics.
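
A sketch of the second option (a distinct metric name), reusing the helper names from the snippet above; the "HighWatermark" gauge name is an assumption for illustration, not an existing Kafka metric contract:

    // Expose the Control Plane high watermark under its own name instead of
    // reusing LogMetricNames.LOG_END_OFFSET, so LogEndOffset keeps LEO semantics.
    metricsGroup.newGauge("HighWatermark", () -> {
        final LogInfoSnapshot s = cache.get(tp);
        return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET;
    }, tags);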

InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) {
    this.controlPlane = controlPlane;
    this.metadataView = metadataView;
    run();

Copilot AI Feb 16, 2026


The test-only constructor calls run() immediately. In production this means new InklessLogMetrics(sharedState) will synchronously hit the Control Plane during ReplicaManager initialization, potentially increasing broker startup latency or failing startup if the Control Plane is temporarily unavailable. Consider making construction side-effect free (register gauges + schedule the first refresh) and let the scheduler trigger the initial fetch, or provide an explicit start()/init() step called from the scheduled task.

Suggested change: drop the run(); call from the constructor.
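
A minimal sketch of the side-effect-free construction this comment describes; the shape is illustrative, and the initial fetch is assumed to come from the already-scheduled "inkless-log-metrics" task:

    // Construction only wires dependencies; the scheduler's first invocation of
    // run() performs the initial Control Plane fetch, so a temporarily unavailable
    // Control Plane cannot delay or fail broker startup.
    InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) {
        this.controlPlane = controlPlane;
        this.metadataView = metadataView;
    }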

