Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.server.IntegrationTestUtils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
Expand All @@ -46,6 +51,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -163,8 +169,10 @@ public void disklessTopicConfigs(boolean defaultDisklessEnableConfig) throws Exc
assertEquals("true", disklessTopicConfig.get(DISKLESS_ENABLE_CONFIG));
// Then it's not possible to turn off diskless after the topic is created
assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, disklessTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
// Then it's not possible to delete the diskless.enable config
assertThrows(ExecutionException.class, () -> deleteTopicConfigs(admin, disklessTopic, List.of(DISKLESS_ENABLE_CONFIG)));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessTopic, Map.of()));

admin.close();
cluster.close();
Expand All @@ -186,6 +194,7 @@ public void classicTopicWithDisklessDefaultFalseConfigs() throws Exception {
assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG));
// Then it's not possible to turn on diskless after the topic is created
assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")));
// Then it's not possible to delete the diskless.enable config
assertThrows(ExecutionException.class, () -> deleteTopicConfigs(admin, classicTopic, List.of(DISKLESS_ENABLE_CONFIG)));

Expand All @@ -198,8 +207,11 @@ public void classicTopicWithDisklessDefaultFalseConfigs() throws Exception {
// Then it's not possible to turn on diskless after the topic is created
assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, disklessDisabledTopic, Map.of(
DISKLESS_ENABLE_CONFIG, "true")));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessDisabledTopic, Map.of(
DISKLESS_ENABLE_CONFIG, "true")));
// Then it's not possible to delete the diskless.enable config
assertThrows(ExecutionException.class, () -> deleteTopicConfigs(admin, classicTopic, List.of(DISKLESS_ENABLE_CONFIG)));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessDisabledTopic, Map.of()));

admin.close();
cluster.close();
Expand All @@ -221,8 +233,11 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
// Then it's not possible to turn on diskless after the topic is created
assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, disklessDisabledTopic, Map.of(
DISKLESS_ENABLE_CONFIG, "true")));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessDisabledTopic, Map.of(
DISKLESS_ENABLE_CONFIG, "true")));
// Then it's not possible to delete diskless.enable=false because the default is true and it would enable diskless
assertThrows(ExecutionException.class, () -> deleteTopicConfigs(admin, disklessDisabledTopic, List.of(DISKLESS_ENABLE_CONFIG)));
assertThrows(ExecutionException.class, () -> legacyAlterTopicConfig(admin, disklessDisabledTopic, Map.of()));
}
cluster.close();
}
Expand Down Expand Up @@ -519,10 +534,31 @@ private void alterTopicConfig(Admin admin, String topic, Map<String, String> new
admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS);
}

/**
 * Sends a legacy (non-incremental) {@code AlterConfigs} request for a single topic and
 * surfaces a broker-side failure the same way the {@link Admin}-based helpers do.
 *
 * <p>The request is built from {@code newConfigs} only and is sent to the port returned by
 * {@link #firstBrokerPort(Admin)}.
 *
 * @param admin      client used only to discover a broker port
 * @param topic      name of the topic whose configs are altered
 * @param newConfigs config entries to put in the request; may be empty
 * @throws java.util.concurrent.ExecutionException if the response carries an error for the
 *         topic; the cause is the exception produced by that error
 * @throws Exception on any transport or lookup failure
 */
private void legacyAlterTopicConfig(Admin admin, String topic, Map<String, String> newConfigs) throws Exception {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

    // Translate the plain key/value map into the request's own entry type.
    List<AlterConfigsRequest.ConfigEntry> entries = newConfigs.entrySet().stream()
        .map(kv -> new AlterConfigsRequest.ConfigEntry(kv.getKey(), kv.getValue()))
        .toList();
    Map<ConfigResource, AlterConfigsRequest.Config> payload =
        Map.of(resource, new AlterConfigsRequest.Config(entries));

    short version = ApiKeys.ALTER_CONFIGS.latestVersion();
    AlterConfigsRequest request = new AlterConfigsRequest.Builder(payload, false).build(version);

    AlterConfigsResponse response = IntegrationTestUtils.connectAndReceive(request, firstBrokerPort(admin));
    var outcome = response.errors().get(resource);
    assertNotNull(outcome, "Legacy AlterConfigs response did not contain topic resource " + resource);
    if (outcome.error() == Errors.NONE) {
        return;
    }
    // Route the failure through an already-failed future so callers see an ExecutionException,
    // matching what Admin futures throw from get().
    CompletableFuture.failedFuture(outcome.exception()).get(10, TimeUnit.SECONDS);
}

