From c784afc5176773abd0a030477e374a21610eb5a3 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 28 May 2026 18:23:49 +0300 Subject: [PATCH 1/2] test(inkless:switch): add consolidated diskless integration tests with transition matrix Add integration tests in DisklessAndRemoteStorageConfigsTest that exercise the full controller+broker path for topic type transitions under consolidation. Cover all 9 transition scenarios including allow-from-classic gate. Document the transition matrix as Javadoc on both LogConfigTest and the integration test class. Co-Authored-By: Claude Opus 4.6 --- .../DisklessAndRemoteStorageConfigsTest.java | 249 ++++++++++++++++++ .../scala/unit/kafka/log/LogConfigTest.scala | 24 ++ 2 files changed, 273 insertions(+) diff --git a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java index f4feabf516..beb0e1218e 100644 --- a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java +++ b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java @@ -293,6 +293,255 @@ private void createTopicAndAssertEffective(Admin admin, assertEquals(expectedRemoteStorage, topicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); } + /** + * Integration tests for remote storage consolidation — validates end-to-end behavior + * when consolidation is enabled. + * + *

<p>Topic type transition matrix (consolidation enabled): + * <pre>
+     * #  | From       → To        | Condition                                                   | Result                     | Covered by
+     * ---+------------+-----------+-------------------------------------------------------------+----------------------------+------------------------------
+     * 1  | (none)     → DISKLESS  | diskless.enable=true, no remote.storage.enable in request   | VALID (auto-enabled)       | testCreateDisklessAutoEnablesRemoteStorage
+     * 2  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=true            | VALID                      | testCreateDisklessAutoEnablesRemoteStorage
+     * 3  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=false           | REJECTED                   | testCreateDisklessWithExplicitRemoteFalseRejected
+     * 4  | CLASSIC    → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testClassicToDisklessSwitch
+     * 5  | CLASSIC    → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testClassicToDisklessBlockedWithoutAllowFromClassic
+     * 6  | TIERED     → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testTieredToDisklessSwitch
+     * 7  | TIERED     → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testTieredToDisklessBlockedWithoutAllowFromClassic
+     * 8  | DISKLESS   → forbidden | remote.storage.enable=false                                 | REJECTED (mutual exclusion)| testDisklessCannotDisableRemoteStorage
+     * 9  | DISKLESS   → TIERED    | diskless.enable=false                                       | REJECTED (unsupported)     | testDisklessCannotDisableDiskless
+     * </pre>
+ */ + @Nested + class ConsolidatedDisklessTopics { + @Test + void testCreateDisklessAutoEnablesRemoteStorage() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create diskless topic without explicit remote.storage.enable — controller auto-enables + createTopicAndAssertEffective(admin, "diskless-auto-rs", Map.of( + DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); + // Create diskless topic with explicit remote.storage.enable=true + createTopicAndAssertEffective(admin, "diskless-rs-true", Map.of( + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "true", "true"); + } finally { + cluster.close(); + } + } + + @Test + void testCreateDisklessWithExplicitRemoteFalseRejected() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + Optional error = createTopic(admin, "diskless-rs-false", Map.of( + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + assertTrue(error.isPresent(), "Should reject diskless with remote.storage.enable=false"); + } finally { + cluster.close(); + } + } + + @Test + void testClassicToDisklessSwitch() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create classic topic + createTopicAndAssertEffective(admin, "classic-to-diskless", Map.of(), "false", "false"); + // Switch to diskless + assertTrue(incrementalAlterTopicConfig(admin, "classic-to-diskless", Map.of( + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")).isEmpty(), + "CLASSIC→DISKLESS switch should succeed with allow-from-classic + consolidation"); + var config = getTopicConfig(admin, 
"classic-to-diskless"); + assertEquals("true", config.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", config.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); + } finally { + cluster.close(); + } + } + + @Test + void testTieredToDisklessSwitch() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create tiered topic + createTopicAndAssertEffective(admin, "tiered-to-diskless", + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true"); + // Switch to diskless + assertTrue(incrementalAlterTopicConfig(admin, "tiered-to-diskless", Map.of( + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")).isEmpty(), + "TIERED→DISKLESS switch should succeed with allow-from-classic"); + var config = getTopicConfig(admin, "tiered-to-diskless"); + assertEquals("true", config.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", config.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); + } finally { + cluster.close(); + } + } + + @Test + void testDisklessCannotDisableRemoteStorage() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create diskless topic (RS auto-enabled by controller) + createTopicAndAssertEffective(admin, "diskless-no-disable-rs", Map.of( + DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); + // Try to disable remote storage + Optional error = incrementalAlterTopicConfig(admin, "diskless-no-disable-rs", Map.of( + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + assertTrue(error.isPresent(), "Should not allow disabling remote storage on diskless topic"); + assertEquals(DISKLESS_REMOTE_SET_ERROR, error.get()); + } finally { + cluster.close(); + } + } + + @Test + void testDisklessCannotBeDisabled() throws Exception { + var cluster = initConsolidatedCluster(); + try (Admin 
admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create diskless topic + createTopicAndAssertEffective(admin, "diskless-no-disable", Map.of( + DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); + // Try to disable diskless + Optional error = incrementalAlterTopicConfig(admin, "diskless-no-disable", Map.of( + DISKLESS_ENABLE_CONFIG, "false")); + assertTrue(error.isPresent(), "Should not allow disabling diskless"); + assertEquals(DISABLE_DISKLESS_ERROR, error.get()); + } finally { + cluster.close(); + } + } + + @Test + void testClassicToDisklessBlockedWithoutAllowFromClassic() throws Exception { + var cluster = initConsolidatedClusterWithoutAllowFromClassic(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create classic topic + createTopicAndAssertEffective(admin, "classic-blocked", Map.of(), "false", "false"); + // Attempt CLASSIC→DISKLESS switch without allow-from-classic + Optional error = incrementalAlterTopicConfig(admin, "classic-blocked", Map.of( + DISKLESS_ENABLE_CONFIG, "true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); + assertTrue(error.isPresent(), "CLASSIC→DISKLESS should be blocked without allow-from-classic"); + assertEquals(ENABLE_DISKLESS_ERROR, error.get()); + } finally { + cluster.close(); + } + } + + @Test + void testTieredToDisklessBlockedWithoutAllowFromClassic() throws Exception { + var cluster = initConsolidatedClusterWithoutAllowFromClassic(); + try (Admin admin = AdminClient.create(Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Create tiered topic + createTopicAndAssertEffective(admin, "tiered-blocked", + Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true"); + // Attempt TIERED→DISKLESS switch without allow-from-classic + Optional error = incrementalAlterTopicConfig(admin, "tiered-blocked", Map.of( + DISKLESS_ENABLE_CONFIG, 
"true", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); + assertTrue(error.isPresent(), "TIERED→DISKLESS should be blocked without allow-from-classic"); + assertEquals(ENABLE_DISKLESS_ERROR, error.get()); + } finally { + cluster.close(); + } + } + + private KafkaClusterTestKit initConsolidatedClusterWithoutAllowFromClassic() throws Exception { + final TestKitNodes nodes = new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .build(); + var cluster = new KafkaClusterTestKit.Builder(nodes) + .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, "false") + .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, "false") + .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.PASSWORD_CONFIG, 
PostgreSQLTestContainer.PASSWORD) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + return cluster; + } + + private KafkaClusterTestKit initConsolidatedCluster() throws Exception { + final TestKitNodes nodes = new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(1) + .setNumControllerNodes(1) + .build(); + var cluster = new KafkaClusterTestKit.Builder(nodes) + .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, "false") + .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
"org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.PASSWORD_CONFIG, PostgreSQLTestContainer.PASSWORD) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + return cluster; + } + + private Optional incrementalAlterTopicConfig(Admin admin, String topic, Map newConfigs) { + var topicResource = new 
ConfigResource(ConfigResource.Type.TOPIC, topic); + var operations = newConfigs.entrySet().stream() + .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET)) + .toList(); + try { + admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS); + return Optional.empty(); + } catch (Exception e) { + String message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); + return Optional.ofNullable(message); + } + } + } + private Map getTopicConfig(Admin admin, String topic) throws ExecutionException, InterruptedException, TimeoutException { int maxRetries = 3; diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 17e22cd11c..31adfb9bf5 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -35,6 +35,30 @@ import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListVa import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +/** + * Topic type transition matrix (consolidation enabled): + * + * Topic types: + * CLASSIC (diskless.enable=false, remote.storage.enable=false) — local-only storage + * TIERED (diskless.enable=false, remote.storage.enable=true) — tiered storage + * DISKLESS (diskless.enable=true, remote.storage.enable=true) — diskless with remote storage + * Forbidden state: diskless.enable=true, remote.storage.enable=false + * + * Creation: + * diskless.enable=true, no remote.storage.enable → VALID (controller auto-enables) + * diskless.enable=true, remote.storage.enable=true → VALID + * diskless.enable=true, remote.storage.enable=false → REJECTED (mutual exclusion) + * + * Alter (allow-from-classic + consolidation): + * CLASSIC → DISKLESS (diskless.enable=true, remote.storage.enable=true) → VALID (switch) + * CLASSIC → DISKLESS 
(diskless.enable=true, no remote.storage.enable) → VALID (controller auto-enables) + * CLASSIC (remote.storage.enable=false) → DISKLESS (diskless.enable=true) → REJECTED (mutual exclusion) + * TIERED → DISKLESS (diskless.enable=true, remote.storage.enable=true) → VALID (switch) + * DISKLESS → set remote.storage.enable=false → REJECTED (mutual exclusion) + * DISKLESS → set diskless.enable=false → REJECTED (unsupported) + * + * See also: DisklessAndRemoteStorageConfigsTest (integration-level equivalent) + */ class LogConfigTest { @Test From 4ff6dbb395ac47bceb3a3eb4aa279a92cbda6e22 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 1 Jun 2026 13:08:32 +0300 Subject: [PATCH 2/2] fixup! test(inkless:switch): add consolidated diskless integration tests with transition matrix --- .../DisklessAndRemoteStorageConfigsTest.java | 219 +++++------------- 1 file changed, 56 insertions(+), 163 deletions(-) diff --git a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java index beb0e1218e..8ed6041934 100644 --- a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java +++ b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java @@ -223,19 +223,6 @@ void validatesUpdateCases() throws Exception { } } - private Optional incrementalAlterTopicConfig(Admin admin, String topic, Map newConfigs) { - var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); - var operations = newConfigs.entrySet().stream() - .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET)) - .toList(); - try { - admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS); - return Optional.empty(); - } catch (Exception e) { - String message = e.getCause() != null ? 
e.getCause().getMessage() : e.getMessage(); - return Optional.ofNullable(message); - } - } } private KafkaClusterTestKit initCluster() throws Exception { @@ -300,166 +287,106 @@ private void createTopicAndAssertEffective(Admin admin, *

<p>Topic type transition matrix (consolidation enabled): * <pre>
      * #  | From       → To        | Condition                                                   | Result                     | Covered by
-     * ---+------------+-----------+-------------------------------------------------------------+----------------------------+------------------------------
-     * 1  | (none)     → DISKLESS  | diskless.enable=true, no remote.storage.enable in request   | VALID (auto-enabled)       | testCreateDisklessAutoEnablesRemoteStorage
-     * 2  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=true            | VALID                      | testCreateDisklessAutoEnablesRemoteStorage
-     * 3  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=false           | REJECTED                   | testCreateDisklessWithExplicitRemoteFalseRejected
-     * 4  | CLASSIC    → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testClassicToDisklessSwitch
-     * 5  | CLASSIC    → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testClassicToDisklessBlockedWithoutAllowFromClassic
-     * 6  | TIERED     → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testTieredToDisklessSwitch
-     * 7  | TIERED     → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testTieredToDisklessBlockedWithoutAllowFromClassic
-     * 8  | DISKLESS   → forbidden | remote.storage.enable=false                                 | REJECTED (mutual exclusion)| testDisklessCannotDisableRemoteStorage
-     * 9  | DISKLESS   → TIERED    | diskless.enable=false                                       | REJECTED (unsupported)     | testDisklessCannotDisableDiskless
+     * ---+------------+-----------+-------------------------------------------------------------+----------------------------+---------------------------------------------
+     * 1  | (none)     → DISKLESS  | diskless.enable=true, no remote.storage.enable in request   | VALID (auto-enabled)       | testConsolidatedTransitionsWithAllowFromClassic
+     * 2  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=true            | VALID                      | testConsolidatedTransitionsWithAllowFromClassic
+     * 3  | (none)     → DISKLESS  | diskless.enable=true, remote.storage.enable=false           | REJECTED                   | testConsolidatedTransitionsWithAllowFromClassic
+     * 4  | CLASSIC    → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testConsolidatedTransitionsWithAllowFromClassic
+     * 5  | CLASSIC    → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testConsolidatedTransitionsWithoutAllowFromClassic
+     * 6  | TIERED     → DISKLESS  | allow-from-classic=true                                     | VALID (switch)             | testConsolidatedTransitionsWithAllowFromClassic
+     * 7  | TIERED     → DISKLESS  | allow-from-classic=false                                    | REJECTED                   | testConsolidatedTransitionsWithoutAllowFromClassic
+     * 8  | DISKLESS   → forbidden | remote.storage.enable=false                                 | REJECTED (mutual exclusion)| testConsolidatedTransitionsWithAllowFromClassic
+     * 9  | DISKLESS   → TIERED    | diskless.enable=false                                       | REJECTED (irreversible)    | testConsolidatedTransitionsWithAllowFromClassic
      * </pre>
*/ @Nested class ConsolidatedDisklessTopics { @Test - void testCreateDisklessAutoEnablesRemoteStorage() throws Exception { - var cluster = initConsolidatedCluster(); + void testConsolidatedTransitionsWithAllowFromClassic() throws Exception { + var cluster = initConsolidatedCluster(true); try (Admin admin = AdminClient.create(Map.of( CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create diskless topic without explicit remote.storage.enable — controller auto-enables + // Scenario 1: Create diskless without explicit remote.storage.enable — controller auto-enables createTopicAndAssertEffective(admin, "diskless-auto-rs", Map.of( DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); - // Create diskless topic with explicit remote.storage.enable=true + + // Scenario 2: Create diskless with explicit remote.storage.enable=true createTopicAndAssertEffective(admin, "diskless-rs-true", Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "true", "true"); - } finally { - cluster.close(); - } - } - @Test - void testCreateDisklessWithExplicitRemoteFalseRejected() throws Exception { - var cluster = initConsolidatedCluster(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + // Scenario 3: Create diskless with remote.storage.enable=false — rejected Optional error = createTopic(admin, "diskless-rs-false", Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); assertTrue(error.isPresent(), "Should reject diskless with remote.storage.enable=false"); - } finally { - cluster.close(); - } - } - @Test - void testClassicToDisklessSwitch() throws Exception { - var cluster = initConsolidatedCluster(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create classic topic + // Scenario 4: CLASSIC → DISKLESS switch 
createTopicAndAssertEffective(admin, "classic-to-diskless", Map.of(), "false", "false"); - // Switch to diskless assertTrue(incrementalAlterTopicConfig(admin, "classic-to-diskless", Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")).isEmpty(), "CLASSIC→DISKLESS switch should succeed with allow-from-classic + consolidation"); - var config = getTopicConfig(admin, "classic-to-diskless"); - assertEquals("true", config.get(DISKLESS_ENABLE_CONFIG)); - assertEquals("true", config.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); - } finally { - cluster.close(); - } - } + var classicConfig = getTopicConfig(admin, "classic-to-diskless"); + assertEquals("true", classicConfig.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", classicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); - @Test - void testTieredToDisklessSwitch() throws Exception { - var cluster = initConsolidatedCluster(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create tiered topic + // Scenario 6: TIERED → DISKLESS switch createTopicAndAssertEffective(admin, "tiered-to-diskless", Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true"); - // Switch to diskless assertTrue(incrementalAlterTopicConfig(admin, "tiered-to-diskless", Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")).isEmpty(), "TIERED→DISKLESS switch should succeed with allow-from-classic"); - var config = getTopicConfig(admin, "tiered-to-diskless"); - assertEquals("true", config.get(DISKLESS_ENABLE_CONFIG)); - assertEquals("true", config.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); - } finally { - cluster.close(); - } - } + var tieredConfig = getTopicConfig(admin, "tiered-to-diskless"); + assertEquals("true", tieredConfig.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", tieredConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); - @Test - void testDisklessCannotDisableRemoteStorage() throws Exception { - 
var cluster = initConsolidatedCluster(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create diskless topic (RS auto-enabled by controller) + // Scenario 8: DISKLESS cannot disable remote storage createTopicAndAssertEffective(admin, "diskless-no-disable-rs", Map.of( DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); - // Try to disable remote storage - Optional error = incrementalAlterTopicConfig(admin, "diskless-no-disable-rs", Map.of( + Optional disableRsError = incrementalAlterTopicConfig(admin, "diskless-no-disable-rs", Map.of( REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); - assertTrue(error.isPresent(), "Should not allow disabling remote storage on diskless topic"); - assertEquals(DISKLESS_REMOTE_SET_ERROR, error.get()); - } finally { - cluster.close(); - } - } + assertTrue(disableRsError.isPresent(), "Should not allow disabling remote storage on diskless topic"); + assertEquals(DISKLESS_REMOTE_SET_ERROR, disableRsError.get()); - @Test - void testDisklessCannotBeDisabled() throws Exception { - var cluster = initConsolidatedCluster(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create diskless topic - createTopicAndAssertEffective(admin, "diskless-no-disable", Map.of( - DISKLESS_ENABLE_CONFIG, "true"), "true", "true"); - // Try to disable diskless - Optional error = incrementalAlterTopicConfig(admin, "diskless-no-disable", Map.of( + // Scenario 9: DISKLESS cannot be disabled + Optional disableDisklessError = incrementalAlterTopicConfig(admin, "diskless-no-disable-rs", Map.of( DISKLESS_ENABLE_CONFIG, "false")); - assertTrue(error.isPresent(), "Should not allow disabling diskless"); - assertEquals(DISABLE_DISKLESS_ERROR, error.get()); + assertTrue(disableDisklessError.isPresent(), "Should not allow disabling diskless"); + assertEquals(DISABLE_DISKLESS_ERROR, disableDisklessError.get()); } 
finally { cluster.close(); } } @Test - void testClassicToDisklessBlockedWithoutAllowFromClassic() throws Exception { - var cluster = initConsolidatedClusterWithoutAllowFromClassic(); + void testConsolidatedTransitionsWithoutAllowFromClassic() throws Exception { + var cluster = initConsolidatedCluster(false); try (Admin admin = AdminClient.create(Map.of( CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create classic topic + // Scenario 5: CLASSIC → DISKLESS blocked without allow-from-classic createTopicAndAssertEffective(admin, "classic-blocked", Map.of(), "false", "false"); - // Attempt CLASSIC→DISKLESS switch without allow-from-classic - Optional error = incrementalAlterTopicConfig(admin, "classic-blocked", Map.of( + Optional classicError = incrementalAlterTopicConfig(admin, "classic-blocked", Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); - assertTrue(error.isPresent(), "CLASSIC→DISKLESS should be blocked without allow-from-classic"); - assertEquals(ENABLE_DISKLESS_ERROR, error.get()); - } finally { - cluster.close(); - } - } + assertTrue(classicError.isPresent(), "CLASSIC→DISKLESS should be blocked without allow-from-classic"); + assertEquals(ENABLE_DISKLESS_ERROR, classicError.get()); - @Test - void testTieredToDisklessBlockedWithoutAllowFromClassic() throws Exception { - var cluster = initConsolidatedClusterWithoutAllowFromClassic(); - try (Admin admin = AdminClient.create(Map.of( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { - // Create tiered topic + // Scenario 7: TIERED → DISKLESS blocked without allow-from-classic createTopicAndAssertEffective(admin, "tiered-blocked", Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true"); - // Attempt TIERED→DISKLESS switch without allow-from-classic - Optional error = incrementalAlterTopicConfig(admin, "tiered-blocked", Map.of( + Optional tieredError = incrementalAlterTopicConfig(admin, "tiered-blocked", 
Map.of( DISKLESS_ENABLE_CONFIG, "true", REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")); - assertTrue(error.isPresent(), "TIERED→DISKLESS should be blocked without allow-from-classic"); - assertEquals(ENABLE_DISKLESS_ERROR, error.get()); + assertTrue(tieredError.isPresent(), "TIERED→DISKLESS should be blocked without allow-from-classic"); + assertEquals(ENABLE_DISKLESS_ERROR, tieredError.get()); } finally { cluster.close(); } } - private KafkaClusterTestKit initConsolidatedClusterWithoutAllowFromClassic() throws Exception { + private KafkaClusterTestKit initConsolidatedCluster(boolean allowFromClassic) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) @@ -470,41 +397,7 @@ private KafkaClusterTestKit initConsolidatedClusterWithoutAllowFromClassic() thr .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, "false") .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") - .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, "false") - .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, "true") - .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") - .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") - .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") - .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + 
PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.PASSWORD_CONFIG, PostgreSQLTestContainer.PASSWORD) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .build(); - cluster.format(); - cluster.startup(); - cluster.waitForReadyBrokers(); - return cluster; - } - - private KafkaClusterTestKit initConsolidatedCluster() throws Exception { - final TestKitNodes nodes = new TestKitNodes.Builder() - .setCombined(true) - .setNumBrokerNodes(1) - .setNumControllerNodes(1) - .build(); - var cluster = new KafkaClusterTestKit.Builder(nodes) - .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, "false") - .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") - .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") - .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, "true") + 
.setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(allowFromClassic)) .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, "true") .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") @@ -526,19 +419,19 @@ private KafkaClusterTestKit initConsolidatedCluster() throws Exception { cluster.waitForReadyBrokers(); return cluster; } + } - private Optional incrementalAlterTopicConfig(Admin admin, String topic, Map newConfigs) { - var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); - var operations = newConfigs.entrySet().stream() - .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET)) - .toList(); - try { - admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS); - return Optional.empty(); - } catch (Exception e) { - String message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); - return Optional.ofNullable(message); - } + private Optional incrementalAlterTopicConfig(Admin admin, String topic, Map newConfigs) { + var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + var operations = newConfigs.entrySet().stream() + .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET)) + .toList(); + try { + admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS); + return Optional.empty(); + } catch (Exception e) { + String message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); + return Optional.ofNullable(message); } }