Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ __pycache__

_data/

.inkless-sync/
.inkless-sync/

core/data/
19 changes: 16 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ class ReplicaManager(val config: KafkaConfig,
private[kafka] val partitionCount = metricsGroup.newGauge(PartitionCountMetricName, () => allPartitions.size)
metricsGroup.newGauge(OfflineReplicaCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(UnderReplicatedPartitionsMetricName, () => underReplicatedPartitionCount)
metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isUnderMinIsr))
metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isAtMinIsr))
metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => underMinIsrPartitionCount)
metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => atMinIsrPartitionCount)
metricsGroup.newGauge(ReassigningPartitionsMetricName, () => reassigningPartitionsCount)
metricsGroup.newGauge(SealedPartitionsCountMetricName, () => sealedPartitionsCount)
metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount)
Expand All @@ -416,7 +416,20 @@ class ReplicaManager(val config: KafkaConfig,
val isrShrinkRate: Meter = metricsGroup.newMeter(IsrShrinksPerSecMetricName, "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter(FailedIsrUpdatesPerSecMetricName, "failedUpdates", TimeUnit.SECONDS)

def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
/**
 * Tells whether `partition` belongs to a topic that the inkless metadata view reports as a
 * consolidating diskless topic.
 *
 * Always false when `config.disklessRemoteStorageConsolidationEnabled` is off; the metadata
 * view is only consulted when that flag is on.
 */
private def isConsolidatingPartition(partition: Partition): Boolean = {
  if (!config.disklessRemoteStorageConsolidationEnabled) false
  else _inklessMetadataView.isConsolidatingDisklessTopic(partition.topic)
}

/**
 * Number of partitions yielded by `leaderPartitionsIterator` that are under-replicated,
 * leaving out those for which `isConsolidatingPartition` returns true.
 */
def underReplicatedPartitionCount: Int =
  leaderPartitionsIterator.count(p => p.isUnderReplicated && !isConsolidatingPartition(p))

/**
 * Number of partitions yielded by `leaderPartitionsIterator` that are under their min ISR,
 * leaving out those for which `isConsolidatingPartition` returns true.
 */
def underMinIsrPartitionCount: Int =
  leaderPartitionsIterator.count(p => p.isUnderMinIsr && !isConsolidatingPartition(p))

/**
 * Number of partitions yielded by `leaderPartitionsIterator` that are exactly at their min ISR,
 * leaving out those for which `isConsolidatingPartition` returns true.
 */
def atMinIsrPartitionCount: Int =
  leaderPartitionsIterator.count(p => p.isAtMinIsr && !isConsolidatingPartition(p))
Comment thread
Copilot marked this conversation as resolved.

def startHighWatermarkCheckPointThread(): Unit = {
if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
Expand Down
107 changes: 105 additions & 2 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import kafka.log.LogManager
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.compress.Compression
Expand Down Expand Up @@ -4974,7 +4975,8 @@ class ReplicaManagerInklessTest {
consolidatingDisklessTopics: Set[String] = Set.empty,
mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
inklessSharedStateEnabled: Boolean = true,
initDisklessLogManager: Option[InitDisklessLogManager] = None
initDisklessLogManager: Option[InitDisklessLogManager] = None,
defaultLogConfig: Option[LogConfig] = None
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(1, logDirCount = 2)
if (disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled) {
Expand All @@ -4989,7 +4991,7 @@ class ReplicaManagerInklessTest {
(disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled).toString
)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), defaultLogConfig.getOrElse(new LogConfig(new Properties())))
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
when(sharedState.time()).thenReturn(Time.SYSTEM)
val inklessConfigMap = new util.HashMap[String, Object]()
Expand Down Expand Up @@ -5568,4 +5570,105 @@ class ReplicaManagerInklessTest {
consolidationCtor.close()
}
}

/**
 * Verifies that underReplicatedPartitionCount, atMinIsrPartitionCount and
 * underMinIsrPartitionCount leave out partitions of a consolidating diskless topic.
 *
 * Two single-partition topics (one consolidating, one classic) are created with replicas
 * [1,2,3] and leader 1, using a default log config with min.insync.replicas=2. The ISR of
 * both partitions is then shrunk in lockstep ([1,2,3] -> [1,2] -> [1]); after each step the
 * three counts are expected to reflect the classic partition only.
 */
@Test
def testUrpMetricsExcludeConsolidatingDisklessTopicsAcrossStates(): Unit = {
val consolidatingTopic = "consolidating-topic"
val classicTopic = "classic-topic"
val consolidatingTopicId = Uuid.randomUuid()
val classicTopicId = Uuid.randomUuid()

// min.insync.replicas=2 is handed to createReplicaManager as the default log config below.
val logProps = new Properties()
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")

// Every ConsolidationFetcherManager constructed while the mock is active returns an empty
// map from removeFetcherForPartitions.
val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = {
case (mock, _) =>
when(mock.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
}
val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit)
try {
// Only consolidatingTopic is passed as diskless/consolidating; classicTopic appears solely
// in the topic-id mapping.
val replicaManager = createReplicaManager(
disklessTopics = List(consolidatingTopic),
disklessRemoteStorageConsolidationEnabled = true,
consolidatingDisklessTopics = Set(consolidatingTopic),
topicIdMapping = Map(consolidatingTopic -> consolidatingTopicId, classicTopic -> classicTopicId),
defaultLogConfig = Some(new LogConfig(logProps)),
)
try {
// --- State 1: ISR=[1,2,3] (healthy for min.isr=2, RF=3) ---
// Under-replicated: no (ISR == RF)
// At min ISR: no (3 != 2)
// Under min ISR: no (3 > 2)
val initialDelta = new TopicsDelta(TopicsImage.EMPTY)
initialDelta.replay(new TopicRecord().setName(consolidatingTopic).setTopicId(consolidatingTopicId))
initialDelta.replay(new PartitionRecord()
.setPartitionId(0).setTopicId(consolidatingTopicId)
.setReplicas(java.util.List.of(1, 2, 3)).setIsr(java.util.List.of(1, 2, 3))
.setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0))
initialDelta.replay(new TopicRecord().setName(classicTopic).setTopicId(classicTopicId))
initialDelta.replay(new PartitionRecord()
.setPartitionId(0).setTopicId(classicTopicId)
.setReplicas(java.util.List.of(1, 2, 3)).setIsr(java.util.List.of(1, 2, 3))
.setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0))

val initialImage = imageFromTopics(initialDelta.apply())
replicaManager.applyDelta(initialDelta, initialImage)

assertEquals(0, replicaManager.underReplicatedPartitionCount,
"No partitions should be under-replicated when ISR == RF")
assertEquals(0, replicaManager.atMinIsrPartitionCount,
"No partitions should be at-min-ISR when ISR(3) > min.isr(2)")
assertEquals(0, replicaManager.underMinIsrPartitionCount,
"No partitions should be under-min-ISR when ISR(3) > min.isr(2)")

// --- State 2: ISR shrinks to [1,2] ---
// Under-replicated: yes (ISR=2 < RF=3)
// At min ISR: yes (ISR=2 == min.isr=2)
// Under min ISR: no (ISR=2 >= min.isr=2)
// The same ISR change is replayed for both topics, so any difference in the counts comes
// from the consolidating-topic exclusion.
val shrinkDelta1 = new TopicsDelta(initialImage.topics())
shrinkDelta1.replay(new PartitionChangeRecord()
.setTopicId(consolidatingTopicId).setPartitionId(0)
.setIsr(java.util.List.of(1, 2)).setLeader(1))
shrinkDelta1.replay(new PartitionChangeRecord()
.setTopicId(classicTopicId).setPartitionId(0)
.setIsr(java.util.List.of(1, 2)).setLeader(1))

val image2 = imageFromTopics(shrinkDelta1.apply())
replicaManager.applyDelta(shrinkDelta1, image2)

assertEquals(1, replicaManager.underReplicatedPartitionCount,
"Only classic partition should count as under-replicated; consolidating excluded")
assertEquals(1, replicaManager.atMinIsrPartitionCount,
"Only classic partition should count as at-min-ISR; consolidating excluded")
assertEquals(0, replicaManager.underMinIsrPartitionCount,
"No partitions under-min-ISR when ISR(2) == min.isr(2)")

// --- State 3: ISR shrinks to [1] ---
// Under-replicated: yes (ISR=1 < RF=3)
// At min ISR: no (ISR=1 != min.isr=2)
// Under min ISR: yes (ISR=1 < min.isr=2)
val shrinkDelta2 = new TopicsDelta(image2.topics())
shrinkDelta2.replay(new PartitionChangeRecord()
.setTopicId(consolidatingTopicId).setPartitionId(0)
.setIsr(java.util.List.of(1)).setLeader(1))
shrinkDelta2.replay(new PartitionChangeRecord()
.setTopicId(classicTopicId).setPartitionId(0)
.setIsr(java.util.List.of(1)).setLeader(1))

val image3 = imageFromTopics(shrinkDelta2.apply())
replicaManager.applyDelta(shrinkDelta2, image3)

assertEquals(1, replicaManager.underReplicatedPartitionCount,
"Only classic partition should count as under-replicated; consolidating excluded")
assertEquals(0, replicaManager.atMinIsrPartitionCount,
"No partitions at-min-ISR when ISR(1) < min.isr(2)")
assertEquals(1, replicaManager.underMinIsrPartitionCount,
"Only classic partition should count as under-min-ISR; consolidating excluded")
} finally {
// Shut the replica manager down even when an assertion above fails.
replicaManager.shutdown(checkpointHW = false)
}
} finally {
// Release the construction mock so it does not remain active after this test.
consolidationCtor.close()
}
}
}
Loading