Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ class LogConfigTest {
val noExisting: util.Map[String, String] = util.Map.of()
val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless switch or consolidation."

// Allowed to set diskless.enable=true at creation
// Allowed: diskless.enable=true without explicit remote.storage.enable — controller will auto-enable
assertValid(noExisting, topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true"), kafkaConfig,
remoteStorageConsolidationEnabled = true)

Expand All @@ -560,7 +560,7 @@ class LogConfigTest {
kafkaConfig,
remoteStorageConsolidationEnabled = true)

// NOT allowed to set diskless.enable=true and remote.storage.enable=false at creation
// NOT allowed to set diskless.enable=true and remote.storage.enable=false at creation (mutual exclusion fires first)
assertInvalid(noExisting, topicProps(
TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false"),
Expand Down Expand Up @@ -677,8 +677,10 @@ class LogConfigTest {
// Case 1: set diskless.enable=true with allowFromClassic=true
val setDisklessTrue = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true")

// With consolidation, enabling diskless without explicit remote.storage.enable is allowed (controller will auto-enable)
assertValid(existingWithoutDisklessOrRemote, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
assertValid(existingWithDisklessFalse, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
// Already diskless — no-op, not rejected (legacy state allowed for existing topics)
assertValid(existingWithDisklessTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
// Mutual exclusion still applies when existing remote.storage.enable=false
assertInvalid(existingWithRemoteFalse, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
Expand All @@ -690,6 +692,10 @@ class LogConfigTest {
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"
)
assertValid(existingWithRemoteTrue, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
// CLASSIC→DISKLESS direct switch: both diskless.enable=true and remote.storage.enable=true on a topic with neither config
assertValid(existingWithoutDisklessOrRemote, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true)
// Same switch rejected without consolidation gate
assertInvalid(existingWithoutDisklessOrRemote, setDisklessTrueWithExistingRemoteTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = false)

// Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden
val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,24 @@ public boolean wasRemoteStorageEnabled() {
}

public boolean isSwitchedFromClassicWithRemoteStorage() {
return isDisklessAllowFromClassicEnabled
&& isDisklessEnabled()
&& wasRemoteStorageExplicitlySet() && wasRemoteStorageEnabled()
&& requestedRemoteStorageEnabled();
// Allows both diskless and remote-storage to be set when:
// - The allow-from-classic flag is on, AND
// - Diskless is being enabled, AND
// - Remote-Storage was already enabled (TIERED→DISKLESS switch), OR
// Remote-Storage is being enabled in the same request
// AND consolidation is on (CLASSIC→DISKLESS direct switch)
// If remote.storage.enable doesn't resolve to true, no valid switch is happening:
// either mutual exclusion won't fire (remote.storage.enable absent) or the request
// is invalid (remote.storage.enable=false).
if (!isDisklessAllowFromClassicEnabled || !isDisklessEnabled() || !requestedRemoteStorageEnabled()) {
return false;
}
// TIERED→DISKLESS: Remote-Storage was already explicitly set and enabled
if (wasRemoteStorageExplicitlySet() && wasRemoteStorageEnabled()) {
return true;
}
// CLASSIC→DISKLESS (single request): Remote-Storage is being newly enabled, requires consolidation gate
return isRemoteStorageConsolidationEnabled && isRemoteStorageBecomesEnabled();
Comment thread
jeqo marked this conversation as resolved.
}

/** Both overrides were already present and remain off; used to skip mutual exclusion without consolidation. */
Expand Down Expand Up @@ -649,10 +663,19 @@ private static void validateDiskless(Map<String, String> existingConfigs,
// Exception 4: both keys were already present and remain explicitly false (no-op alter); allowed even
// when cluster consolidation is off, so routine config updates do not trip mutual exclusion.
final boolean isBothExplicitlyDisabledSteadyState = logConfigHelper.isBothExplicitlyDisabledSteadyStateUpdate();
if (!isSwitchedFromClassicWithRemoteStorage && !isDisklessConsolidationOnCreation && !isValidConsolidationModeTransitionOnUpdate
&& !isBothExplicitlyDisabledSteadyState) {
if (!isSwitchedFromClassicWithRemoteStorage &&
!isDisklessConsolidationOnCreation &&
!isValidConsolidationModeTransitionOnUpdate &&
!isBothExplicitlyDisabledSteadyState) {
validateDisklessAndRemoteStorageMutualExclusion(logConfigHelper);
}

// When consolidation is enabled, enforce that diskless topics must have remote storage.
// This replaces mutual exclusion with a stricter invariant: diskless.enable=true requires
// remote.storage.enable=true.
if (isRemoteStorageConsolidationEnabled) {
validateDisklessRequiresRemoteStorage(logConfigHelper);
}
}

private static void validateDisklessTransition(LogConfigHelper logConfigHelper,
Expand All @@ -679,6 +702,29 @@ private static void validateDisklessAndRemoteStorageMutualExclusion(LogConfigHel
}
}

private static void validateDisklessRequiresRemoteStorage(LogConfigHelper logConfigHelper) {
// Diskless topics must have remote storage enabled.
// Only reject when remote.storage.enable is explicitly set to false.
// If remote.storage.enable was never set (implicit default), allow —
// the controller will auto-enable remote storage via config record.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this happen? Could not find anything

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's coming as a follow-up: #619

if (!logConfigHelper.isDisklessEnabled() || logConfigHelper.isRemoteStorageEnabled()) {
return;
}
// Since we returned above if remote storage is enabled, explicit-set here implies set-to-false.
boolean isRemoteStorageExplicitlySetToFalse = logConfigHelper.wasRemoteStorageExplicitlySet()
|| logConfigHelper.isRemoteStorageExplicitlySet();
if (!isRemoteStorageExplicitlySetToFalse) {
return;
}
boolean isDisklessBeingEnabled = logConfigHelper.isCreation()
|| (logConfigHelper.isDisklessExplicitlySet() && !logConfigHelper.wasDisklessEnabled());
boolean isRemoteStorageBeingDisabled = logConfigHelper.isRemoteStorageExplicitlySet() && !logConfigHelper.isRemoteStorageEnabled();
Comment thread
jeqo marked this conversation as resolved.
// Reject either transition that independently creates diskless + remote.storage.enable=false
if (isDisklessBeingEnabled || isRemoteStorageBeingDisabled) {
throw new InvalidConfigurationException(
"Diskless topics must have remote storage enabled. Set remote.storage.enable=true when enabling diskless.");
}
}

/**
* Validates the values of the given properties. Should be called only by the broker.
Expand Down
Loading