Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta|MetaPropertiesEnsemble).java"/>
<!-- ReplicationControlManager has multiple long methods; suppression needed for diskless managed replicas logic -->
<suppress checks="MethodLength"
files="ReplicationControlManager.java"/>
<suppress checks="BooleanExpressionComplexity"
files="(MetadataImage).java"/>
<suppress checks="ImportControl"
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, InklessMetadataView, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand Down Expand Up @@ -236,6 +236,7 @@ class ControllerServer(
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultDisklessEnable(config.logDisklessEnable).
setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled).
setDisklessManagedReplicasEnabled(config.disklessManagedReplicasEnabled).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
Expand Down Expand Up @@ -369,13 +370,10 @@ class ControllerServer(
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
))

// Inkless metadata view needed to filter out Diskless topics from offline/leadership metrics
val inklessMetadataView = new InklessMetadataView(metadataCache, () => config.extractLogConfigMap)
// Set up the metrics publisher.
metadataPublishers.add(new ControllerMetadataMetricsPublisher(
sharedServer.controllerServerMetrics,
sharedServer.metadataPublishingFaultHandler,
t => inklessMetadataView.isDisklessTopic(t)
sharedServer.metadataPublishingFaultHandler
))

// Set up the ACL publisher.
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public static class Builder {

private boolean defaultDisklessEnable = false;
private boolean disklessStorageSystemEnabled = false;
private boolean disklessManagedReplicasEnabled = false;

public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
Expand Down Expand Up @@ -293,6 +294,11 @@ public Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnab
return this;
}

/**
 * Sets whether diskless topics use managed replicas (RF computed from rack topology,
 * one replica per rack) instead of the legacy single-replica behavior. Defaults to false.
 *
 * @param disklessManagedReplicasEnabled true to enable managed replicas for diskless topics
 * @return this builder, for chaining
 */
public Builder setDisklessManagedReplicasEnabled(boolean disklessManagedReplicasEnabled) {
this.disklessManagedReplicasEnabled = disklessManagedReplicasEnabled;
return this;
}

