Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class ControllerServer(
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultDisklessEnable(config.logDisklessEnable).
setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled).
setDefaultRemoteStorageForTopicCreateEnabled(config.defaultRemoteStorageForTopicCreateEnabled).
setDefaultRemoteStorageForTopicCreateLocalOnlyTopicRegex(config.defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
val defaultRemoteStorageForTopicCreateEnabled: Boolean = getBoolean(ServerConfigs.DEFAULT_REMOTE_STORAGE_FOR_TOPIC_CREATE_ENABLE_CONFIG)
val defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex: String =
getString(ServerConfigs.DEFAULT_REMOTE_STORAGE_FOR_TOPIC_CREATE_LOCAL_ONLY_TOPIC_REGEX_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ class LogConfigTest {
val t1 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config diskless.allow.from.classic.enable must also be enabled.", t1.getMessage)
assertEquals("It is invalid to enable diskless on an already existing topic.", t1.getMessage)

// Add remote storage
val t2 = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ public static class Builder {

private boolean defaultDisklessEnable = false;
private boolean disklessStorageSystemEnabled = false;
private boolean defaultRemoteStorageForTopicCreateEnabled = false;
private String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex = "";

public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
Expand Down Expand Up @@ -293,6 +295,21 @@ public Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnab
return this;
}

/**
 * Records the default-remote-storage-for-topic-create flag on this builder.
 *
 * @param defaultRemoteStorageForTopicCreateEnabled the flag value to store; the builder's
 *        initial value is {@code false}
 * @return this builder, so calls can be chained
 */
public Builder setDefaultRemoteStorageForTopicCreateEnabled(boolean defaultRemoteStorageForTopicCreateEnabled) {
this.defaultRemoteStorageForTopicCreateEnabled = defaultRemoteStorageForTopicCreateEnabled;
return this;
}

/**
 * Records the local-only topic regex on this builder.
 *
 * @param defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex the regex text to store;
 *        {@code null} is normalized to the empty string so the field is never null
 * @return this builder, so calls can be chained
 */
public Builder setDefaultRemoteStorageForTopicCreateLocalOnlyTopicRegex(
    String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex
) {
    if (defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex == null) {
        // Treat an absent regex the same as an empty one.
        this.defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex = "";
    } else {
        this.defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex =
            defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex;
    }
    return this;
}

public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
Expand Down Expand Up @@ -442,6 +459,8 @@ public QuorumController build() throws Exception {
defaultNumPartitions,
defaultDisklessEnable,
disklessStorageSystemEnabled,
defaultRemoteStorageForTopicCreateEnabled,
defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex,
replicaPlacer,
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
Expand Down Expand Up @@ -1483,6 +1502,8 @@ private QuorumController(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean disklessStorageSystemEnabled,
boolean defaultRemoteStorageForTopicCreateEnabled,
String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex,
ReplicaPlacer replicaPlacer,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
Expand Down Expand Up @@ -1567,6 +1588,8 @@ private QuorumController(
setDefaultNumPartitions(defaultNumPartitions).
setDefaultDisklessEnable(defaultDisklessEnable).
setDisklessStorageSystemEnabled(disklessStorageSystemEnabled).
setDefaultRemoteStorageForTopicCreateEnabled(defaultRemoteStorageForTopicCreateEnabled).
setDefaultRemoteStorageForTopicCreateLocalOnlyTopicRegex(defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ static class Builder {
private int defaultNumPartitions = 1;
private boolean defaultDisklessEnable = false;
private boolean isDisklessStorageSystemEnabled = false;
private boolean defaultRemoteStorageForTopicCreateEnabled = false;
private String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex = "";

private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
Expand Down Expand Up @@ -199,6 +201,21 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
return this;
}

/**
 * Records the default-remote-storage-for-topic-create flag on this builder.
 *
 * @param defaultRemoteStorageForTopicCreateEnabled the flag value to store; the builder's
 *        initial value is {@code false}
 * @return this builder, so calls can be chained
 */
public Builder setDefaultRemoteStorageForTopicCreateEnabled(boolean defaultRemoteStorageForTopicCreateEnabled) {
this.defaultRemoteStorageForTopicCreateEnabled = defaultRemoteStorageForTopicCreateEnabled;
return this;
}

/**
 * Records the local-only topic regex on this builder.
 *
 * @param defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex the regex text to store;
 *        {@code null} is normalized to the empty string so the field is never null
 * @return this builder, so calls can be chained
 */
public Builder setDefaultRemoteStorageForTopicCreateLocalOnlyTopicRegex(
    String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex
) {
    if (defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex == null) {
        // Treat an absent regex the same as an empty one.
        this.defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex = "";
    } else {
        this.defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex =
            defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex;
    }
    return this;
}

Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
Expand Down Expand Up @@ -241,6 +258,8 @@ ReplicationControlManager build() {
defaultNumPartitions,
defaultDisklessEnable,
isDisklessStorageSystemEnabled,
defaultRemoteStorageForTopicCreateEnabled,
defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex,
maxElectionsPerImbalance,
configurationControl,
clusterControl,
Expand Down Expand Up @@ -319,6 +338,7 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
private final boolean defaultDisklessEnable;

private final boolean isDisklessStorageSystemEnabled;
private final TopicCreateRemoteStoragePolicy topicCreateRemoteStoragePolicy;

/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
Expand Down Expand Up @@ -410,6 +430,8 @@ private ReplicationControlManager(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean isDisklessStorageSystemEnabled,
boolean defaultRemoteStorageForTopicCreateEnabled,
String defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex,
int maxElectionsPerImbalance,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Expand All @@ -422,6 +444,10 @@ private ReplicationControlManager(
this.defaultNumPartitions = defaultNumPartitions;
this.defaultDisklessEnable = defaultDisklessEnable;
this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled;
this.topicCreateRemoteStoragePolicy = new TopicCreateRemoteStoragePolicy(
defaultRemoteStorageForTopicCreateEnabled,
defaultRemoteStorageForTopicCreateLocalOnlyTopicRegex
);
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
Expand Down Expand Up @@ -677,6 +703,8 @@ ControllerResult<CreateTopicsResponseData> createTopics(
// Figure out what ConfigRecords should be created, if any.
ConfigResource configResource = new ConfigResource(TOPIC, topic.name());
Map<String, Entry<OpType, String>> keyToOps = configChanges.get(configResource);
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
keyToOps = topicCreateRemoteStoragePolicy.apply(topic.name(), keyToOps, isDisklessEnabledAtCreate(creationConfigs));
List<ApiMessageAndVersion> configRecords;
if (keyToOps != null) {
ControllerResult<ApiError> configResult =
Expand Down Expand Up @@ -882,7 +910,8 @@ private ApiError createTopic(ControllerRequestContext context,
return ApiError.fromThrowable(e);
}
Uuid topicId = Uuid.randomUuid();
final CreatableTopicResult result = buildCreatableTopicResult(topic, authorizedToReturnConfigs, topicId, creationConfigs, numPartitions, newParts);
final Map<String, String> responseCreationConfigs = normalizeCreateTopicResponseConfigs(topic.name(), creationConfigs, disklessEnabled);
final CreatableTopicResult result = buildCreatableTopicResult(topic, authorizedToReturnConfigs, topicId, responseCreationConfigs, numPartitions, newParts);

successes.put(topic.name(), result);
records.add(new ApiMessageAndVersion(new TopicRecord().
Expand All @@ -901,6 +930,31 @@ private ApiError createTopic(ControllerRequestContext context,
return ApiError.NONE;
}

/**
 * Builds the config map used for the create-topic response by overlaying the
 * topic-create remote storage policy on top of the requested creation configs.
 *
 * The input map is never modified; a copy is returned. Only policy operations
 * whose op type is SET are copied into the result.
 *
 * @param topicName       name of the topic being created
 * @param creationConfigs configs supplied with the creation request
 * @param disklessEnabled whether the topic is being created as diskless
 * @return a new map holding the creation configs plus any SET values produced by the policy
 */
private Map<String, String> normalizeCreateTopicResponseConfigs(
    String topicName,
    Map<String, String> creationConfigs,
    boolean disklessEnabled
) {
    Map<String, String> merged = new HashMap<>(creationConfigs);
    // Passing null asks the policy only for the entries it would add on its own.
    Map<String, Entry<OpType, String>> policyOps =
        topicCreateRemoteStoragePolicy.apply(topicName, null, disklessEnabled);
    if (policyOps == null) {
        return merged;
    }
    policyOps.forEach((configName, op) -> {
        if (op.getKey() == SET) {
            merged.put(configName, op.getValue());
        }
    });
    return merged;
}

/**
 * Resolves whether a topic is diskless at creation time.
 *
 * @param creationConfigs configs supplied with the creation request
 * @return the parsed value of the DISKLESS_ENABLE_CONFIG entry when one is present,
 *         otherwise the {@code defaultDisklessEnable} value of this manager
 */
private boolean isDisklessEnabledAtCreate(Map<String, String> creationConfigs) {
    final String explicitValue = creationConfigs.get(DISKLESS_ENABLE_CONFIG);
    return explicitValue != null
        ? Boolean.parseBoolean(explicitValue)
        : defaultDisklessEnable;
}

private List<ApiMessageAndVersion> validConfigRecords(CreatableTopic topic, List<ApiMessageAndVersion> configRecords, boolean disklessEnabled) {
final List<ApiMessageAndVersion> validConfigRecord = new ArrayList<>();
boolean isDisklessEnableDefined = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.internals.Topic;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;

/**
 * Policy that decides the REMOTE_LOG_STORAGE_ENABLE_CONFIG value written for a topic
 * at creation time.
 *
 * When the policy is disabled the supplied config operations are returned untouched.
 * When it is enabled, a SET operation for REMOTE_LOG_STORAGE_ENABLE_CONFIG is always
 * written, replacing any operation already present for that key. The value is "true"
 * unless the topic is diskless or is considered local-only.
 */
final class TopicCreateRemoteStoragePolicy {
    private final boolean enabled;
    private final Optional<Pattern> localOnlyTopicPattern;

    /**
     * Creates a policy with no custom local-only topic regex.
     *
     * @param enabled whether the policy rewrites the remote storage config
     */
    TopicCreateRemoteStoragePolicy(boolean enabled) {
        this(enabled, "");
    }

    /**
     * @param enabled             whether the policy rewrites the remote storage config
     * @param localOnlyTopicRegex regex whose full matches mark topics as local-only;
     *                            null or blank means no custom pattern
     * @throws IllegalArgumentException if the regex cannot be compiled
     */
    TopicCreateRemoteStoragePolicy(boolean enabled, String localOnlyTopicRegex) {
        this.enabled = enabled;
        this.localOnlyTopicPattern = compilePattern(localOnlyTopicRegex);
    }

    /**
     * Applies the policy to the config operations of a topic being created.
     *
     * @param topicName       name of the topic being created
     * @param keyToOps        config operations requested for the topic; may be null
     * @param disklessEnabled whether the topic is being created as diskless
     * @return {@code keyToOps} itself (possibly null) when the policy is disabled; otherwise
     *         a new map containing the original operations plus the remote storage SET
     */
    Map<String, Entry<OpType, String>> apply(
        String topicName,
        Map<String, Entry<OpType, String>> keyToOps,
        boolean disklessEnabled
    ) {
        if (enabled) {
            // Diskless topics and local-only topics never get remote storage.
            boolean useRemoteStorage = !(disklessEnabled || isLocalOnlyTopic(topicName));
            Map<String, Entry<OpType, String>> result = new HashMap<>();
            if (keyToOps != null) {
                result.putAll(keyToOps);
            }
            Entry<OpType, String> remoteStorageOp =
                new SimpleImmutableEntry<>(SET, String.valueOf(useRemoteStorage));
            result.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, remoteStorageOp);
            return result;
        }
        return keyToOps;
    }

    /**
     * A topic is local-only when it is internal, when its name starts with "__",
     * or when its whole name matches the configured pattern.
     */
    private boolean isLocalOnlyTopic(String topicName) {
        boolean reservedName = Topic.isInternal(topicName) || topicName.startsWith("__");
        if (reservedName) {
            return true;
        }
        return localOnlyTopicPattern
            .filter(pattern -> pattern.matcher(topicName).matches())
            .isPresent();
    }

    /**
     * Compiles the configured regex, treating null or blank input as "no pattern".
     */
    private static Optional<Pattern> compilePattern(String localOnlyTopicRegex) {
        boolean hasRegex = localOnlyTopicRegex != null && !localOnlyTopicRegex.isBlank();
        if (hasRegex) {
            try {
                Pattern compiled = Pattern.compile(localOnlyTopicRegex);
                return Optional.of(compiled);
            } catch (PatternSyntaxException e) {
                // Surface a bad regex at construction time instead of at first use.
                throw new IllegalArgumentException(
                    "Invalid regex in default remote storage local-only topic pattern: " + localOnlyTopicRegex,
                    e
                );
            }
        }
        return Optional.empty();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.TopicConfig;

import org.junit.jupiter.api.Test;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Unit tests for {@link TopicCreateRemoteStoragePolicy}.
 */
class TopicCreateRemoteStoragePolicyTest {

    /**
     * Applies the policy and returns the value of the remote storage operation it produced.
     */
    private static String remoteStorageValue(
        TopicCreateRemoteStoragePolicy policy,
        String topicName,
        Map<String, Map.Entry<OpType, String>> keyToOps,
        boolean disklessEnabled
    ) {
        Map<String, Map.Entry<OpType, String>> applied = policy.apply(topicName, keyToOps, disklessEnabled);
        return applied.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).getValue();
    }

    @Test
    void policyDisabledKeepsBehaviorUnchanged() {
        TopicCreateRemoteStoragePolicy disabledPolicy = new TopicCreateRemoteStoragePolicy(false);
        Map<String, Map.Entry<OpType, String>> applied = disabledPolicy.apply("foo", null, false);
        assertNull(applied);
    }

    @Test
    void classicTopicDefaultsToRemoteStorageEnabled() {
        TopicCreateRemoteStoragePolicy enabledPolicy = new TopicCreateRemoteStoragePolicy(true);
        assertEquals("true", remoteStorageValue(enabledPolicy, "foo", null, false));
    }

    @Test
    void classicTopicOverridesExplicitRemoteStorageFalse() {
        TopicCreateRemoteStoragePolicy enabledPolicy = new TopicCreateRemoteStoragePolicy(true);
        // The request explicitly disables remote storage; the policy is expected to win.
        Map<String, Map.Entry<OpType, String>> requestedOps = new HashMap<>();
        requestedOps.put(
            TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
            new SimpleImmutableEntry<>(SET, "false")
        );
        assertEquals("true", remoteStorageValue(enabledPolicy, "foo", requestedOps, false));
    }

    @Test
    void disklessForcesRemoteStorageDisabled() {
        TopicCreateRemoteStoragePolicy enabledPolicy = new TopicCreateRemoteStoragePolicy(true);
        assertEquals("false", remoteStorageValue(enabledPolicy, "foo", null, true));
    }

    @Test
    void internalTopicsForceRemoteStorageDisabled() {
        TopicCreateRemoteStoragePolicy enabledPolicy = new TopicCreateRemoteStoragePolicy(true);
        String[] internalTopicNames = {
            "__consumer_offsets",
            "__any_internal_topic",
            "__connect_configs-test"
        };
        for (String internalTopicName : internalTopicNames) {
            assertEquals("false", remoteStorageValue(enabledPolicy, internalTopicName, null, false));
        }
    }

    @Test
    void excludeSpecificTopics() {
        TopicCreateRemoteStoragePolicy policyWithRegex =
            new TopicCreateRemoteStoragePolicy(true, "_schemas|mm2-(.*)");

        // Names that fully match the regex stay local-only.
        assertEquals("false", remoteStorageValue(policyWithRegex, "_schemas", null, false));
        assertEquals("false", remoteStorageValue(policyWithRegex, "mm2-test", null, false));

        // Anything else gets remote storage.
        assertEquals("true", remoteStorageValue(policyWithRegex, "mytopic", null, false));
    }

    @Test
    void invalidCustomRegexFailsFast() {
        String unbalancedRegex = "my-custom-(";
        assertThrows(
            IllegalArgumentException.class,
            () -> new TopicCreateRemoteStoragePolicy(true, unbalancedRegex)
        );
    }
}
Loading