/**
 * Removes the given config keys from a topic through the incremental alter-configs API
 * and waits up to 10 seconds for the operation to complete.
 *
 * @param admin           client used to issue the request
 * @param topic           name of the topic whose configs are deleted
 * @param configsToDelete names of the config keys to delete
 * @throws Exception if the request fails or does not complete in time
 */
private void deleteTopicConfigs(Admin admin, String topic, Collection<String> configsToDelete) throws Exception {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    // Each key is wrapped in a DELETE operation carrying an empty placeholder value.
    List<AlterConfigOp> deletions = configsToDelete.stream()
        .map(name -> new ConfigEntry(name, ""))
        .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE))
        .toList();
    var pending = admin.incrementalAlterConfigs(Map.of(resource, deletions)).all();
    pending.get(10, TimeUnit.SECONDS);
}

/**
 * Returns the port of the first node reported by {@code describeCluster()}.
 *
 * @param admin client used to describe the cluster
 * @return port of the first node in the returned collection
 * @throws Exception if the cluster description fails or does not complete within 10 seconds
 */
private int firstBrokerPort(Admin admin) throws Exception {
    var pendingNodes = admin.describeCluster().nodes();
    var firstNode = pendingNodes.get(10, TimeUnit.SECONDS).iterator().next();
    return firstNode.port();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,25 @@ private ApiError validateAlterConfig(
return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
} else if (isDisallowedClusterMinIsrTransition(configRecord)) {
return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
} else if (Objects.equals(configRecord.name(), TopicConfig.DISKLESS_ENABLE_CONFIG)) {
return ApiError.fromThrowable(
new InvalidConfigurationException("It is not allowed to delete the diskless.enable config"));
} else {
allConfigs.remove(configRecord.name());
}
// As per KAFKA-14195, do not include implicit deletions caused by using the legacy AlterConfigs API
// in the list passed to the policy in order to maintain backwards compatibility
}
if (!newlyCreatedResource &&
configResource.type().equals(Type.TOPIC) &&
Boolean.parseBoolean(allConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) &&
!Boolean.parseBoolean(existingConfigsMap.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) &&
!Boolean.parseBoolean(String.valueOf(staticConfig.getOrDefault(
ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG,
ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT)))) {
return ApiError.fromThrowable(
new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic."));
}
if (!newlyCreatedResource &&
configResource.type().equals(Type.TOPIC) &&
Boolean.parseBoolean(allConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3008,23 +3008,16 @@ ApiError validateClassicToDisklessSwitchPrecondition(

/**
* Per-resource precondition check for legacy AlterConfigs enabling diskless.
* Covers both explicit {@code diskless.enable=true} in the request and implicit
* enabling via deletion of a {@code diskless.enable=false} override when the broker
* default is {@code true}.
* Legacy AlterConfigs omitting {@code diskless.enable} is rejected by
* {@link ConfigurationControlManager}, since omissions delete existing overrides.
*/
ApiError validateClassicToDisklessSwitchPreconditionForLegacy(
ConfigResource resource,
Map<ConfigResource, Map<String, String>> newConfigs
) {
if (resource.type() != TOPIC) return ApiError.NONE;
Map<String, String> configs = newConfigs.get(resource);
if (configs == null) return ApiError.NONE;
boolean explicitlyEnabling = Boolean.parseBoolean(configs.get(DISKLESS_ENABLE_CONFIG));
boolean implicitlyEnabling = !configs.containsKey(DISKLESS_ENABLE_CONFIG)
&& defaultDisklessEnable
&& !isDisklessTopic(resource.name())
&& "false".equals(configurationControl.currentTopicConfig(resource.name()).get(DISKLESS_ENABLE_CONFIG));
if (!explicitlyEnabling && !implicitlyEnabling) return ApiError.NONE;
if (configs == null || !Boolean.parseBoolean(configs.get(DISKLESS_ENABLE_CONFIG))) return ApiError.NONE;
if (isDisklessTopic(resource.name())) return ApiError.NONE;
Uuid topicId = topicsByName.get(resource.name());
if (topicId == null) return ApiError.NONE;
Expand Down Expand Up @@ -3165,14 +3158,6 @@ List<ApiMessageAndVersion> markClassicToDisklessSwitchStartedForLegacyAlterConfi
DISKLESS_ENABLE_CONFIG,
new SimpleImmutableEntry<>(SET, disklessEnable)
));
} else if (defaultDisklessEnable
&& resource.type() == TOPIC
&& !isDisklessTopic(resource.name())
&& "false".equals(configurationControl.currentTopicConfig(resource.name()).get(DISKLESS_ENABLE_CONFIG))) {
configChanges.put(resource, Map.of(
DISKLESS_ENABLE_CONFIG,
new SimpleImmutableEntry<>(SET, "true")
));
}
}
return markClassicToDisklessSwitchStarted(configChanges, configResults);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6900,49 +6900,11 @@ public void testBrokerFencingDoesNotTriggerUncleanElectionForPendingSwitchPartit
}