public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
Expand Down Expand Up @@ -435,6 +441,7 @@ public QuorumController build() throws Exception {
defaultNumPartitions,
defaultDisklessEnable,
disklessStorageSystemEnabled,
disklessManagedReplicasEnabled,
replicaPlacer,
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
Expand Down Expand Up @@ -1481,6 +1488,7 @@ private QuorumController(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean disklessStorageSystemEnabled,
boolean disklessManagedReplicasEnabled,
ReplicaPlacer replicaPlacer,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
Expand Down Expand Up @@ -1565,6 +1573,7 @@ private QuorumController(
setDefaultNumPartitions(defaultNumPartitions).
setDefaultDisklessEnable(defaultDisklessEnable).
setDisklessStorageSystemEnabled(disklessStorageSystemEnabled).
setDisklessManagedReplicasEnabled(disklessManagedReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
Expand Down Expand Up @@ -162,6 +163,7 @@ static class Builder {
private int defaultNumPartitions = 1;
private boolean defaultDisklessEnable = false;
private boolean isDisklessStorageSystemEnabled = false;
private boolean isDisklessManagedReplicasEnabled = false;

private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
Expand Down Expand Up @@ -199,6 +201,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
return this;
}

/**
 * Sets whether diskless topics use managed replicas (RF = number of distinct racks)
 * rather than the legacy RF=1 behavior. Defaults to false.
 *
 * @param isDisklessManagedReplicasEnabled true to enable managed replicas for diskless topics
 * @return this builder, for chaining
 */
public Builder setDisklessManagedReplicasEnabled(boolean isDisklessManagedReplicasEnabled) {
this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
return this;
}

Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
Expand Down Expand Up @@ -241,6 +248,7 @@ ReplicationControlManager build() {
defaultNumPartitions,
defaultDisklessEnable,
isDisklessStorageSystemEnabled,
isDisklessManagedReplicasEnabled,
maxElectionsPerImbalance,
configurationControl,
clusterControl,
Expand Down Expand Up @@ -318,8 +326,20 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
*/
private final boolean defaultDisklessEnable;

/**
* When true, the diskless storage system is enabled, allowing diskless topics to be created.
*/
private final boolean isDisklessStorageSystemEnabled;

/**
* When true, diskless topics use managed replicas with RF = rack_count (one replica per rack).
* When false, diskless topics use legacy RF=1 behavior.
*
* <p>Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
* RF from existing partitions (correct behavior - maintains consistency within the topic).
*/
private final boolean isDisklessManagedReplicasEnabled;

/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
Expand Down Expand Up @@ -410,6 +430,7 @@ private ReplicationControlManager(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean isDisklessStorageSystemEnabled,
boolean isDisklessManagedReplicasEnabled,
int maxElectionsPerImbalance,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Expand All @@ -422,6 +443,7 @@ private ReplicationControlManager(
this.defaultNumPartitions = defaultNumPartitions;
this.defaultDisklessEnable = defaultDisklessEnable;
this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled;
this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
Expand Down Expand Up @@ -761,9 +783,13 @@ private ApiError createTopic(ControllerRequestContext context,
"when the diskless storage system is disabled. " +
"Please enable the diskless storage system to create diskless topics.");
}
// Diskless RF validation: accept -1 (auto) or 1 (backward compat) only.
// Explicit RF > 1 rejected: users shouldn't need to know rack topology.
// Note: RF=1 is accepted for API backward compatibility, but when managed replicas
// are enabled, the actual RF is always computed from rack topology (rackCardinality).
if (Math.abs(topic.replicationFactor()) != 1) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor for diskless topics must be 1 or -1 to use the default value (1).");
"Replication factor for diskless topics must be 1 or -1 (system-computed from rack topology).");
}
}

Expand All @@ -778,7 +804,7 @@ private ApiError createTopic(ControllerRequestContext context,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
if (disklessEnabled) {
if (disklessEnabled && !isDisklessManagedReplicasEnabled) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment cannot be specified for diskless topics.");
}
Expand Down Expand Up @@ -826,12 +852,19 @@ private ApiError createTopic(ControllerRequestContext context,
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor();
// For managed diskless: always use rackCardinality() regardless of requested RF.
// RF=1 in the request is accepted for backward compat but overridden here.
// Throws BrokerNotAvailableException or InvalidReplicationFactorException on failure,
// which are caught by the caller and converted to ApiError.
short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality() : 1;
short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor;
try {
TopicAssignment topicAssignment;
Predicate<Integer> brokerFilter;
if (!disklessEnabled) {
// Diskless managed-replicas is equivalent to classic topic assignment,
// but RF is defined by number of racks
if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
Expand Down Expand Up @@ -901,6 +934,32 @@ private ApiError createTopic(ControllerRequestContext context,
return ApiError.NONE;
}

/**
 * Computes the replication factor for diskless topics from the cluster's rack topology:
 * the number of distinct rack values across all registered brokers, so that managed
 * replicas can place one replica per rack.
 *
 * <p>Brokers with no rack configured all share the same absent rack value, so they
 * collectively count as a single logical rack; a cluster where no broker has a rack
 * therefore yields RF=1. NOTE(review): all registrations are counted here — confirm
 * whether fenced or shutting-down brokers should be excluded from the rack count.
 *
 * @return the number of distinct racks as a short
 * @throws BrokerNotAvailableException if no brokers are registered
 * @throws InvalidReplicationFactorException if the rack count exceeds Short.MAX_VALUE
 */
private short rackCardinality() {
    final Collection<BrokerRegistration> brokerRegistrations = clusterControl.brokerRegistrations().values();
    final long racks = brokerRegistrations.stream()
        .map(BrokerRegistration::rack)
        .distinct()
        .count();
    // Zero distinct racks means zero registered brokers: nothing to place replicas on.
    if (racks == 0) {
        throw new BrokerNotAvailableException("No brokers available to create diskless topic.");
    }
    // Unfeasible but technically possible scenario: would require more than
    // 32,767 brokers, each reporting a different rack.
    if (racks > Short.MAX_VALUE) {
        throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation.");
    }
    return (short) racks;
}

private List<ApiMessageAndVersion> validConfigRecords(CreatableTopic topic, List<ApiMessageAndVersion> configRecords, boolean disklessEnabled) {
final List<ApiMessageAndVersion> validConfigRecord = new ArrayList<>();
boolean isDisklessEnableDefined = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec");
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");
private static final MetricName DISKLESS_TOPIC_COUNT = getMetricName(
"KafkaController", "DisklessTopicCount");
private static final MetricName DISKLESS_UNMANAGED_REPLICAS_TOPIC_COUNT = getMetricName(
"KafkaController", "DisklessUnmanagedReplicasTopicCount");
private static final MetricName DISKLESS_MANAGED_REPLICAS_TOPIC_COUNT = getMetricName(
"KafkaController", "DisklessManagedReplicasTopicCount");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
Expand All @@ -84,6 +90,9 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
private Optional<Meter> electionFromEligibleLeaderReplicasMeter = Optional.empty();
private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
private final AtomicInteger disklessTopicCount = new AtomicInteger(0);
private final AtomicInteger disklessUnmanagedReplicasTopicCount = new AtomicInteger(0);
private final AtomicInteger disklessManagedReplicasTopicCount = new AtomicInteger(0);

/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -151,6 +160,24 @@ public Integer value() {
return ignoredStaticVoters() ? 1 : 0;
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessTopicCount();
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_UNMANAGED_REPLICAS_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessUnmanagedReplicasTopicCount();
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_MANAGED_REPLICAS_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessManagedReplicasTopicCount();
}
}));
}

public void addBrokerRegistrationStateMetric(int brokerId) {
Expand Down Expand Up @@ -309,6 +336,42 @@ public boolean ignoredStaticVoters() {
return ignoredStaticVoters.get();
}

/**
 * Sets the total number of diskless topics reported by the DisklessTopicCount gauge.
 */
public void setDisklessTopicCount(int count) {
this.disklessTopicCount.set(count);
}

/**
 * Adds {@code delta} (which may be negative) to the diskless topic count.
 */
public void addToDisklessTopicCount(int delta) {
this.disklessTopicCount.addAndGet(delta);
}

/**
 * Returns the current diskless topic count, as exposed by the DisklessTopicCount gauge.
 */
public int disklessTopicCount() {
return this.disklessTopicCount.get();
}

/**
 * Sets the number of diskless topics using unmanaged (legacy) replicas, reported by the
 * DisklessUnmanagedReplicasTopicCount gauge.
 */
public void setDisklessUnmanagedReplicasTopicCount(int count) {
this.disklessUnmanagedReplicasTopicCount.set(count);
}

/**
 * Adds {@code delta} (which may be negative) to the unmanaged-replicas diskless topic count.
 */
public void addToDisklessUnmanagedReplicasTopicCount(int delta) {
this.disklessUnmanagedReplicasTopicCount.addAndGet(delta);
}

/**
 * Returns the current unmanaged-replicas diskless topic count, as exposed by the
 * DisklessUnmanagedReplicasTopicCount gauge.
 */
public int disklessUnmanagedReplicasTopicCount() {
return this.disklessUnmanagedReplicasTopicCount.get();
}

/**
 * Sets the number of diskless topics using managed replicas, reported by the
 * DisklessManagedReplicasTopicCount gauge.
 */
public void setDisklessManagedReplicasTopicCount(int count) {
this.disklessManagedReplicasTopicCount.set(count);
}

/**
 * Adds {@code delta} (which may be negative) to the managed-replicas diskless topic count.
 */
public void addToDisklessManagedReplicasTopicCount(int delta) {
this.disklessManagedReplicasTopicCount.addAndGet(delta);
}

/**
 * Returns the current managed-replicas diskless topic count, as exposed by the
 * DisklessManagedReplicasTopicCount gauge.
 */
public int disklessManagedReplicasTopicCount() {
return this.disklessManagedReplicasTopicCount.get();
}

@Override
public void close() {
registry.ifPresent(r -> List.of(
Expand All @@ -322,7 +385,10 @@ public void close() {
METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
IGNORED_STATIC_VOTERS
IGNORED_STATIC_VOTERS,
DISKLESS_TOPIC_COUNT,
DISKLESS_UNMANAGED_REPLICAS_TOPIC_COUNT,
DISKLESS_MANAGED_REPLICAS_TOPIC_COUNT
).forEach(r::removeMetric));
for (int brokerId : brokerRegistrationStates.keySet()) {
removeBrokerRegistrationStateMetric(brokerId);
Expand Down
Loading