diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 76a4b247edd..b49f7d84a66 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -194,6 +194,11 @@ class Partition(val topicPartition: TopicPartition, @volatile private[cluster] var partitionState: PartitionState = new CommittedPartitionState(util.Set.of(), LeaderRecoveryState.RECOVERED) @volatile var assignmentState: AssignmentState = new SimpleAssignmentState(util.List.of()) + // When true, the partition is sealed for classic-to-diskless migration. + // A sealed partition rejects all produce requests via appendRecordsToLeader. + // Set under the write lock to guarantee that once sealed, LEO cannot increase. + @volatile private var _sealed: Boolean = false + // Logs belonging to this partition. Majority of time it will be only one log, but if log directory // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy // completes and a switch to new location is performed. @@ -252,6 +257,15 @@ class Partition(val topicPartition: TopicPartition, */ def isAtMinIsr: Boolean = leaderLogIfLocal.exists { partitionState.isr.size == effectiveMinIsr(_) } + def isSealed: Boolean = _sealed + + def seal(): Unit = inWriteLock(leaderIsrUpdateLock) { + if (!_sealed) { + _sealed = true + stateChangeLogger.info(s"Sealed partition $topicPartition for diskless migration with LEO ${localLogOrException.logEndOffset}") + } + } + def isReassigning: Boolean = assignmentState.isInstanceOf[OngoingReassignmentState] def isAddingLocalReplica: Boolean = assignmentState.isAddingReplica(localBrokerId) @@ -1224,6 +1238,12 @@ class Partition(val topicPartition: TopicPartition, transactionVersion: Short = TransactionVersion.TV_UNKNOWN ): LogAppendInfo = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { + if (_sealed) { + // Force metadata refresh and client retries while the migration from classic to diskless is still ongoing. + throw new ReplicaNotAvailableException( + s"Partition $topicPartition is sealed for diskless migration on broker $localBrokerId") + } + leaderLogIfLocal match { case Some(leaderLog) => val minIsr = effectiveMinIsr(leaderLog) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b774a31d7be..9c61a0ecd23 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -553,6 +553,18 @@ class ReplicaManager(val config: KafkaConfig, } } + def sealTopicPartitions(topic: String): Unit = { + allPartitions.forEach { (tp, hostedPartition) => + if (tp.topic() == topic) { + hostedPartition match { + case HostedPartition.Online(partition) if partition.isLeader => + partition.seal() + case _ => + } + } + } + } + private def offlinePartitionCount: Int = { allPartitions.values.asScala.iterator.count(_.getClass == HostedPartition.Offline.getClass) } @@ -2736,10 +2748,12 @@ class ReplicaManager(val config: KafkaConfig, "local leaders.") replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet) localLeaders.foreachEntry { (tp, info) => - if (!_inklessMetadataView.isDisklessTopic(tp.topic())) + val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic()) + val existingPartition = onlinePartition(tp) + if (!isDiskless) { getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { - val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) changedPartitions.add(partition) @@ -2754,6 +2768,23 @@ class ReplicaManager(val config: KafkaConfig, markPartitionOffline(tp) } } + } else if (existingPartition.isDefined) { + // Classic-to-diskless transition: seal the partition before making it leader + // so that no produce request can ever be processed by the new leader. + val partition = existingPartition.get + try { + partition.seal() + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + partition.makeLeader(info.partition, false, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) + + changedPartitions.add(partition) + } catch { + case e: KafkaStorageException => + stateChangeLogger.info(s"Skipped the become-leader state change for transitioning partition $tp " + + s"with topic id ${info.topicId} due to a storage error ${e.getMessage}") + markPartitionOffline(tp) + } + } } } @@ -2781,7 +2812,7 @@ class ReplicaManager(val config: KafkaConfig, // - This also ensures that the local replica is created even if the leader // is unavailable. This is required to ensure that we include the partition's // high watermark in the checkpoint file (see KAFKA-1647). - val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) val isNewLeaderEpoch = partition.makeFollower(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) if (isInControlledShutdown && (info.partition.leader == NO_LEADER || diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 1d117da442a..58553ed57d6 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -24,6 +24,7 @@ import kafka.server.share.SharePartitionManager import kafka.server.{KafkaConfig, ReplicaManager} import kafka.utils.Logging import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.internals.Topic import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage} @@ -143,6 +144,10 @@ class BrokerMetadataPublisher( debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.") } + // Seal existing leader partitions for topics transitioning from classic to diskless. + // New leaders elected are instead sealed inside ReplicaManager.applyLocalLeadersDelta. + sealExistingLeadersOfTopicsMigratedToDiskless(delta, newImage) + // Apply topic deltas. Option(delta.topicsDelta()).foreach { topicsDelta => try { @@ -339,6 +344,24 @@ class BrokerMetadataPublisher( } } + private def sealExistingLeadersOfTopicsMigratedToDiskless(delta: MetadataDelta, newImage: MetadataImage): Unit = { + Option(delta.configsDelta()).foreach { configsDelta => + configsDelta.changes().forEach { (resource, _) => + if (resource.`type`() == ConfigResource.Type.TOPIC) { + val topicName = resource.name() + val oldProps = delta.image().configs().configProperties(resource) + val wasDiskless = oldProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean + val newProps = newImage.configs().configProperties(resource) + val isDiskless = newProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean + if (!wasDiskless && isDiskless) { + info(s"Topic $topicName transitioning from classic to diskless, sealing leader partitions") + replicaManager.sealTopicPartitions(topicName) + } + } + } + } + } + private def initializeManagers(newImage: MetadataImage): Unit = { try { // Start log manager, which will perform (potentially lengthy) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index b13744b2d0e..c68669dd209 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric import kafka.log.LogManager import kafka.server._ import kafka.utils._ -import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException} +import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, ReplicaNotAvailableException, UnknownLeaderEpochException} import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -4073,4 +4073,102 @@ class PartitionTest extends AbstractPartitionTest { partition.setLog(mockLog, false) assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L)) } + + @Test + def testSealPartition(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + assertFalse(partition.isSealed) + + partition.seal() + + assertTrue(partition.isSealed) + } + + @Test + def testSealIsIdempotent(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + partition.seal() + assertTrue(partition.isSealed) + + partition.seal() + assertTrue(partition.isSealed) + } + + @Test + def testSealedPartitionRejectsAppends(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val requestLocal = RequestLocal.withThreadConfinedCaching + val records = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes))) + + // Appending before sealing should succeed + partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal) + + partition.seal() + + // Appending after sealing should throw ReplicaNotAvailableException + val newRecords = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))) + assertThrows(classOf[ReplicaNotAvailableException], () => + partition.appendRecordsToLeader(newRecords, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)) + } + + @Test + def testSealedPartitionStabilizesLeo(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val requestLocal = RequestLocal.withThreadConfinedCaching + val records = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes))) + partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal) + + val leoBeforeSeal = partition.localLogOrException.logEndOffset + + partition.seal() + + // LEO should remain stable after sealing + assertEquals(leoBeforeSeal, partition.localLogOrException.logEndOffset) + + // Further appends are rejected + val newRecords = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))) + assertThrows(classOf[ReplicaNotAvailableException], () => + partition.appendRecordsToLeader(newRecords, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)) + + // LEO remains unchanged + assertEquals(leoBeforeSeal, partition.localLogOrException.logEndOffset) + } + + @Test + def testSealedPartitionPreservedAcrossMakeLeader(): Unit = { + val leaderEpoch = 1 + partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + partition.seal() + assertTrue(partition.isSealed) + + // Transitioning to leader with a new epoch should not clear the sealed flag + val replicas = Array(brokerId, remoteReplicaId) + val newRegistration = new PartitionRegistration.Builder() + .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch + 1) + .setIsr(replicas) + .setPartitionEpoch(2) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + .build() + + partition.makeLeader(newRegistration, isNew = false, offsetCheckpoints, None) + + assertTrue(partition.isSealed, "Sealed flag should be preserved across makeLeader") + + val requestLocal = RequestLocal.withThreadConfinedCaching + val records = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes))) + assertThrows(classOf[ReplicaNotAvailableException], () => + partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)) + } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d4571d14e0c..1e03ce4f8cb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6861,6 +6861,126 @@ class ReplicaManagerTest { // TODO: Add more fetch tests combinations, edge cases ara not covered yet. + @Test + def testSealTopicPartitionsSealsOnlyLeadersOfTargetTopic(): Unit = { + val props = TestUtils.createBrokerConfig(0) + val config = KafkaConfig.fromProps(props) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties())) + val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1)) + mockGetAliveBrokerFunctions(metadataCache, aliveBrokers) + when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + + try { + val topicToSeal = "topic-to-seal" + val otherTopic = "other-topic" + val sealTopicId = Uuid.randomUuid() + val otherTopicId = Uuid.randomUuid() + setupMetadataCacheWithTopicIds(Map(topicToSeal -> sealTopicId, otherTopic -> otherTopicId), metadataCache) + + val delta = new TopicsDelta(TopicsImage.EMPTY) + delta.replay(new TopicRecord().setName(topicToSeal).setTopicId(sealTopicId)) + delta.replay(new TopicRecord().setName(otherTopic).setTopicId(otherTopicId)) + // topicToSeal: partition 0 is leader, partition 1 is follower + delta.replay(new PartitionRecord() + .setPartitionId(0).setTopicId(sealTopicId) + .setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1)) + .setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0)) + delta.replay(new PartitionRecord() + .setPartitionId(1).setTopicId(sealTopicId) + .setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1)) + .setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0)) + // otherTopic: partition 0 is leader + delta.replay(new PartitionRecord() + .setPartitionId(0).setTopicId(otherTopicId) + .setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1)) + .setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0)) + val image = imageFromTopics(delta.apply()) + rm.applyDelta(delta, image) + + val sealLeader = rm.getPartitionOrException(new TopicPartition(topicToSeal, 0)) + val sealFollower = rm.getPartitionOrException(new TopicPartition(topicToSeal, 1)) + val otherLeader = rm.getPartitionOrException(new TopicPartition(otherTopic, 0)) + + rm.sealTopicPartitions(topicToSeal) + + assertTrue(sealLeader.isSealed, "Leader of target topic should be sealed") + assertFalse(sealFollower.isSealed, "Follower of target topic should not be sealed") + assertFalse(otherLeader.isSealed, "Leader of other topic should not be sealed") + } finally { + rm.shutdown(checkpointHW = false) + } + } + + @Test + def testApplyDeltaSealsMigratedLeaderPartitions(): Unit = { + val props = TestUtils.createBrokerConfig(0) + val config = KafkaConfig.fromProps(props) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties())) + val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1)) + val kraftMetadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache]) + when(kraftMetadataCache.topicConfig(anyString())).thenReturn(new Properties()) + mockGetAliveBrokerFunctions(kraftMetadataCache, aliveBrokers) + when(kraftMetadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION) + val topicName = "transitioning-topic" + val transTopicId = Uuid.randomUuid() + setupMetadataCacheWithTopicIds(Map(topicName -> transTopicId), kraftMetadataCache) + + val inklessMetadata = mock(classOf[InklessMetadataView]) + when(inklessMetadata.isDisklessTopic(any())).thenReturn(false) + + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = kraftMetadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager, + inklessMetadataView = Some(inklessMetadata)) + + try { + // First, create the partition as a classic follower (leader is broker 1) + val createDelta = new TopicsDelta(TopicsImage.EMPTY) + createDelta.replay(new TopicRecord().setName(topicName).setTopicId(transTopicId)) + createDelta.replay(new PartitionRecord() + .setPartitionId(0).setTopicId(transTopicId) + .setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1)) + .setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0)) + val createImage = imageFromTopics(createDelta.apply()) + rm.applyDelta(createDelta, createImage) + + val partition = rm.getPartitionOrException(new TopicPartition(topicName, 0)) + assertFalse(partition.isLeader) + assertFalse(partition.isSealed) + + // Now simulate a follower->leader transition while the topic becomes diskless + when(inklessMetadata.isDisklessTopic(topicName)).thenReturn(true) + + val changeDelta = new TopicsDelta(createDelta.apply()) + changeDelta.replay(new PartitionChangeRecord() + .setPartitionId(0).setTopicId(transTopicId) + .setLeader(0).setIsr(util.Arrays.asList(0, 1))) + val changeImage = imageFromTopics(changeDelta.apply()) + rm.applyDelta(changeDelta, changeImage) + + assertTrue(partition.isSealed, "Transitioning leader partition should be sealed after applyDelta") + } finally { + rm.shutdown(checkpointHW = false) + } + } + private def mockFetchHandler(disklessResponse: Map[TopicIdPartition, FetchPartitionData]) = { // We use constructor mocking here to inject a FetchHandler mock into ReplicaManager, // because ReplicaManager internally constructs its own FetchHandler instance and does not diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 25a7fb0f388..a5728dae003 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -30,10 +30,10 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic} import org.apache.kafka.common.Uuid -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.config.ConfigResource.Type.BROKER import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} +import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes} import org.apache.kafka.common.utils.Exit import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage} @@ -362,4 +362,122 @@ class BrokerMetadataPublisherTest { // SharePartitionManager is receiving the latest changes. verify(sharePartitionManager).onShareVersionToggle(any(), any()) } + + @Test + def testSealTransitioningTopicsOnDisklessEnableChange(): Unit = { + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)) + val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1) + val logManager = mock(classOf[LogManager]) + val replicaManager = mock(classOf[ReplicaManager]) + val inklessMetadataView = mock(classOf[InklessMetadataView]) + when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView) + val faultHandler = mock(classOf[FaultHandler]) + + val metadataPublisher = new BrokerMetadataPublisher( + config, + metadataCache, + logManager, + replicaManager, + mock(classOf[GroupCoordinator]), + mock(classOf[TransactionCoordinator]), + mock(classOf[ShareCoordinator]), + mock(classOf[SharePartitionManager]), + mock(classOf[DynamicConfigPublisher]), + mock(classOf[DynamicClientQuotaPublisher]), + mock(classOf[DynamicTopicClusterQuotaPublisher]), + mock(classOf[ScramPublisher]), + mock(classOf[DelegationTokenPublisher]), + mock(classOf[AclPublisher]), + faultHandler, + faultHandler + ) + + val topicName = "test-seal-topic" + val topicId = Uuid.randomUuid() + + // Build initial image with topic (diskless.enable = false by default) + var delta = new MetadataDelta(MetadataImage.EMPTY) + delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + delta.replay(new PartitionRecord() + .setTopicId(topicId).setPartitionId(0).setLeader(config.brokerId)) + val baseImage = delta.apply(MetadataProvenance.EMPTY) + + // Now create a delta that sets diskless.enable = true + delta = new MetadataDelta(baseImage) + delta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topicName) + .setName(TopicConfig.DISKLESS_ENABLE_CONFIG) + .setValue("true")) + + metadataPublisher.onMetadataUpdate(delta, delta.apply(MetadataProvenance.EMPTY), + LogDeltaManifest.newBuilder() + .provenance(MetadataProvenance.EMPTY) + .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) + .numBatches(1) + .elapsedNs(100) + .numBytes(42) + .build()) + + verify(replicaManager).sealTopicPartitions(topicName) + } + + @Test + def testNoSealingWhenDisklessEnableNotChanged(): Unit = { + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)) + val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1) + val logManager = mock(classOf[LogManager]) + val replicaManager = mock(classOf[ReplicaManager]) + val inklessMetadataView = mock(classOf[InklessMetadataView]) + when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView) + val faultHandler = mock(classOf[FaultHandler]) + + val metadataPublisher = new BrokerMetadataPublisher( + config, + metadataCache, + logManager, + replicaManager, + mock(classOf[GroupCoordinator]), + mock(classOf[TransactionCoordinator]), + mock(classOf[ShareCoordinator]), + mock(classOf[SharePartitionManager]), + mock(classOf[DynamicConfigPublisher]), + mock(classOf[DynamicClientQuotaPublisher]), + mock(classOf[DynamicTopicClusterQuotaPublisher]), + mock(classOf[ScramPublisher]), + mock(classOf[DelegationTokenPublisher]), + mock(classOf[AclPublisher]), + faultHandler, + faultHandler + ) + + val topicName = "test-no-seal-topic" + val topicId = Uuid.randomUuid() + + // Build initial image with topic + var delta = new MetadataDelta(MetadataImage.EMPTY) + delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + delta.replay(new PartitionRecord() + .setTopicId(topicId).setPartitionId(0).setLeader(config.brokerId)) + val baseImage = delta.apply(MetadataProvenance.EMPTY) + + // Create a delta that changes a non-diskless config + delta = new MetadataDelta(baseImage) + delta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topicName) + .setName(TopicConfig.RETENTION_MS_CONFIG) + .setValue("86400000")) + + metadataPublisher.onMetadataUpdate(delta, delta.apply(MetadataProvenance.EMPTY), + LogDeltaManifest.newBuilder() + .provenance(MetadataProvenance.EMPTY) + .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) + .numBatches(1) + .elapsedNs(100) + .numBytes(42) + .build()) + + verify(replicaManager, Mockito.never()).sealTopicPartitions(any()) + } }