Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,51 @@ void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception {
}
}

@Nested
final class DisklessAllowFromClassic {

    /**
     * With the cluster started via {@code init(false, true, false)}, altering
     * {@code diskless.enable} to "true" on an existing classic topic must fail,
     * both when the topic was created without the config and when it was created
     * with an explicit "false".
     * NOTE(review): the third {@code init} argument presumably toggles the
     * allow-from-classic broker setting — confirm against {@code init}.
     */
    @Test
    void cannotEnableDisklessOnClassicTopicWhenAllowFromClassicDisabled() throws Exception {
        final KafkaClusterTestKit cluster = init(false, true, false);
        final Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

        try (final Admin admin = AdminClient.create(adminProps)) {
            // Topic created without any diskless.enable entry.
            assertEnablingDisklessIsRejected(admin, "classic-topic-no-diskless", Map.of());
            // Topic created with diskless.enable explicitly set to "false".
            assertEnablingDisklessIsRejected(admin, "classic-topic-diskless-false",
                Map.of(DISKLESS_ENABLE_CONFIG, "false"));
        } finally {
            cluster.close();
        }
    }

    /**
     * With the cluster started via {@code init(false, true, true)}, altering
     * {@code diskless.enable} to "true" on an existing classic topic succeeds and
     * the topic config subsequently reads back as "true".
     */
    @Test
    void canEnableDisklessOnClassicTopicWhenAllowFromClassicEnabled() throws Exception {
        final KafkaClusterTestKit cluster = init(false, true, true);
        final Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

        try (final Admin admin = AdminClient.create(adminProps)) {
            final String classicTopic = "classic-topic-to-diskless";
            createTopic(admin, classicTopic, Map.of());
            // The topic starts out as classic.
            assertEquals("false", getTopicConfig(admin, classicTopic).get(DISKLESS_ENABLE_CONFIG));

            alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"));
            // The change must be visible when the config is read back.
            assertEquals("true", getTopicConfig(admin, classicTopic).get(DISKLESS_ENABLE_CONFIG));
        } finally {
            cluster.close();
        }
    }

    /**
     * Creates {@code topic} with {@code creationConfigs}, checks that its
     * {@code diskless.enable} value reads "false", and then asserts that altering
     * it to "true" fails with an {@link ExecutionException}.
     */
    private void assertEnablingDisklessIsRejected(final Admin admin,
                                                  final String topic,
                                                  final Map<String, String> creationConfigs) throws Exception {
        createTopic(admin, topic, creationConfigs);
        assertEquals("false", getTopicConfig(admin, topic).get(DISKLESS_ENABLE_CONFIG));
        assertThrows(ExecutionException.class,
            () -> alterTopicConfig(admin, topic, Map.of(DISKLESS_ENABLE_CONFIG, "true")));
    }
}

public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
admin.createTopics(Collections.singletonList(
new NewTopic(topic, 1, (short) 1)
Expand Down
43 changes: 39 additions & 4 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -553,19 +553,54 @@ class LogConfigTest {
kafkaConfig)
}

/**
 * Exercises `LogConfig.validate` on topic updates with `disklessAllowFromClassic = true`:
 * setting `diskless.enable=true` is accepted for topics without remote storage configured,
 * the diskless / remote-storage mutual exclusion is still enforced, and switching
 * `diskless.enable` from true to false is still rejected.
 */
@Test
def testDisklessAllowFromClassicAtUpdate(): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())
  val bothSetMessage = "It is not valid to set a value for both diskless.enable and remote.storage.enable."

  // Existing topic configurations the updates are validated against.
  val plainTopic = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")
  val disklessOff = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
  val disklessOn = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
  val remoteOff = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
  val remoteOn = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

  // Case 1: set diskless.enable=true with allowFromClassic=true
  val enableDiskless = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true")

  Seq(plainTopic, disklessOff, disklessOn).foreach { existing =>
    assertValid(existing, enableDiskless, brokerConfig, disklessAllowFromClassic = true)
  }
  // Mutual exclusion still applies even with allowFromClassic
  Seq(remoteOff, remoteOn).foreach { existing =>
    assertInvalid(existing, enableDiskless, bothSetMessage, brokerConfig, disklessAllowFromClassic = true)
  }

  // Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden
  val disableDiskless = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false")

  Seq(plainTopic, disklessOff).foreach { existing =>
    assertValid(existing, disableDiskless, brokerConfig, disklessAllowFromClassic = true)
  }
  Seq(
    disklessOn -> "It is invalid to disable diskless.",
    remoteOff -> bothSetMessage,
    remoteOn -> bothSetMessage
  ).foreach { case (existing, expectedMessage) =>
    assertInvalid(existing, disableDiskless, expectedMessage, brokerConfig, disklessAllowFromClassic = true)
  }
}

