From 415af0f019d0cf13f5fb71e7c7f80b6757164224 Mon Sep 17 00:00:00 2001
From: Viktor Somogyi-Vass
Date: Tue, 16 Jun 2026 15:19:06 +0200
Subject: [PATCH] fix(inkless:consolidation): start consolidation when remote storage is enabled on a diskless topic

The controller applies a classic-to-consolidated switch (and the
untiered-diskless -> consolidated upgrade) as separate metadata deltas:
remote.storage.enable=true arrives as a config-only delta with no leader
or partition change, so the leader-delta path never starts the
consolidation fetcher.

TopicConfigHandler now kicks off consolidation for the topic's online
partitions when remote storage flips from false to true on a diskless
topic. The reconciler skips any partitions that are not ready yet and is
a no-op for non-consolidating topics.

Co-authored-by: Cursor
---
 .../scala/kafka/server/ConfigHandler.scala | 17 ++++
 .../server/DynamicConfigChangeTest.scala   | 85 +++++++++++++++++++
 2 files changed, 102 insertions(+)

diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index d4a4a2b709..d62b1cc29d 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -83,6 +83,23 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
         rlm.onLeadershipChange((leaderPartitions.toSet: Set[TopicPartitionLog]).asJava, (followerPartitions.toSet: Set[TopicPartitionLog]).asJava, topicIds))
     }
 
+    // Start consolidation on a classic -> consolidated switch. The controller applies such a
+    // switch's config records in separate metadata deltas: diskless.enable=true (which drives the
+    // classic-log seal) commits well before remote.storage.enable=true. At the seal-commit delta the
+    // topic is therefore not yet a consolidating diskless topic, so ReplicaManager.applyLocalLeadersDelta
+    // cannot start the consolidation fetcher; remote.storage.enable=true then arrives as a config-only
+    // delta with no partition/leader change, so the leader-delta path is never re-entered. Without this
+    // hook consolidation would only start incidentally on the next partition-epoch bump (e.g. an ISR
+    // change) -- or never. When remote storage flips on for a diskless topic it has just become
+    // consolidating, so kick off consolidation here for the topic's online partitions; the reconciler
+    // skips any that are not ready yet (still below the committed seal -- those resume via the classic
+    // fetcher's seal hand-off) and is a no-op for non-consolidating topics.
+    if (isRemoteLogEnabled && !wasRemoteLogEnabled &&
+        replicaManager.inklessMetadataView().isDisklessTopic(topic)) {
+      val consolidatingPartitions = (leaderPartitions ++ followerPartitions).map(_.topicPartition).toSet
+      replicaManager.startConsolidationFetchersForCaughtUpClassicPartitions(consolidatingPartitions)
+    }
+
     // When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask
     if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
       replicaManager.remoteLogManager.foreach(rlm => {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f0428b107a..6fe3c6bf3a 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -536,6 +536,10 @@ class DynamicConfigChangeUnitTest {
     when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
+    // Classic-tiered topic: the consolidation hook must not fire for a non-diskless topic.
+    val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
+    when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(false)
+    when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)
 
     val tp0 = new TopicPartition(topic, 0)
     val log0: UnifiedLog = mock(classOf[UnifiedLog])
@@ -564,6 +568,87 @@ class DynamicConfigChangeUnitTest {
     configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false)
     assertEquals(Collections.singleton(partition0), leaderPartitionsArg.getValue)
     assertEquals(Collections.singleton(partition1), followerPartitionsArg.getValue)
+    verify(replicaManager, never()).startConsolidationFetchersForCaughtUpClassicPartitions(any())
+  }
+
+  @Test
+  def testEnableRemoteLogStorageOnDisklessTopicStartsConsolidation(): Unit = {
+    // A classic -> consolidated switch flips diskless.enable=true (sealing the classic log) and
+    // remote.storage.enable=true in separate controller deltas, with remote storage landing after
+    // the seal as a config-only change. This hook starts consolidation when remote storage flips on
+    // for a (now consolidating) diskless topic, since no later leader delta re-runs that path.
+    val topic = "diskless-topic"
+    val topicUuid = Uuid.randomUuid()
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    val metadataCache = mock(classOf[MetadataCache])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
+    val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
+    when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(true)
+    when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)
+
+    val tp0 = new TopicPartition(topic, 0)
+    val log0: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition0: Partition = mock(classOf[Partition])
+    when(log0.topicPartition).thenReturn(tp0)
+    when(log0.remoteLogEnabled()).thenReturn(true)
+    when(partition0.isLeader).thenReturn(true)
+    when(partition0.topicPartition).thenReturn(tp0)
+    when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+    when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
+
+    val tp1 = new TopicPartition(topic, 1)
+    val log1: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition1: Partition = mock(classOf[Partition])
+    when(log1.topicPartition).thenReturn(tp1)
+    when(log1.remoteLogEnabled()).thenReturn(true)
+    when(partition1.isLeader).thenReturn(false)
+    when(partition1.topicPartition).thenReturn(tp1)
+    when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
+    when(log1.config).thenReturn(new LogConfig(Collections.emptyMap()))
+
+    doNothing().when(rlm).onLeadershipChange(any(), any(), any())
+
+    val isRemoteLogEnabledBeforeUpdate = false
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null)
+    configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false)
+
+    // The hook must hand the reconciler both the leader and the follower partition in a single call;
+    // the reconciler then decides per partition whether each is ready to consolidate (at/above the
+    // seal) or must keep replicating.
+    val consolidatingArg: ArgumentCaptor[Set[TopicPartition]] = ArgumentCaptor.forClass(classOf[Set[TopicPartition]])
+    verify(replicaManager).startConsolidationFetchersForCaughtUpClassicPartitions(consolidatingArg.capture())
+    assertEquals(Set(tp0, tp1), consolidatingArg.getValue)
+  }
+
+  @Test
+  def testEnableRemoteLogStorageOnAlreadyEnabledDisklessTopicDoesNotStartConsolidation(): Unit = {
+    // Remote storage was already on (e.g. the leader-delta path already started consolidation at the
+    // seal commit, or this is an unrelated config edit): the false->true transition guard must keep
+    // this from redundantly kicking off consolidation again.
+    val topic = "diskless-topic"
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
+    when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(true)
+    when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)
+
+    val tp0 = new TopicPartition(topic, 0)
+    val log0: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition0: Partition = mock(classOf[Partition])
+    when(log0.topicPartition).thenReturn(tp0)
+    when(log0.remoteLogEnabled()).thenReturn(true)
+    when(partition0.isLeader).thenReturn(true)
+    when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+    when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
+
+    val isRemoteLogEnabledBeforeUpdate = true
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null)
+    configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate, false)
+    verify(replicaManager, never()).startConsolidationFetchersForCaughtUpClassicPartitions(any())
   }
 
   @Test