Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,23 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
rlm.onLeadershipChange((leaderPartitions.toSet: Set[TopicPartitionLog]).asJava, (followerPartitions.toSet: Set[TopicPartitionLog]).asJava, topicIds))
}

// Start consolidation on a classic -> consolidated switch. The controller applies such a
// switch's config records in separate metadata deltas: diskless.enable=true (which drives the
// classic-log seal) commits well before remote.storage.enable=true. At the seal-commit delta the
// topic is therefore not yet a consolidating diskless topic, so ReplicaManager.applyLocalLeadersDelta
// cannot start the consolidation fetcher; remote.storage.enable=true then arrives as a config-only
// delta with no partition/leader change, so the leader-delta path is never re-entered. Without this
// hook consolidation would only start incidentally on the next partition-epoch bump (e.g. an ISR
// change) -- or never. When remote storage flips on for a diskless topic it has just become
// consolidating, so kick off consolidation here for the topic's online partitions; the reconciler
// skips any that are not ready yet (still below the committed seal -- those resume via the classic
// fetcher's seal hand-off) and is a no-op for non-consolidating topics.
if (isRemoteLogEnabled && !wasRemoteLogEnabled &&
replicaManager.inklessMetadataView().isDisklessTopic(topic)) {
val consolidatingPartitions = (leaderPartitions ++ followerPartitions).map(_.topicPartition).toSet
replicaManager.startConsolidationFetchersForCaughtUpClassicPartitions(consolidatingPartitions)
}

// When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask
if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
replicaManager.remoteLogManager.foreach(rlm => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ class DynamicConfigChangeUnitTest {
when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
when(replicaManager.metadataCache).thenReturn(metadataCache)
when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
// Classic-tiered topic: the consolidation hook must not fire for a non-diskless topic.
val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(false)
when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)

val tp0 = new TopicPartition(topic, 0)
val log0: UnifiedLog = mock(classOf[UnifiedLog])
Expand Down Expand Up @@ -564,6 +568,87 @@ class DynamicConfigChangeUnitTest {
configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false)
assertEquals(Collections.singleton(partition0), leaderPartitionsArg.getValue)
assertEquals(Collections.singleton(partition1), followerPartitionsArg.getValue)
verify(replicaManager, never()).startConsolidationFetchersForCaughtUpClassicPartitions(any())
}

@Test
def testEnableRemoteLogStorageOnDisklessTopicStartsConsolidation(): Unit = {
  // Scenario under test: during a classic -> consolidated switch the controller delivers
  // diskless.enable=true (which seals the classic log) and remote.storage.enable=true in separate
  // deltas, the latter arriving afterwards as a config-only change. Because the leader-delta path
  // is not re-run for that change, the config handler itself must start consolidation once remote
  // storage turns on for the (now consolidating) diskless topic.
  val topic = "diskless-topic"
  val topicUuid = Uuid.randomUuid()

  // Collaborators: the topic is reported as diskless and a remote log manager is available.
  val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
  val metadataCache = mock(classOf[MetadataCache])
  val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
  val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
  when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
  when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(true)
  when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
  when(replicaManager.metadataCache).thenReturn(metadataCache)
  when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)

  // Registers one online partition of the topic (with its remote-log-enabled log) on the mocked
  // replica manager and returns the pieces the assertions below need.
  def onlinePartitionWithLog(partitionId: Int, leader: Boolean): (TopicPartition, UnifiedLog) = {
    val tp = new TopicPartition(topic, partitionId)
    val log: UnifiedLog = mock(classOf[UnifiedLog])
    val partition: Partition = mock(classOf[Partition])
    when(log.topicPartition).thenReturn(tp)
    when(log.remoteLogEnabled()).thenReturn(true)
    when(log.config).thenReturn(new LogConfig(Collections.emptyMap()))
    when(partition.isLeader).thenReturn(leader)
    when(partition.topicPartition).thenReturn(tp)
    when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition))
    (tp, log)
  }

  // Partition 0 is led locally, partition 1 is followed.
  val (leaderTp, leaderLog) = onlinePartitionWithLog(0, leader = true)
  val (followerTp, followerLog) = onlinePartitionWithLog(1, leader = false)

  doNothing().when(rlm).onLeadershipChange(any(), any(), any())

  // Record the partition set passed to the consolidation entry point.
  val startedFor: ArgumentCaptor[Set[TopicPartition]] = ArgumentCaptor.forClass(classOf[Set[TopicPartition]])
  doNothing().when(replicaManager).startConsolidationFetchersForCaughtUpClassicPartitions(startedFor.capture())

  // Remote storage was off before this update, so this is the false -> true transition.
  val remoteLogWasEnabled = false
  val copyWasDisabled = false
  val handler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null)
  handler.maybeUpdateRemoteLogComponents(topic, Seq(leaderLog, followerLog), remoteLogWasEnabled, copyWasDisabled)

  // Leader and follower partitions alike are passed on; deciding which of them are actually ready
  // to consolidate (at/above the seal) versus still replicating is left to the reconciler.
  assertEquals(Set(leaderTp, followerTp), startedFor.getValue)
}

@Test
def testEnableRemoteLogStorageOnAlreadyEnabledDisklessTopicDoesNotStartConsolidation(): Unit = {
  // Scenario under test: remote storage was already enabled before this update (for instance the
  // leader-delta path started consolidation at the seal commit, or some unrelated config was
  // edited). The false -> true transition guard must then prevent a redundant consolidation start.
  val topic = "diskless-topic"

  // Collaborators: a diskless topic with a remote log manager available.
  val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
  val inklessMetadataView = mock(classOf[kafka.server.metadata.InklessMetadataView])
  val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
  when(inklessMetadataView.isDisklessTopic(topic)).thenReturn(true)
  when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
  when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView)

  // A single, locally led, online partition whose log has remote storage enabled.
  val leaderTp = new TopicPartition(topic, 0)
  val leaderLog: UnifiedLog = mock(classOf[UnifiedLog])
  val leaderPartition: Partition = mock(classOf[Partition])
  when(leaderLog.topicPartition).thenReturn(leaderTp)
  when(leaderLog.remoteLogEnabled()).thenReturn(true)
  when(leaderLog.config).thenReturn(new LogConfig(Collections.emptyMap()))
  when(leaderPartition.isLeader).thenReturn(true)
  when(replicaManager.onlinePartition(leaderTp)).thenReturn(Some(leaderPartition))

  // Remote storage was already on before the update: no transition takes place.
  val remoteLogWasEnabled = true
  val copyWasDisabled = false
  val handler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null)
  handler.maybeUpdateRemoteLogComponents(topic, Seq(leaderLog), remoteLogWasEnabled, copyWasDisabled)

  // The consolidation entry point must not have been invoked at all.
  verify(replicaManager, never()).startConsolidationFetchersForCaughtUpClassicPartitions(any())
}

@Test
Expand Down
Loading