/**
 * Builds a `Properties` instance holding the given key/value pairs.
 * Pairs are inserted in argument order, so a later pair with the same key
 * overwrites an earlier one.
 *
 * @param entries the key/value pairs to store
 * @return a new `Properties` containing every supplied pair
 */
private def topicProps(entries: (String, String)*): Properties = {
  val result = new Properties()
  for ((key, value) <- entries) {
    result.put(key, value)
  }
  result
}

private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig): Unit = {
LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true)
private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig,
disklessAllowFromClassic: Boolean = false): Unit = {
LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic)
}

private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String, kafkaConfig: KafkaConfig): Unit = {
private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String,
kafkaConfig: KafkaConfig, disklessAllowFromClassic: Boolean = false): Unit = {
val ex = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true))
() => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic))
assertEquals(expectedMessage, ex.getMessage)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
Expand Down Expand Up @@ -277,7 +278,10 @@ private ApiError incrementalAlterConfigResource(
configResource.type().equals(Type.TOPIC) &&
Objects.equals(key, TopicConfig.DISKLESS_ENABLE_CONFIG) &&
Boolean.parseBoolean(opValue) &&
!Boolean.parseBoolean(currentValue)) {
!Boolean.parseBoolean(currentValue) &&
!Boolean.parseBoolean(String.valueOf(staticConfig.getOrDefault(
ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG,
ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT)))) {
return ApiError.fromThrowable(
new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
private static void validateDiskless(Map<String, String> existingConfigs,
Map<String, Object> requestedConfigs,
Map<?, ?> newConfigs,
boolean isDisklessStorageSystemEnabled) {
boolean isDisklessStorageSystemEnabled,
boolean isDisklessAllowFromClassicEnabled) {
final boolean isCreation = existingConfigs.isEmpty();
final boolean isDisklessExplicitlySet = requestedConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG);
final boolean isRemoteStorageExplicitlySet = requestedConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
Expand All @@ -518,7 +519,7 @@ private static void validateDiskless(Map<String, String> existingConfigs,
isDisklessEnabled = wasDisklessEnabled;
}

validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled);
validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled, isDisklessAllowFromClassicEnabled);

// Only one between diskless.enable and remote.storage.enable can be set, no matter the value.
final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet;
Expand All @@ -529,9 +530,9 @@ private static void validateDiskless(Map<String, String> existingConfigs,
private static void validateDisklessTransition(boolean isCreation,
boolean isDisklessExplicitlySet,
boolean isDisklessEnabled,
boolean wasDisklessEnabled) {
// Diskless can be enabled only at creation
if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled) {
boolean wasDisklessEnabled,
boolean isDisklessAllowFromClassicEnabled) {
if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled && !isDisklessAllowFromClassicEnabled) {
throw new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic.");
}

Expand Down Expand Up @@ -565,9 +566,10 @@ private static void validateTopicLogConfigValues(Map<String, String> existingCon
Map<String, Object> requestedConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessStorageSystemEnabled) {
boolean isDisklessStorageSystemEnabled,
boolean isDisklessAllowFromClassicEnabled) {
validateValues(newConfigs);
validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled);
validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled);

boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
Expand Down Expand Up @@ -702,7 +704,7 @@ public static void validate(Map<String, String> existingConfigs,
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, Utils.castToStringObjectMap(props), valueMaps,
isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled);
isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled);
}
}

Expand Down
Loading