@Test
public void testLegacyAlterConfigsRejectsImplicitSwitchWhenUnderReplicated() {
public void testLegacyAlterConfigsRejectsImplicitDisklessEnableDeletion() {
// Legacy AlterConfigs replaces the entire config map. If a topic has
// diskless.enable=false and the request omits it, the override would be deleted,
// switching to diskless via broker default. The precondition check must
// detect this implicit switch and reject it when partitions are unhealthy.
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true)
.setDisklessStorageSystemEnabled(true)
.setDefaultDisklessEnable(true)
.build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}},
Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0).topicId();

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
assertEquals("false", ctx.configurationControl.currentTopicConfig("foo").get(DISKLESS_ENABLE_CONFIG));

// Make the partition under-replicated
ctx.fenceBrokers(2);
PartitionRegistration partition = ctx.replicationControl.getPartition(fooId, 0);
assertTrue(partition.isr.length < partition.replicas.length);

// Legacy AlterConfigs with only retention.ms (omits diskless.enable).
// Since this would implicitly switch via broker default and the partition
// is under-replicated, it must be rejected.
ControllerResult<Map<ConfigResource, ApiError>> legacyResult =
ctx.configurationControl.legacyAlterConfigs(
Map.of(resource, Map.of("retention.ms", "86400000")),
false,
r -> ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy(
r, Map.of(resource, Map.of("retention.ms", "86400000"))));

assertEquals(Errors.INVALID_CONFIG, legacyResult.response().get(resource).error(),
"Legacy AlterConfigs should reject implicit diskless switch when under-replicated");
assertTrue(legacyResult.response().get(resource).message().contains("under-replicated"),
"Expected 'under-replicated' in: " + legacyResult.response().get(resource).message());
}

@Test
public void testLegacyAlterConfigsEmitsSwitchRecordsForImplicitSwitch() {
// When partitions are healthy and legacy AlterConfigs implicitly enables diskless
// via override deletion, switch-pending records must be emitted.
// switching to diskless via broker default. This must be rejected like an
// incremental DELETE of diskless.enable.
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setStaticConfig(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, true)
.setDisklessStorageSystemEnabled(true)
Expand All @@ -6954,27 +6916,29 @@ public void testLegacyAlterConfigsEmitsSwitchRecordsForImplicitSwitch() {
Map.of(DISKLESS_ENABLE_CONFIG, "false"), (short) 0);

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
assertEquals("false", ctx.configurationControl.currentTopicConfig("foo").get(DISKLESS_ENABLE_CONFIG));

// Legacy AlterConfigs with only retention.ms (omits diskless.enable).
// Partitions are healthy, so the implicit switch should succeed and produce
// switch-pending records.
// Since this would delete diskless.enable=false, it must be rejected.
Map<ConfigResource, Map<String, String>> newConfigs =
Map.of(resource, Map.of("retention.ms", "86400000"));
ControllerResult<Map<ConfigResource, ApiError>> legacyResult =
ctx.configurationControl.legacyAlterConfigs(newConfigs, false,
r -> ctx.replicationControl.validateClassicToDisklessSwitchPreconditionForLegacy(
r, newConfigs));
assertEquals(ApiError.NONE, legacyResult.response().get(resource));

// Call before replay (same order as QuorumController) — the override still
// exists in configData at this point.
assertEquals(Errors.INVALID_CONFIG, legacyResult.response().get(resource).error(),
"Legacy AlterConfigs should reject implicit diskless.enable deletion");
assertTrue(legacyResult.response().get(resource).message().contains("not allowed to delete"),
"Expected delete rejection in: " + legacyResult.response().get(resource).message());
assertTrue(legacyResult.records().isEmpty(),
"Rejected legacy AlterConfigs must not emit config records");

List<ApiMessageAndVersion> switchRecords =
ctx.replicationControl.markClassicToDisklessSwitchStartedForLegacyAlterConfigs(
newConfigs, legacyResult.response());
assertFalse(switchRecords.isEmpty(), "Expected switch-pending records for implicit diskless switch");

ctx.replay(legacyResult.records());
ctx.replay(switchRecords);
assertTrue(switchRecords.isEmpty(),
"Rejected implicit diskless deletion must not emit switch-pending records");
}

@Test
Expand Down
Loading