Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidRequestException;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
Expand Down Expand Up @@ -59,6 +60,7 @@ public boolean intercept(
final Map<String, Entry<OpType, String>> targetConfigOps
) {
if (shouldForceDisklessEnable(topicName, requestConfigs)) {
rejectIfExplicitlyDisabled(topicName, requestConfigs);
targetConfigOps.put(TopicConfig.DISKLESS_ENABLE_CONFIG, new SimpleImmutableEntry<>(SET, "true"));
return true;
}
Expand All @@ -71,12 +73,21 @@ public boolean intercept(
final Map<String, String> targetConfigs
) {
if (shouldForceDisklessEnable(topicName, targetConfigs)) {
rejectIfExplicitlyDisabled(topicName, targetConfigs);
targetConfigs.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true");
return true;
}
return false;
}

/**
 * Rejects a request that explicitly switches diskless off for a topic that is about to be force-enabled.
 *
 * <p>A missing, {@code null}, or blank {@code diskless.enable} value counts as "not specified" and is accepted.
 * Any other value that {@link Boolean#parseBoolean(String)} does not read as {@code true} (after trimming
 * surrounding whitespace) is rejected.
 *
 * @param topicName name of the topic; used only to build the error message
 * @param configs   configs to inspect; this method does not modify them
 * @throws InvalidRequestException if {@code diskless.enable} is present, non-blank, and not {@code true}
 */
private void rejectIfExplicitlyDisabled(final String topicName, final Map<String, String> configs) {
    // Map#getOrDefault hands back a stored null unchanged, so read the raw value and null-check it
    // before trimming rather than letting trim() fail with a NullPointerException.
    final String rawValue = configs.get(TopicConfig.DISKLESS_ENABLE_CONFIG);
    if (rawValue == null) {
        return;
    }
    final String disklessEnableConfigValue = rawValue.trim();
    if (!disklessEnableConfigValue.isEmpty() && !Boolean.parseBoolean(disklessEnableConfigValue)) {
        throw new InvalidRequestException(
            "Topic '" + topicName + "' matches the diskless force include regexes and cannot set diskless.enable=false.");
    }
}

private boolean shouldForceDisklessEnable(
final String topicName,
final Map<String, String> topicConfigs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,11 +779,16 @@ ControllerResult<CreateTopicsResponseData> createTopics(
} else {
keyToOps = new HashMap<>(keyToOps);
}
createTopicConfigInterceptors.intercept(
topic.name(),
requestConfigs,
keyToOps
);
try {
createTopicConfigInterceptors.intercept(
topic.name(),
requestConfigs,
keyToOps
);
} catch (ApiException e) {
topicErrors.put(topic.name(), ApiError.fromThrowable(e));
continue;
}
Comment thread
gqmelo marked this conversation as resolved.
if (keyToOps.isEmpty()) {
keyToOps = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.internals.Topic;

import org.junit.jupiter.api.Test;
Expand All @@ -32,6 +33,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class DisklessForceCreateTopicInterceptorTest {

Expand All @@ -50,17 +52,21 @@ public void forcesDisklessWhenTopicMatchesRegex() {
}

@Test
public void forcesDisklessEvenWhenRequestHadDisklessFalse() {
public void throwsWhenRequestHasDisklessFalseAndTopicMatchesRegex() {
final var interceptor = new DisklessForceCreateTopicInterceptor(List.of("my-topic-.*"));
final Map<String, String> requestConfigs = new HashMap<>(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false"));
final Map<String, Entry<OpType, String>> targetConfigOps = new HashMap<>();
final Map<String, String> targetConfigs = new HashMap<>(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false"));

interceptor.intercept("my-topic-1", requestConfigs, targetConfigOps);
interceptor.intercept("my-topic-1", targetConfigs);

assertEquals(Map.entry(SET, "true"), targetConfigOps.get(TopicConfig.DISKLESS_ENABLE_CONFIG));
assertEquals("true", targetConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG));
for (String falseValue : List.of("FALSE", "False", "false ", " false", " FALSE ")) {
final Map<String, String> requestConfigs = new HashMap<>(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, falseValue));
final Map<String, Entry<OpType, String>> targetConfigOps = new HashMap<>();
final Map<String, String> targetConfigs = new HashMap<>(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, falseValue));

assertThrows(InvalidRequestException.class,
() -> interceptor.intercept("my-topic-1", requestConfigs, targetConfigOps),
"Should reject value: '" + falseValue + "'");
assertThrows(InvalidRequestException.class,
() -> interceptor.intercept("my-topic-1", targetConfigs),
"Should reject value: '" + falseValue + "'");
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ private static class Builder {
private boolean disklessRemoteStorageConsolidationEnabled = false;
private boolean classicRemoteStorageForceEnabled = false;
private List<String> classicRemoteStorageForceExcludeTopicRegexes = List.of();
private boolean disklessForceEnabled = false;
private List<String> disklessForceIncludeTopicRegexes = List.of();

Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
this.createTopicPolicy = Optional.of(createTopicPolicy);
Expand Down Expand Up @@ -250,6 +252,16 @@ Builder setClassicRemoteStorageForceExcludeTopicRegexes(final List<String> class
return this;
}

/**
 * Stores the diskless-force flag that {@code build()} passes on to the test context.
 *
 * @param disklessForceEnabled flag value to remember on this builder
 * @return this builder, so calls can be chained
 */
Builder setDisklessForceEnabled(final boolean disklessForceEnabled) {
    this.disklessForceEnabled = disklessForceEnabled;
    return this;
}

/**
 * Stores the diskless-force include regexes that {@code build()} passes on to the test context.
 *
 * @param disklessForceIncludeTopicRegexes regex list to remember on this builder; kept by reference, not copied
 * @return this builder, so calls can be chained
 */
Builder setDisklessForceIncludeTopicRegexes(final List<String> disklessForceIncludeTopicRegexes) {
    this.disklessForceIncludeTopicRegexes = disklessForceIncludeTopicRegexes;
    return this;
}

ReplicationControlTestContext build() {
return new ReplicationControlTestContext(metadataVersion,
createTopicPolicy,
Expand All @@ -261,7 +273,9 @@ ReplicationControlTestContext build() {
disklessManagedReplicasEnable,
disklessRemoteStorageConsolidationEnabled,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes);
classicRemoteStorageForceExcludeTopicRegexes,
disklessForceEnabled,
disklessForceIncludeTopicRegexes);
}
}

Expand Down Expand Up @@ -292,7 +306,9 @@ private ReplicationControlTestContext(
boolean disklessManagedReplicasEnable,
boolean disklessRemoteStorageConsolidationEnabled,
final boolean classicRemoteStorageForceEnabled,
final List<String> classicRemoteStorageForceExcludeTopicRegexes
final List<String> classicRemoteStorageForceExcludeTopicRegexes,
final boolean disklessForceEnabled,
final List<String> disklessForceIncludeTopicRegexes
) {
this.time = time;
this.featureControl = new FeatureControlManager.Builder().
Expand Down Expand Up @@ -342,6 +358,8 @@ private ReplicationControlTestContext(
setDisklessRemoteStorageConsolidationEnabled(disklessRemoteStorageConsolidationEnabled).
setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes).
setDisklessForceEnabled(disklessForceEnabled).
setDisklessForceIncludeTopicRegexes(disklessForceIncludeTopicRegexes).
build();
clusterControl.activate();
}
Expand Down Expand Up @@ -3721,6 +3739,42 @@ void testForceClassicRemoteStorageEnableNotAppliedToExcludedRegexTopics() {
.noneMatch(r -> r.name().equals(REMOTE_LOG_STORAGE_ENABLE_CONFIG) && r.value().equals("true")));
}

/**
 * Sends one CreateTopics request holding a topic that matches the force-include regex but asks for
 * diskless.enable=false, plus a topic that does not match, and checks that only the first one
 * comes back with INVALID_REQUEST.
 */
@Test
void testDisklessForceInterceptorRejectsOnlyOffendingTopic() {
    // Test context with the diskless force flag set and "forced-.*" as the only include regex.
    final ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setDisklessForceEnabled(true)
        .setDisklessForceIncludeTopicRegexes(List.of("forced-.*"))
        .setDisklessStorageSystemEnabled(true)
        .build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);

    // "forced-bad" matches the regex and explicitly sets diskless.enable=false: expected to be rejected.
    final CreateTopicsRequestData.CreatableTopicConfigCollection explicitDisable =
        new CreateTopicsRequestData.CreatableTopicConfigCollection();
    explicitDisable.add(
        new CreateTopicsRequestData.CreatableTopicConfig()
            .setName(DISKLESS_ENABLE_CONFIG)
            .setValue("false"));
    final CreatableTopic offendingTopic = new CreatableTopic()
        .setName("forced-bad")
        .setNumPartitions(1)
        .setReplicationFactor((short) 1)
        .setConfigs(explicitDisable);

    // "normal-topic" does not match the regex: expected to succeed.
    final CreatableTopic unaffectedTopic = new CreatableTopic()
        .setName("normal-topic")
        .setNumPartitions(1)
        .setReplicationFactor((short) 1);

    // Same insertion order as before: offending topic first, unaffected topic second.
    final CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(offendingTopic);
    request.topics().add(unaffectedTopic);

    final CreateTopicsResponseData response = ctx.replicationControl
        .createTopics(
            anonymousContextFor(ApiKeys.CREATE_TOPICS),
            request,
            Set.of("forced-bad", "normal-topic"))
        .response();

    assertEquals(INVALID_REQUEST.code(), response.topics().find("forced-bad").errorCode());
    assertEquals(NONE.code(), response.topics().find("normal-topic").errorCode());
}

@Nested
// Tests Diskless single/unmanaged replica approach where a single replica is registered on KRaft but it's effectively ignored.
class DisklessUnmanagedReplicaTests {
Expand Down
Loading