feat(metadata:inkless): Log metrics for Inkless #511
viktorsomogyi wants to merge 1 commit into main
Conversation
Pull request overview
Adds JMX metrics exposure for Inkless (diskless) partitions so existing kafka.log:type=Log dashboards can track start/end offsets and size for diskless partitions, with values sourced from the Control Plane.
Changes:
- Introduces `InklessLogMetrics` to register/remove `kafka.log:type=Log` gauges (start offset, end offset, size) for diskless partitions and refresh them periodically.
- Wires periodic refresh and shutdown cleanup into `ReplicaManager`.
- Adds unit tests validating gauge registration, updates, and cleanup.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java | New runnable/closeable component that fetches log info from the Control Plane and exposes it via Kafka/Yammer metrics. |
| storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java | Tests gauge creation, value propagation, partition removal cleanup, and close cleanup. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Instantiates InklessLogMetrics, schedules refresh every 30s, and closes it on shutdown. |
```scala
// There are internal delays in case of errors or absence of work items, no need for extra delays here.
scheduler.schedule("inkless-file-merger", () => inklessFileMerger.foreach(_.run()), sharedState.config().fileMergerInterval().toMillis, sharedState.config().fileMergerInterval().toMillis)

scheduler.schedule("inkless-log-metrics", () => inklessLogMetrics.foreach(_.run()), config.logInitialTaskDelayMs, 30000L)
```
InklessLogMetrics is refreshed via a hard-coded 30000L interval. Since other Inkless background tasks take their intervals from config (fileCleanerInterval, fileMergerInterval), consider making this refresh interval configurable (or at least define it as a named constant/Duration) to avoid magic numbers and to allow tuning in production if the Control Plane load or desired freshness changes.
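One low-risk shape for this fix is a named constant with a config override; the sketch below is illustrative only — the class name, the constant, and the config key are hypothetical, not part of the PR:

```java
import java.time.Duration;

public class InklessLogMetricsConfig {
    // Hypothetical named constant replacing the hard-coded 30000L.
    public static final Duration DEFAULT_REFRESH_INTERVAL = Duration.ofSeconds(30);

    // Hypothetical config lookup: fall back to the default when the
    // (assumed) "inkless.log.metrics.refresh.interval.ms" key is unset.
    public static Duration refreshInterval(String configuredMs) {
        if (configuredMs == null || configuredMs.isEmpty()) {
            return DEFAULT_REFRESH_INTERVAL;
        }
        return Duration.ofMillis(Long.parseLong(configuredMs));
    }
}
```

The scheduler call would then pass `refreshInterval(...).toMillis()` instead of the literal, matching how fileCleanerInterval and fileMergerInterval are already wired.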
```java
final List<TopicIdPartition> currentOrdered = new ArrayList<>(currentSet);
final List<GetLogInfoRequest> requests = new ArrayList<>();
for (final TopicIdPartition tp : currentOrdered) {
    requests.add(new GetLogInfoRequest(tp.topicId(), tp.partition()));
}
final List<GetLogInfoResponse> responses = controlPlane.getLogInfo(requests);

final Map<TopicIdPartition, LogInfoSnapshot> newCache = new ConcurrentHashMap<>();
for (int i = 0; i < currentOrdered.size(); i++) {
    final TopicIdPartition tp = currentOrdered.get(i);
    final GetLogInfoResponse r = responses.get(i);
    final long logStartOffset = r.errors() != Errors.NONE
            ? GetLogInfoResponse.INVALID_OFFSET
            : r.logStartOffset();
    final long highWatermark = r.errors() != Errors.NONE
            ? GetLogInfoResponse.INVALID_OFFSET
            : r.highWatermark();
    final long byteSize = r.errors() != Errors.NONE
            ? GetLogInfoResponse.INVALID_BYTE_SIZE
            : r.byteSize();
    newCache.put(tp, new LogInfoSnapshot(logStartOffset, highWatermark, byteSize));
}
```
getLogInfo responses are consumed positionally (responses.get(i)), but ControlPlane#getLogInfo does not document (or enforce) an in-order response contract the way findBatches does. The Postgres implementation builds a VALUES table without an explicit ordering, which can legally return rows in a different order, leading to metrics being attributed to the wrong partition. Consider changing the API to return keyed results (include topicId/partition in the response or return a Map<TopicIdPartition, ...>), or add an explicit request index and guarantee ordering end-to-end; at minimum, validate responses.size() before indexing.
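The keyed-results alternative removes the ordering dependency entirely: the caller looks each partition up by key instead of trusting positional alignment. A minimal sketch, with simplified stand-in records for the real Inkless types (the `fetch` helper and its placeholder values are hypothetical):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the real Inkless types.
record TopicIdPartition(String topicId, int partition) {}
record LogInfo(long logStartOffset, long highWatermark, long byteSize) {}

public class KeyedLogInfo {
    // Returning a map keyed by partition makes response order irrelevant;
    // a reordered backend result can no longer mis-attribute metrics.
    static Map<TopicIdPartition, LogInfo> getLogInfoKeyed(List<TopicIdPartition> requests) {
        final Map<TopicIdPartition, LogInfo> out = new HashMap<>();
        for (final TopicIdPartition tp : requests) {
            out.put(tp, fetch(tp)); // fetch() stands in for the Control Plane call
        }
        return out;
    }

    static LogInfo fetch(TopicIdPartition tp) {
        return new LogInfo(0L, 100L, 4096L); // placeholder values
    }
}
```

Even if the positional API is kept, a `responses.size() == requests.size()` check before the loop is a cheap guard against an IndexOutOfBoundsException or silent truncation.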
```java
metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () -> {
    final LogInfoSnapshot s = cache.get(tp);
    return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET;
}, tags);
```
LogMetricNames.LOG_END_OFFSET in Kafka’s UnifiedLog is logEndOffset (LEO), but this gauge is currently wired to highWatermark. If Inkless only has a high watermark available, exposing it under the LogEndOffset name will make existing dashboards/alerts misleading. Either source/report the actual log end offset from the Control Plane, or expose high watermark under a distinct metric name/tag so semantics stay consistent with classic kafka.log:type=Log metrics.
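The second option from the comment — keeping LogEndOffset's classic semantics and publishing the high watermark under its own name — can be sketched as below. The `GaugeRegistry` stub and the `HighWatermark` gauge name are illustrative assumptions, not Kafka's metrics API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Tiny stand-in for a metrics group: name -> lazily evaluated gauge.
class GaugeRegistry {
    final Map<String, LongSupplier> gauges = new HashMap<>();
    void newGauge(String name, LongSupplier supplier) { gauges.put(name, supplier); }
    long value(String name) { return gauges.get(name).getAsLong(); }
}

public class DistinctHwGauge {
    public static GaugeRegistry register(long logEndOffset, long highWatermark) {
        GaugeRegistry g = new GaugeRegistry();
        g.newGauge("LogEndOffset", () -> logEndOffset);   // true LEO, if the Control Plane can supply it
        g.newGauge("HighWatermark", () -> highWatermark); // HW kept under a distinct name
        return g;
    }
}
```

Since `logStartOffset <= highWatermark <= logEndOffset` in a classic log, dashboards that compute per-partition growth or lag from LogEndOffset would silently under-report if HW were substituted.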
```java
InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) {
    this.controlPlane = controlPlane;
    this.metadataView = metadataView;
    run();
}
```
The test-only constructor calls run() immediately. In production this means new InklessLogMetrics(sharedState) will synchronously hit the Control Plane during ReplicaManager initialization, potentially increasing broker startup latency or failing startup if the Control Plane is temporarily unavailable. Consider making construction side-effect free (register gauges + schedule the first refresh) and let the scheduler trigger the initial fetch, or provide an explicit start()/init() step called from the scheduled task.
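The side-effect-free pattern the comment suggests can be sketched with a stand-in class (the class and counter below are hypothetical, not the PR's code): the constructor only wires dependencies, and the first Control Plane fetch happens when the scheduler invokes run().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in: construction does no I/O, so broker startup
// cannot block on (or fail because of) an unavailable Control Plane.
class LazyMetrics implements Runnable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private int refreshes = 0;

    LazyMetrics() {
        // no Control Plane call here — gauges can still be registered,
        // returning sentinel values until the first refresh runs
    }

    @Override
    public void run() {
        started.set(true);
        refreshes++; // the real class would fetch log info here
    }

    boolean started() { return started.get(); }
    int refreshes() { return refreshes; }
}
```

With this shape, `config.logInitialTaskDelayMs` in the ReplicaManager scheduling call naturally becomes the delay before the first fetch, so no separate start()/init() step is strictly required.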
Add JMX metrics for diskless (Inkless) partitions that mirror the classic kafka.log:type=Log metrics (LogStartOffset, LogEndOffset, Size). Values are sourced from the Control Plane and tagged with topic and partition so existing JMX/Prometheus tooling and dashboards work unchanged.
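For reference, the per-partition `kafka.log:type=Log` ObjectName pattern these gauges mirror can be built as below; the demo class, topic name, and helper are illustrative, only the domain/key layout comes from Kafka's standard log metrics:

```java
import javax.management.ObjectName;

public class LogMetricNameDemo {
    // Builds the classic per-partition log metric name, e.g.
    // kafka.log:type=Log,name=Size,topic=orders,partition=0
    public static ObjectName logMetric(String name, String topic, int partition) throws Exception {
        return new ObjectName(String.format(
                "kafka.log:type=Log,name=%s,topic=%s,partition=%d", name, topic, partition));
    }
}
```

Because the topic and partition appear as ObjectName key properties rather than being baked into the metric name, existing JMX/Prometheus relabeling rules pick up diskless partitions without changes.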