From 786396a12c756ed0e49e3e78f2f61a466d61da8b Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Jun 2026 11:10:02 +0300 Subject: [PATCH 1/2] chore: add integration test dir (core/data/) to .gitignore --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index fdd22bf7cc3..76a7d627a68 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,6 @@ __pycache__ _data/ -.inkless-sync/ \ No newline at end of file +.inkless-sync/ + +core/data/ \ No newline at end of file From 17e48b3030f5e36ab30e3905b29aac51ea847aba Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Jun 2026 10:57:57 +0300 Subject: [PATCH 2/2] fix(inkless:metrics): exclude consolidating partitions from URP metrics Consolidating diskless partitions never materialize followers (ReplicaManager skips getOrCreatePartition for them), so they always trip URP-related gauges (underReplicatedPartitionCount, atMinIsrPartitionCount, underMinIsrPartitionCount) despite having dedicated ConsolidationLocalLag/TotalLag metrics for observability. 
Co-Authored-By: Claude Opus 4.6 --- .../scala/kafka/server/ReplicaManager.scala | 19 +++- .../server/ReplicaManagerInklessTest.scala | 107 +++++++++++++++++- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 506244089be..00bbad241e6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -394,8 +394,8 @@ class ReplicaManager(val config: KafkaConfig, private[kafka] val partitionCount = metricsGroup.newGauge(PartitionCountMetricName, () => allPartitions.size) metricsGroup.newGauge(OfflineReplicaCountMetricName, () => offlinePartitionCount) metricsGroup.newGauge(UnderReplicatedPartitionsMetricName, () => underReplicatedPartitionCount) - metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isUnderMinIsr)) - metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isAtMinIsr)) + metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => underMinIsrPartitionCount) + metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => atMinIsrPartitionCount) metricsGroup.newGauge(ReassigningPartitionsMetricName, () => reassigningPartitionsCount) metricsGroup.newGauge(SealedPartitionsCountMetricName, () => sealedPartitionsCount) metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount) @@ -416,7 +416,20 @@ class ReplicaManager(val config: KafkaConfig, val isrShrinkRate: Meter = metricsGroup.newMeter(IsrShrinksPerSecMetricName, "shrinks", TimeUnit.SECONDS) val failedIsrUpdatesRate: Meter = metricsGroup.newMeter(FailedIsrUpdatesPerSecMetricName, "failedUpdates", TimeUnit.SECONDS) - def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated) + private def isConsolidatingPartition(partition: Partition): Boolean = + 
config.disklessRemoteStorageConsolidationEnabled && _inklessMetadataView.isConsolidatingDisklessTopic(partition.topic) + + def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count { partition => + partition.isUnderReplicated && !isConsolidatingPartition(partition) + } + + def underMinIsrPartitionCount: Int = leaderPartitionsIterator.count { partition => + partition.isUnderMinIsr && !isConsolidatingPartition(partition) + } + + def atMinIsrPartitionCount: Int = leaderPartitionsIterator.count { partition => + partition.isAtMinIsr && !isConsolidatingPartition(partition) + } def startHighWatermarkCheckPointThread(): Unit = { if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index f40ffd3e152..3d89207bd01 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -30,6 +30,7 @@ import kafka.server.share.DelayedShareFetch import kafka.utils.TestUtils import kafka.utils.TestUtils.waitUntilTrue import kafka.log.LogManager +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.compress.Compression @@ -4974,7 +4975,8 @@ class ReplicaManagerInklessTest { consolidatingDisklessTopics: Set[String] = Set.empty, mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None, inklessSharedStateEnabled: Boolean = true, - initDisklessLogManager: Option[InitDisklessLogManager] = None + initDisklessLogManager: Option[InitDisklessLogManager] = None, + defaultLogConfig: Option[LogConfig] = None ): ReplicaManager = { val props = TestUtils.createBrokerConfig(1, logDirCount = 2) if (disklessManagedReplicasEnabled || 
disklessRemoteStorageConsolidationEnabled) { @@ -4989,7 +4991,7 @@ class ReplicaManagerInklessTest { (disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled).toString ) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties())) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), defaultLogConfig.getOrElse(new LogConfig(new Properties()))) val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS) when(sharedState.time()).thenReturn(Time.SYSTEM) val inklessConfigMap = new util.HashMap[String, Object]() @@ -5568,4 +5570,105 @@ class ReplicaManagerInklessTest { consolidationCtor.close() } } + + @Test + def testUrpMetricsExcludeConsolidatingDisklessTopicsAcrossStates(): Unit = { + val consolidatingTopic = "consolidating-topic" + val classicTopic = "classic-topic" + val consolidatingTopicId = Uuid.randomUuid() + val classicTopicId = Uuid.randomUuid() + + val logProps = new Properties() + logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + + val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = { + case (mock, _) => + when(mock.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + } + val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit) + try { + val replicaManager = createReplicaManager( + disklessTopics = List(consolidatingTopic), + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(consolidatingTopic), + topicIdMapping = Map(consolidatingTopic -> consolidatingTopicId, classicTopic -> classicTopicId), + defaultLogConfig = Some(new LogConfig(logProps)), + ) + try { + // --- State 1: ISR=[1,2,3] (healthy for min.isr=2, RF=3) --- + // Under-replicated: no (ISR == RF) + // At min ISR: no (3 != 2) + // Under min ISR: no (3 > 2) + val initialDelta = new 
TopicsDelta(TopicsImage.EMPTY) + initialDelta.replay(new TopicRecord().setName(consolidatingTopic).setTopicId(consolidatingTopicId)) + initialDelta.replay(new PartitionRecord() + .setPartitionId(0).setTopicId(consolidatingTopicId) + .setReplicas(java.util.List.of(1, 2, 3)).setIsr(java.util.List.of(1, 2, 3)) + .setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0)) + initialDelta.replay(new TopicRecord().setName(classicTopic).setTopicId(classicTopicId)) + initialDelta.replay(new PartitionRecord() + .setPartitionId(0).setTopicId(classicTopicId) + .setReplicas(java.util.List.of(1, 2, 3)).setIsr(java.util.List.of(1, 2, 3)) + .setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0)) + + val initialImage = imageFromTopics(initialDelta.apply()) + replicaManager.applyDelta(initialDelta, initialImage) + + assertEquals(0, replicaManager.underReplicatedPartitionCount, + "No partitions should be under-replicated when ISR == RF") + assertEquals(0, replicaManager.atMinIsrPartitionCount, + "No partitions should be at-min-ISR when ISR(3) > min.isr(2)") + assertEquals(0, replicaManager.underMinIsrPartitionCount, + "No partitions should be under-min-ISR when ISR(3) > min.isr(2)") + + // --- State 2: ISR shrinks to [1,2] --- + // Under-replicated: yes (ISR=2 < RF=3) + // At min ISR: yes (ISR=2 == min.isr=2) + // Under min ISR: no (ISR=2 >= min.isr=2) + val shrinkDelta1 = new TopicsDelta(initialImage.topics()) + shrinkDelta1.replay(new PartitionChangeRecord() + .setTopicId(consolidatingTopicId).setPartitionId(0) + .setIsr(java.util.List.of(1, 2)).setLeader(1)) + shrinkDelta1.replay(new PartitionChangeRecord() + .setTopicId(classicTopicId).setPartitionId(0) + .setIsr(java.util.List.of(1, 2)).setLeader(1)) + + val image2 = imageFromTopics(shrinkDelta1.apply()) + replicaManager.applyDelta(shrinkDelta1, image2) + + assertEquals(1, replicaManager.underReplicatedPartitionCount, + "Only classic partition should count as under-replicated; consolidating excluded") + assertEquals(1, 
replicaManager.atMinIsrPartitionCount, + "Only classic partition should count as at-min-ISR; consolidating excluded") + assertEquals(0, replicaManager.underMinIsrPartitionCount, + "No partitions under-min-ISR when ISR(2) == min.isr(2)") + + // --- State 3: ISR shrinks to [1] --- + // Under-replicated: yes (ISR=1 < RF=3) + // At min ISR: no (ISR=1 != min.isr=2) + // Under min ISR: yes (ISR=1 < min.isr=2) + val shrinkDelta2 = new TopicsDelta(image2.topics()) + shrinkDelta2.replay(new PartitionChangeRecord() + .setTopicId(consolidatingTopicId).setPartitionId(0) + .setIsr(java.util.List.of(1)).setLeader(1)) + shrinkDelta2.replay(new PartitionChangeRecord() + .setTopicId(classicTopicId).setPartitionId(0) + .setIsr(java.util.List.of(1)).setLeader(1)) + + val image3 = imageFromTopics(shrinkDelta2.apply()) + replicaManager.applyDelta(shrinkDelta2, image3) + + assertEquals(1, replicaManager.underReplicatedPartitionCount, + "Only classic partition should count as under-replicated; consolidating excluded") + assertEquals(0, replicaManager.atMinIsrPartitionCount, + "No partitions at-min-ISR when ISR(1) < min.isr(2)") + assertEquals(1, replicaManager.underMinIsrPartitionCount, + "Only classic partition should count as under-min-ISR; consolidating excluded") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } finally { + consolidationCtor.close() + } + } }