From 98f90bf7cdf3251ecdcca0d4cb2bec270cc190cb Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 20 Mar 2026 15:37:35 +0100 Subject: [PATCH] feat(inkless:config): Enforce diskless.allow.from.classic.enable Allow setting diskless.enable=true for already existing classic topics if diskless.allow.from.classic.enable is set to true. --- .../java/kafka/server/InklessConfigsTest.java | 45 +++++++++++++++++++ .../scala/unit/kafka/log/LogConfigTest.scala | 43 ++++++++++++++++-- .../ConfigurationControlManager.java | 6 ++- .../storage/internals/log/LogConfig.java | 18 ++++---- 4 files changed, 99 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 59dd2d0128a..2dd79ffdae9 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -338,6 +338,51 @@ void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception { } } + @Nested + final class DisklessAllowFromClassic { + + @Test + void cannotEnableDisklessOnClassicTopicWhenAllowFromClassicDisabled() throws Exception { + final KafkaClusterTestKit cluster = init(false, true, false); + final Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (final Admin admin = AdminClient.create(clientConfigs)) { + final String topicWithoutDisklessSet = "classic-topic-no-diskless"; + createTopic(admin, topicWithoutDisklessSet, Map.of()); + assertEquals("false", getTopicConfig(admin, topicWithoutDisklessSet).get(DISKLESS_ENABLE_CONFIG)); + assertThrows(ExecutionException.class, + () -> alterTopicConfig(admin, topicWithoutDisklessSet, Map.of(DISKLESS_ENABLE_CONFIG, "true"))); + + final String topicWithDisklessFalse = "classic-topic-diskless-false"; + createTopic(admin, topicWithDisklessFalse, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + assertEquals("false", 
getTopicConfig(admin, topicWithDisklessFalse).get(DISKLESS_ENABLE_CONFIG)); + assertThrows(ExecutionException.class, + () -> alterTopicConfig(admin, topicWithDisklessFalse, Map.of(DISKLESS_ENABLE_CONFIG, "true"))); + } finally { + cluster.close(); + } + } + + @Test + void canEnableDisklessOnClassicTopicWhenAllowFromClassicEnabled() throws Exception { + final KafkaClusterTestKit cluster = init(false, true, true); + final Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (final Admin admin = AdminClient.create(clientConfigs)) { + final String topic = "classic-topic-to-diskless"; + createTopic(admin, topic, Map.of()); + assertEquals("false", getTopicConfig(admin, topic).get(DISKLESS_ENABLE_CONFIG)); + + alterTopicConfig(admin, topic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + assertEquals("true", getTopicConfig(admin, topic).get(DISKLESS_ENABLE_CONFIG)); + } finally { + cluster.close(); + } + } + } + public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( new NewTopic(topic, 1, (short) 1) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 409efc5f96e..7af1a6ee0be 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -553,19 +553,54 @@ class LogConfigTest { kafkaConfig) } + @Test + def testDisklessAllowFromClassicAtUpdate(): Unit = { + val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable." 
+ val existingWithoutDisklessOrRemote = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") + val existingWithDisklessFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + val existingWithDisklessTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val existingWithRemoteFalse = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + val existingWithRemoteTrue = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + + // Case 1: set diskless.enable=true with allowFromClassic=true + val setDisklessTrue = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true") + + assertValid(existingWithoutDisklessOrRemote, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) + assertValid(existingWithDisklessFalse, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) + assertValid(existingWithDisklessTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true) + // Mutual exclusion still applies even with allowFromClassic + assertInvalid(existingWithRemoteFalse, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithRemoteTrue, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + + // Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden + val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") + + assertValid(existingWithoutDisklessOrRemote, setDisklessFalse, kafkaConfig, disklessAllowFromClassic = true) + assertValid(existingWithDisklessFalse, setDisklessFalse, kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithDisklessTrue, setDisklessFalse, + "It is invalid to disable diskless.", kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithRemoteFalse, setDisklessFalse, + mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithRemoteTrue, setDisklessFalse, + 
mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + } + private def topicProps(entries: (String, String)*): Properties = { val props = new Properties() entries.foreach { case (k, v) => props.put(k, v) } props } - private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig): Unit = { - LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true) + private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig, + disklessAllowFromClassic: Boolean = false): Unit = { + LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic) } - private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String, kafkaConfig: KafkaConfig): Unit = { + private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String, + kafkaConfig: KafkaConfig, disklessAllowFromClassic: Boolean = false): Unit = { val ex = assertThrows(classOf[InvalidConfigurationException], - () => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true)) + () => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic)) assertEquals(expectedMessage, ex.getMessage) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index d765bc067ff..dc908bec502 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import 
org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.mutable.BoundedList; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; @@ -277,7 +278,10 @@ private ApiError incrementalAlterConfigResource( configResource.type().equals(Type.TOPIC) && Objects.equals(key, TopicConfig.DISKLESS_ENABLE_CONFIG) && Boolean.parseBoolean(opValue) && - !Boolean.parseBoolean(currentValue)) { + !Boolean.parseBoolean(currentValue) && + !Boolean.parseBoolean(String.valueOf(staticConfig.getOrDefault( + ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, + ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT)))) { return ApiError.fromThrowable( new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic.")); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 2ad30e04183..ceae6b0653b 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -498,7 +498,8 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map requestedConfigs, Map newConfigs, - boolean isDisklessStorageSystemEnabled) { + boolean isDisklessStorageSystemEnabled, + boolean isDisklessAllowFromClassicEnabled) { final boolean isCreation = existingConfigs.isEmpty(); final boolean isDisklessExplicitlySet = requestedConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG); final boolean isRemoteStorageExplicitlySet = requestedConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -518,7 +519,7 @@ private static void validateDiskless(Map existingConfigs, isDisklessEnabled = wasDisklessEnabled; } - validateDisklessTransition(isCreation, isDisklessExplicitlySet, 
isDisklessEnabled, wasDisklessEnabled); + validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled, isDisklessAllowFromClassicEnabled); // Only one between diskless.enable and remote.storage.enable can be set, no matter the value. final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet; @@ -529,9 +530,9 @@ private static void validateDiskless(Map existingConfigs, private static void validateDisklessTransition(boolean isCreation, boolean isDisklessExplicitlySet, boolean isDisklessEnabled, - boolean wasDisklessEnabled) { - // Diskless can be enabled only at creation - if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled) { + boolean wasDisklessEnabled, + boolean isDisklessAllowFromClassicEnabled) { + if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled && !isDisklessAllowFromClassicEnabled) { throw new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic."); } @@ -565,9 +566,10 @@ private static void validateTopicLogConfigValues(Map existingCon Map requestedConfigs, Map newConfigs, boolean isRemoteLogStorageSystemEnabled, - boolean isDisklessStorageSystemEnabled) { + boolean isDisklessStorageSystemEnabled, + boolean isDisklessAllowFromClassicEnabled) { validateValues(newConfigs); - validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled); + validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { @@ -702,7 +704,7 @@ public static void validate(Map existingConfigs, combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); validateTopicLogConfigValues(existingConfigs, Utils.castToStringObjectMap(props), valueMaps, - 
isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled); + isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled); } }