From d054236b897bff15a6ac6eca8dc7ea00e4cf71d6 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Thu, 23 Apr 2026 18:25:40 +0200 Subject: [PATCH 1/4] feat(inkless): Set migrating partitions in KRaft metadata --- .../scala/kafka/server/ReplicaManager.scala | 9 +- .../kafka/server/ReplicaManagerTest.scala | 142 ++++++++++++++++++ .../kafka/controller/QuorumController.java | 8 + .../controller/ReplicationControlManager.java | 39 ++++- .../kafka/metadata/PartitionRegistration.java | 1 + .../ReplicationControlManagerTest.java | 74 +++++++++ .../metadata/PartitionRegistrationTest.java | 48 ++++++ 7 files changed, 316 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a08f2952e1..5d15e34c6b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1894,9 +1894,10 @@ class ReplicaManager(val config: KafkaConfig, classicFetchInfos += fetchInfo } else { val classicToDisklessStartOffset = _inklessMetadataView.getClassicToDisklessStartOffset(tp.topicPartition()) - val shouldReadFromUnifiedLog = - classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET && - partitionData.fetchOffset < classicToDisklessStartOffset + val migrationPending = + classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING + val shouldReadFromUnifiedLog = migrationPending || ( + classicToDisklessStartOffset >= 0 && partitionData.fetchOffset < classicToDisklessStartOffset) (shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match { case (false, _) => @@ -2850,7 +2851,7 @@ class ReplicaManager(val config: KafkaConfig, Option(topicImage.partitions().get(partitionId)) } val shouldInitOnControlPlane = previousPartition.exists { previous => - previous.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET && + 
previous.classicToDisklessStartOffset < 0 && partitionRegistration.classicToDisklessStartOffset >= 0 } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9ec0e184ce..627419428e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6427,6 +6427,148 @@ class ReplicaManagerTest { } } + @Test + def testFetchDisklessMigrationPendingReadsFromClassicLogWhenManagedReplicasEnabled(): Unit = { + val fetchHandlerCtor = mockFetchHandler(Map.empty) + try { + val cp = mock(classOf[ControlPlane]) + val replicaManager = spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp), disklessManagedReplicasEnabled = true)) + + // Given a diskless topic with classicToDisklessStartOffset = -2 (migration pending) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING) + + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS), + Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + -1, -1L, + 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + // Then the request is served from the 
unified log (classic path) because migration is still pending + assertNotNull(responseData) + assertEquals(1, responseData.size) + assertEquals(RECORDS, responseData(disklessTopicPartition).records) + verify(replicaManager, times(1)).readFromLog(any(), any(), any(), any()) + verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any()) + verify(cp, never()).findBatches(any(), any(), any()) + } finally { + fetchHandlerCtor.close() + } + } + + @Test + def testFetchDisklessMigrationPendingFailsWhenManagedReplicasDisabled(): Unit = { + val fetchHandlerCtor = mockFetchHandler(Map.empty) + try { + val cp = mock(classOf[ControlPlane]) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = false, + )) + + // Given a diskless topic with classicToDisklessStartOffset = -2 (migration pending) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING) + + val fetchParams = new FetchParams( + -1, -1L, + 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + // Then the request fails because managed replicas are disabled and data is still in UnifiedLog + assertNotNull(responseData) + assertEquals(1, responseData.size) + assertEquals(Errors.INVALID_REQUEST, responseData(disklessTopicPartition).error) + verify(replicaManager, never()).readFromLog(any(), any(), any(), any()) + 
verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any()) + verify(cp, never()).findBatches(any(), any(), any()) + } finally { + fetchHandlerCtor.close() + } + } + + @Test + def testFetchFullDisklessTopicRoutesDiskless(): Unit = { + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + 110L, 100L, + RECORDS, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + ) + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + try { + val batchMetadata = mock(classOf[BatchMetadata]) + when(batchMetadata.topicIdPartition()).thenReturn(disklessTopicPartition) + val batch = mock(classOf[BatchInfo]) + when(batch.metadata()).thenReturn(batchMetadata) + val findBatchResponse = mock(classOf[FindBatchResponse]) + when(findBatchResponse.batches()).thenReturn(util.List.of(batch)) + when(findBatchResponse.highWatermark()).thenReturn(110L) + when(findBatchResponse.estimatedByteSize(50L)).thenReturn(RECORDS.sizeInBytes()) + when(findBatchResponse.errors()).thenReturn(Errors.NONE) + val cp = mock(classOf[ControlPlane]) + when(cp.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponse)) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + )) + + // Given a full diskless topic with classicToDisklessStartOffset = -1 (never migrated) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) + + val fetchParams = new FetchParams( + -1, -1L, + 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val 
responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + // Then the request is served from diskless path (not from UnifiedLog) + assertNotNull(responseData) + assertEquals(1, responseData.size) + assertEquals(Errors.NONE, responseData(disklessTopicPartition).error) + verify(replicaManager, never()).readFromLog(any(), any(), any(), any()) + verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any()) + } finally { + fetchHandlerCtor.close() + } + } + @Test def testFetchFailDisklessWhenFromReplicaAndUnmanagedReplicas(): Unit = { val fetchHandlerCtor = mockFetchHandler(Map.empty) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index c0854c64d1..29ee7a26e1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -129,6 +129,7 @@ import org.slf4j.Logger; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -1929,6 +1930,13 @@ public CompletableFuture> incrementalAlterConfigs( if (validateOnly) { return result.withoutRecords(); } else { + List migrationRecords = + replicationControl.markClassicToDisklessMigrationStarted(configChanges); + if (!migrationRecords.isEmpty()) { + List allRecords = new ArrayList<>(result.records()); + allRecords.addAll(migrationRecords); + return ControllerResult.of(allRecords, result.response()); + } return result; } }); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 1261986b60..603a7c8c3a 100644 --- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1515,7 +1515,7 @@ ControllerResult initDisklessLog( continue; } - if (partition.classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { + if (partition.classicToDisklessStartOffset >= 0) { log.info("Rejecting InitDisklessLog request from node {} for {}-{} because " + "the partition is already initialized with classicToDisklessStartOffset={}.", request.brokerId(), topic.name, partitionId, partition.classicToDisklessStartOffset); @@ -2875,6 +2875,43 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p } } + List markClassicToDisklessMigrationStarted( + Map>> configChanges + ) { + List records = new ArrayList<>(); + for (Entry>> entry : configChanges.entrySet()) { + ConfigResource resource = entry.getKey(); + if (resource.type() != TOPIC) continue; + Map> changes = entry.getValue(); + Entry disklessChange = changes.get(DISKLESS_ENABLE_CONFIG); + if (disklessChange == null) continue; + if (disklessChange.getKey() != SET || !Boolean.parseBoolean(disklessChange.getValue())) continue; + if (isDisklessTopic(resource.name())) continue; + + String topicName = resource.name(); + Uuid topicId = topicsByName.get(topicName); + if (topicId == null) continue; + TopicControlInfo topicInfo = topics.get(topicId); + if (topicInfo == null) continue; + + for (Entry partEntry : topicInfo.parts.entrySet()) { + PartitionRegistration partition = partEntry.getValue(); + if (partition.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { + PartitionChangeRecord record = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(partEntry.getKey()); + record.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset( + 
PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)); + records.add(new ApiMessageAndVersion(record, (short) 0)); + } + } + log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending", + records.size(), topicName); + } + return records; + } + private boolean isDisklessTopic(String topicName) { return Boolean.parseBoolean( configurationControl.currentTopicConfig(topicName) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index d4db9bcc6d..54cb4c4dfc 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -165,6 +165,7 @@ public PartitionRegistration build() { } public static final long NO_CLASSIC_TO_DISKLESS_START_OFFSET = -1L; + public static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L; public final int[] replicas; public final Uuid[] directories; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index f54e733fde..9a4beb3564 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -6119,6 +6119,80 @@ public void testInitDisklessLogMultipleProducerStates() { assertEquals(2L, updatedPartition.disklessProducerStates.get(1).producerId()); assertEquals(3L, updatedPartition.disklessProducerStates.get(2).producerId()); } + + @Test + public void testInitDisklessLogAcceptsMigrationPendingPartition() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + 
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", + new int[][] {new int[] {0, 1, 2}}); + + Uuid topicId = createTopicResult.topicId(); + + // Simulate migration pending by replaying a PartitionChangeRecord with -2 + PartitionChangeRecord migrationPendingRecord = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(0); + migrationPendingRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset( + PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)); + ctx.replay(List.of(new ApiMessageAndVersion(migrationPendingRecord, (short) 0))); + + PartitionRegistration pendingPartition = replicationControl.getPartition(topicId, 0); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, pendingPartition.classicToDisklessStartOffset); + + // InitDisklessLog should succeed even though classicToDisklessStartOffset is -2 + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION); + InitDisklessLogRequestData request = singlePartitionRequest( + 0, defaultBrokerEpoch(0), topicId, 0, 100L, pendingPartition.leaderEpoch, List.of()); + + ControllerResult result = + replicationControl.initDisklessLog(requestContext, request); + + assertEquals(1, result.records().size()); + assertEquals(NONE.code(), + result.response().topics().get(0).partitions().get(0).errorCode()); + + ctx.replay(result.records()); + PartitionRegistration updatedPartition = replicationControl.getPartition(topicId, 0); + assertEquals(100L, updatedPartition.classicToDisklessStartOffset); + } + + @Test + public void testInitDisklessLogRejectsAlreadyInitializedPartition() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", + new int[][] {new int[] {0, 1, 
2}}); + + Uuid topicId = createTopicResult.topicId(); + PartitionRegistration partition = replicationControl.getPartition(topicId, 0); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION); + + // First init succeeds + InitDisklessLogRequestData firstRequest = singlePartitionRequest( + 0, defaultBrokerEpoch(0), topicId, 0, 100L, partition.leaderEpoch, List.of()); + ControllerResult firstResult = + replicationControl.initDisklessLog(requestContext, firstRequest); + ctx.replay(firstResult.records()); + assertEquals(NONE.code(), + firstResult.response().topics().get(0).partitions().get(0).errorCode()); + + // Second init with different offset is rejected (offset >= 0 means already committed) + PartitionRegistration updatedPartition = replicationControl.getPartition(topicId, 0); + InitDisklessLogRequestData secondRequest = singlePartitionRequest( + 0, defaultBrokerEpoch(0), topicId, 0, 200L, updatedPartition.leaderEpoch, List.of()); + ControllerResult secondResult = + replicationControl.initDisklessLog(requestContext, secondRequest); + assertEquals(0, secondResult.records().size()); + assertEquals(INVALID_REQUEST.code(), + secondResult.response().topics().get(0).partitions().get(0).errorCode()); + } } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 8e2f911ea7..5df2c89e05 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -497,4 +497,52 @@ public void testDisklessFieldsInEqualsAndHashCode() { assertNotEquals(a, c); assertNotEquals(a, d); } + + @Test + public void testMigrationPendingRoundTrip() { + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). 
+ setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0). + setClassicToDisklessStartOffset(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING).build(); + + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, original.classicToDisklessStartOffset); + + Uuid topicId = Uuid.randomUuid(); + ApiMessageAndVersion record = original.toRecord(topicId, 0, + new ImageWriterOptions.Builder(MetadataVersion.latestTesting()).build()); + PartitionRecord partitionRecord = (PartitionRecord) record.message(); + PartitionRegistration restored = new PartitionRegistration(partitionRecord); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, restored.classicToDisklessStartOffset); + } + + @Test + public void testMergeFromMigrationPendingToActualOffset() { + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0). + setClassicToDisklessStartOffset(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING).build(); + + PartitionChangeRecord changeRecord = new PartitionChangeRecord(); + changeRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset(100L)); + + PartitionRegistration merged = original.merge(changeRecord); + assertEquals(100L, merged.classicToDisklessStartOffset); + } + + @Test + public void testMergePreservesMigrationPending() { + PartitionRegistration original = new PartitionRegistration.Builder(). + setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)). + setIsr(new int[]{1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). + setLeaderEpoch(0).setPartitionEpoch(0). 
+ setClassicToDisklessStartOffset(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING).build(); + + PartitionRegistration merged = original.merge(new PartitionChangeRecord(). + setLeader(2).setIsr(List.of(2, 3))); + + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, merged.classicToDisklessStartOffset); + } } From 4e427beb5fef9d1a5c0896b43356b87843b0ddc2 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 24 Apr 2026 10:45:15 +0200 Subject: [PATCH 2/4] Address review comments --- .../org/apache/kafka/controller/QuorumController.java | 7 ++++--- .../kafka/controller/ReplicationControlManager.java | 10 +++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 29ee7a26e1..9f83e52f10 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -121,6 +121,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandlerException; +import org.apache.kafka.server.mutable.BoundedList; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.snapshot.SnapshotReader; @@ -129,7 +130,6 @@ import org.slf4j.Logger; -import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -1931,9 +1931,10 @@ public CompletableFuture> incrementalAlterConfigs( return result.withoutRecords(); } else { List migrationRecords = - replicationControl.markClassicToDisklessMigrationStarted(configChanges); + replicationControl.markClassicToDisklessMigrationStarted(configChanges, result.response()); if (!migrationRecords.isEmpty()) { - List allRecords = new 
ArrayList<>(result.records()); + List allRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); + allRecords.addAll(result.records()); allRecords.addAll(migrationRecords); return ControllerResult.of(allRecords, result.response()); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 603a7c8c3a..dea3367d84 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -2876,12 +2876,15 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p } List markClassicToDisklessMigrationStarted( - Map>> configChanges + Map>> configChanges, + Map configResults ) { - List records = new ArrayList<>(); + List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); for (Entry>> entry : configChanges.entrySet()) { ConfigResource resource = entry.getKey(); if (resource.type() != TOPIC) continue; + ApiError error = configResults.get(resource); + if (error != null && error.isFailure()) continue; Map> changes = entry.getValue(); Entry disklessChange = changes.get(DISKLESS_ENABLE_CONFIG); if (disklessChange == null) continue; @@ -2894,6 +2897,7 @@ List markClassicToDisklessMigrationStarted( TopicControlInfo topicInfo = topics.get(topicId); if (topicInfo == null) continue; + int sizeBefore = records.size(); for (Entry partEntry : topicInfo.parts.entrySet()) { PartitionRegistration partition = partEntry.getValue(); if (partition.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) { @@ -2907,7 +2911,7 @@ List markClassicToDisklessMigrationStarted( } } log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending", - records.size(), topicName); + records.size() - sizeBefore, topicName); } return records; } From
b53ae25a02f48b71afe74cec691f803b70c28bab Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 24 Apr 2026 15:46:18 +0200 Subject: [PATCH 3/4] Remove duplicate test and add tests for markClassicToDisklessMigrationStarted --- .../ReplicationControlManagerTest.java | 141 +++++++++++++++--- 1 file changed, 120 insertions(+), 21 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 9a4beb3564..e6762e6470 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -6161,38 +6161,137 @@ public void testInitDisklessLogAcceptsMigrationPendingPartition() { } @Test - public void testInitDisklessLogRejectsAlreadyInitializedPartition() { + public void testMarkClassicToDisklessMigrationStartedSuccess() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); ReplicationControlManager replicationControl = ctx.replicationControl; ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", - new int[][] {new int[] {0, 1, 2}}); + new int[][] {new int[] {0, 1, 2}, new int[] {1, 2, 0}}); Uuid topicId = createTopicResult.topicId(); - PartitionRegistration partition = replicationControl.getPartition(topicId, 0); - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION); + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + Map configResults = Map.of(resource, ApiError.NONE); + + List records = + replicationControl.markClassicToDisklessMigrationStarted(configChanges, configResults); + + 
assertEquals(2, records.size()); + for (int i = 0; i < 2; i++) { + assertInstanceOf(PartitionChangeRecord.class, records.get(i).message()); + PartitionChangeRecord record = (PartitionChangeRecord) records.get(i).message(); + assertEquals(topicId, record.topicId()); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, + InitDisklessLogFields.decodeClassicToDisklessStartOffset(record.unknownTaggedFields())); + } - // First init succeeds - InitDisklessLogRequestData firstRequest = singlePartitionRequest( - 0, defaultBrokerEpoch(0), topicId, 0, 100L, partition.leaderEpoch, List.of()); - ControllerResult firstResult = - replicationControl.initDisklessLog(requestContext, firstRequest); - ctx.replay(firstResult.records()); - assertEquals(NONE.code(), - firstResult.response().topics().get(0).partitions().get(0).errorCode()); + ctx.replay(records); + for (int i = 0; i < 2; i++) { + PartitionRegistration partition = replicationControl.getPartition(topicId, i); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, + partition.classicToDisklessStartOffset); + } + } - // Second init with different offset is rejected (offset >= 0 means already committed) - PartitionRegistration updatedPartition = replicationControl.getPartition(topicId, 0); - InitDisklessLogRequestData secondRequest = singlePartitionRequest( - 0, defaultBrokerEpoch(0), topicId, 0, 200L, updatedPartition.leaderEpoch, List.of()); - ControllerResult secondResult = - replicationControl.initDisklessLog(requestContext, secondRequest); - assertEquals(0, secondResult.records().size()); - assertEquals(INVALID_REQUEST.code(), - secondResult.response().topics().get(0).partitions().get(0).errorCode()); + @Test + public void testMarkClassicToDisklessMigrationStartedSkipsIneligibleChanges() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = 
ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("classic-topic", new int[][] {new int[] {0, 1, 2}}); + ctx.createTestTopic("already-diskless", 1, (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code()); + + ConfigResource classicTopic = new ConfigResource(ConfigResource.Type.TOPIC, "classic-topic"); + ConfigResource alreadyDiskless = new ConfigResource(ConfigResource.Type.TOPIC, "already-diskless"); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + ConfigResource unknownTopic = new ConfigResource(ConfigResource.Type.TOPIC, "no-such-topic"); + + Map>> configChanges = new java.util.HashMap<>(); + // Config error on the topic + configChanges.put(classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + // Already-diskless topic + configChanges.put(alreadyDiskless, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + // Non-TOPIC resource + configChanges.put(brokerResource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + // Unknown topic + configChanges.put(unknownTopic, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + + Map configResults = new java.util.HashMap<>(); + configResults.put(classicTopic, new ApiError(Errors.INVALID_REQUEST, "bad config")); + configResults.put(alreadyDiskless, ApiError.NONE); + configResults.put(brokerResource, ApiError.NONE); + configResults.put(unknownTopic, ApiError.NONE); + + List records = + replicationControl.markClassicToDisklessMigrationStarted(configChanges, configResults); + assertEquals(0, records.size()); + + // Also verify DELETE op and SET to "false" are skipped + configChanges.clear(); + configResults.clear(); + configChanges.put(classicTopic, 
Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.DELETE, "true"))); + configResults.put(classicTopic, ApiError.NONE); + + records = replicationControl.markClassicToDisklessMigrationStarted(configChanges, configResults); + assertEquals(0, records.size()); + + configChanges.put(classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "false"))); + + records = replicationControl.markClassicToDisklessMigrationStarted(configChanges, configResults); + assertEquals(0, records.size()); } + + @Test + public void testMarkClassicToDisklessMigrationStartedSkipsAlreadyInitializedPartitions() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", + new int[][] {new int[] {0, 1, 2}, new int[] {1, 2, 0}}); + + Uuid topicId = createTopicResult.topicId(); + PartitionRegistration partition0 = replicationControl.getPartition(topicId, 0); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION); + InitDisklessLogRequestData initRequest = singlePartitionRequest( + 0, defaultBrokerEpoch(0), topicId, 0, 100L, partition0.leaderEpoch, List.of()); + ControllerResult initResult = + replicationControl.initDisklessLog(requestContext, initRequest); + ctx.replay(initResult.records()); + + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo"); + Map>> configChanges = Map.of( + resource, Map.of(DISKLESS_ENABLE_CONFIG, + new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))); + Map configResults = Map.of(resource, ApiError.NONE); + + List records = + replicationControl.markClassicToDisklessMigrationStarted(configChanges, configResults); + + // Only partition 1 should be marked — 
partition 0 was already initialized + assertEquals(1, records.size()); + PartitionChangeRecord record = (PartitionChangeRecord) records.get(0).message(); + assertEquals(topicId, record.topicId()); + assertEquals(1, record.partitionId()); + assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING, + InitDisklessLogFields.decodeClassicToDisklessStartOffset(record.unknownTaggedFields())); + } + } } From fd072ebc4193b1614bb0dbb8e33477ca2f3c3b65 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 24 Apr 2026 16:33:48 +0200 Subject: [PATCH 4/4] Use ControllerResult.atomicOf for emitted records --- .../main/java/org/apache/kafka/controller/QuorumController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 9f83e52f10..fe445109d2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1936,7 +1936,7 @@ public CompletableFuture> incrementalAlterConfigs( List allRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); allRecords.addAll(result.records()); allRecords.addAll(migrationRecords); - return ControllerResult.of(allRecords, result.response()); + return ControllerResult.atomicOf(allRecords, result.response()); } return result; }