Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 155 additions & 13 deletions core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,6 @@ void validatesUpdateCases() throws Exception {
}
}

/**
 * Applies the given topic-level config overrides as incremental {@code SET} operations.
 *
 * @param admin      admin client used to send the request
 * @param topic      name of the topic whose configs are altered
 * @param newConfigs config name to value pairs; each entry becomes one {@code SET} operation
 * @return empty when the request completed within 10 seconds; otherwise a non-empty description
 *         of the failure (the message of the cause when there is one, falling back to the
 *         throwable's string form when no message is available)
 */
private Optional<String> incrementalAlterTopicConfig(Admin admin, String topic, Map<String, String> newConfigs) {
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    var operations = newConfigs.entrySet().stream()
        .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
        .toList();
    try {
        admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS);
        return Optional.empty();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the test runner can still observe the interruption
            Thread.currentThread().interrupt();
        }
        Throwable failure = e.getCause() != null ? e.getCause() : e;
        String message = failure.getMessage();
        // Callers treat an empty Optional as success, so a failure without a message
        // (e.g. a timeout) must still yield a non-empty value
        return Optional.of(message != null ? message : failure.toString());
    }
}
}

private KafkaClusterTestKit initCluster() throws Exception {
Expand Down Expand Up @@ -293,6 +280,161 @@ private void createTopicAndAssertEffective(Admin admin,
assertEquals(expectedRemoteStorage, topicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
}

/**
* Integration tests for remote storage consolidation — validates end-to-end behavior
* when consolidation is enabled.
*
* <p>Topic type transition matrix (consolidation enabled):
* <pre>
* # | From → To | Condition | Result | Covered by
* ---+------------+-----------+-------------------------------------------------------------+----------------------------+---------------------------------------------
* 1 | (none) → DISKLESS | diskless.enable=true, no remote.storage.enable in request | VALID (auto-enabled) | testConsolidatedTransitionsWithAllowFromClassic
* 2 | (none) → DISKLESS | diskless.enable=true, remote.storage.enable=true | VALID | testConsolidatedTransitionsWithAllowFromClassic
* 3 | (none) → DISKLESS | diskless.enable=true, remote.storage.enable=false | REJECTED | testConsolidatedTransitionsWithAllowFromClassic
* 4 | CLASSIC → DISKLESS | allow-from-classic=true | VALID (switch) | testConsolidatedTransitionsWithAllowFromClassic
* 5 | CLASSIC → DISKLESS | allow-from-classic=false | REJECTED | testConsolidatedTransitionsWithoutAllowFromClassic
* 6 | TIERED → DISKLESS | allow-from-classic=true | VALID (switch) | testConsolidatedTransitionsWithAllowFromClassic
* 7 | TIERED → DISKLESS | allow-from-classic=false | REJECTED | testConsolidatedTransitionsWithoutAllowFromClassic
* 8 | DISKLESS → forbidden state | remote.storage.enable=false | REJECTED (mutual exclusion) | testConsolidatedTransitionsWithAllowFromClassic
* 9 | DISKLESS → TIERED | diskless.enable=false | REJECTED (irreversible) | testConsolidatedTransitionsWithAllowFromClassic
* </pre>
*/
@Nested
class ConsolidatedDisklessTopics {
/**
 * Runs transition scenarios 1-4, 6, 8 and 9 against a consolidated cluster that was
 * started with allow-from-classic enabled.
 */
@Test
void testConsolidatedTransitionsWithAllowFromClassic() throws Exception {
    var cluster = initConsolidatedCluster(true);
    try (Admin admin = AdminClient.create(Map.of(
            CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) {
        // Request payloads shared by several scenarios below
        final Map<String, String> disklessOnly = Map.of(DISKLESS_ENABLE_CONFIG, "true");
        final Map<String, String> disklessWithRemote = Map.of(
            DISKLESS_ENABLE_CONFIG, "true",
            REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");

        // Scenario 1: diskless requested alone; effective remote.storage.enable is expected to be "true"
        createTopicAndAssertEffective(admin, "diskless-auto-rs", disklessOnly, "true", "true");

        // Scenario 2: diskless requested together with remote.storage.enable=true
        createTopicAndAssertEffective(admin, "diskless-rs-true", disklessWithRemote, "true", "true");

        // Scenario 3: diskless requested with remote.storage.enable=false must be rejected
        Optional<String> createError = createTopic(admin, "diskless-rs-false", Map.of(
            DISKLESS_ENABLE_CONFIG, "true",
            REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
        assertTrue(createError.isPresent(), "Should reject diskless with remote.storage.enable=false");

        // Scenario 4: a CLASSIC topic is switched to DISKLESS
        createTopicAndAssertEffective(admin, "classic-to-diskless", Map.of(), "false", "false");
        Optional<String> classicSwitchError =
            incrementalAlterTopicConfig(admin, "classic-to-diskless", disklessWithRemote);
        assertTrue(classicSwitchError.isEmpty(),
            "CLASSIC→DISKLESS switch should succeed with allow-from-classic + consolidation");
        var afterClassicSwitch = getTopicConfig(admin, "classic-to-diskless");
        assertEquals("true", afterClassicSwitch.get(DISKLESS_ENABLE_CONFIG));
        assertEquals("true", afterClassicSwitch.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

        // Scenario 6: a TIERED topic is switched to DISKLESS
        createTopicAndAssertEffective(admin, "tiered-to-diskless",
            Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true");
        Optional<String> tieredSwitchError =
            incrementalAlterTopicConfig(admin, "tiered-to-diskless", disklessWithRemote);
        assertTrue(tieredSwitchError.isEmpty(),
            "TIERED→DISKLESS switch should succeed with allow-from-classic");
        var afterTieredSwitch = getTopicConfig(admin, "tiered-to-diskless");
        assertEquals("true", afterTieredSwitch.get(DISKLESS_ENABLE_CONFIG));
        assertEquals("true", afterTieredSwitch.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

        // Scenario 8: remote storage cannot be turned off on a DISKLESS topic
        final String disklessTopic = "diskless-no-disable-rs";
        createTopicAndAssertEffective(admin, disklessTopic, disklessOnly, "true", "true");
        Optional<String> remoteOffError = incrementalAlterTopicConfig(admin, disklessTopic,
            Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
        assertTrue(remoteOffError.isPresent(), "Should not allow disabling remote storage on diskless topic");
        assertEquals(DISKLESS_REMOTE_SET_ERROR, remoteOffError.get());

        // Scenario 9: diskless cannot be turned off again (same topic as scenario 8)
        Optional<String> disklessOffError = incrementalAlterTopicConfig(admin, disklessTopic,
            Map.of(DISKLESS_ENABLE_CONFIG, "false"));
        assertTrue(disklessOffError.isPresent(), "Should not allow disabling diskless");
        assertEquals(DISABLE_DISKLESS_ERROR, disklessOffError.get());
    } finally {
        cluster.close();
    }
}

/**
 * Runs transition scenarios 5 and 7 against a consolidated cluster that was started
 * with allow-from-classic disabled: both switches to DISKLESS must be rejected.
 */
@Test
void testConsolidatedTransitionsWithoutAllowFromClassic() throws Exception {
    var cluster = initConsolidatedCluster(false);
    try (Admin admin = AdminClient.create(Map.of(
            CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) {
        // The same alter request is sent to both topics
        final Map<String, String> switchRequest = Map.of(
            DISKLESS_ENABLE_CONFIG, "true",
            REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");

        // Scenario 5: CLASSIC topic, switch attempt is rejected
        createTopicAndAssertEffective(admin, "classic-blocked", Map.of(), "false", "false");
        Optional<String> classicRejection = incrementalAlterTopicConfig(admin, "classic-blocked", switchRequest);
        assertTrue(classicRejection.isPresent(), "CLASSIC→DISKLESS should be blocked without allow-from-classic");
        assertEquals(ENABLE_DISKLESS_ERROR, classicRejection.get());

        // Scenario 7: TIERED topic, switch attempt is rejected
        createTopicAndAssertEffective(admin, "tiered-blocked",
            Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), "false", "true");
        Optional<String> tieredRejection = incrementalAlterTopicConfig(admin, "tiered-blocked", switchRequest);
        assertTrue(tieredRejection.isPresent(), "TIERED→DISKLESS should be blocked without allow-from-classic");
        assertEquals(ENABLE_DISKLESS_ERROR, tieredRejection.get());
    } finally {
        cluster.close();
    }
}

/**
 * Builds, formats and starts a combined-mode test cluster (one broker node, one controller node)
 * with the diskless storage system, remote log storage and remote-storage consolidation enabled.
 * The control plane is pointed at {@code pgContainer} and the storage backend at {@code s3Container}.
 *
 * @param allowFromClassic value written to the allow-from-classic server config
 * @return a started cluster whose brokers are ready; the caller is responsible for closing it
 * @throws Exception if building, formatting or starting the cluster fails; in the formatting or
 *                   startup case the partially started cluster is closed before rethrowing
 */
private KafkaClusterTestKit initConsolidatedCluster(boolean allowFromClassic) throws Exception {
    final TestKitNodes nodes = new TestKitNodes.Builder()
        .setCombined(true)
        .setNumBrokerNodes(1)
        .setNumControllerNodes(1)
        .build();
    final String controlPlanePrefix = InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX;
    final String storagePrefix = InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX;
    var cluster = new KafkaClusterTestKit.Builder(nodes)
        .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
        // Diskless stays off by default at the log level; tests opt in per topic
        .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, "false")
        .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true")
        .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true")
        .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(allowFromClassic))
        .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, "true")
        .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
        .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
        .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
        .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName())
        .setConfigProp(controlPlanePrefix + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl())
        .setConfigProp(controlPlanePrefix + PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME)
        .setConfigProp(controlPlanePrefix + PostgresControlPlaneConfig.PASSWORD_CONFIG, PostgreSQLTestContainer.PASSWORD)
        .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName())
        .setConfigProp(storagePrefix + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName())
        .setConfigProp(storagePrefix + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion())
        .setConfigProp(storagePrefix + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint())
        .setConfigProp(storagePrefix + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true")
        .setConfigProp(storagePrefix + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey())
        .setConfigProp(storagePrefix + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey())
        .build();
    try {
        cluster.format();
        cluster.startup();
        cluster.waitForReadyBrokers();
    } catch (Throwable startupFailure) {
        // The caller never receives the cluster on failure, so release it here to avoid a leak
        try {
            cluster.close();
        } catch (Exception closeFailure) {
            startupFailure.addSuppressed(closeFailure);
        }
        throw startupFailure;
    }
    return cluster;
}
}

/**
 * Applies the given topic-level config overrides as incremental {@code SET} operations.
 *
 * @param admin      admin client used to send the request
 * @param topic      name of the topic whose configs are altered
 * @param newConfigs config name to value pairs; each entry becomes one {@code SET} operation
 * @return empty when the request completed within 10 seconds; otherwise a non-empty description
 *         of the failure (the message of the cause when there is one, falling back to the
 *         throwable's string form when no message is available)
 */
private Optional<String> incrementalAlterTopicConfig(Admin admin, String topic, Map<String, String> newConfigs) {
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    var operations = newConfigs.entrySet().stream()
        .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
        .toList();
    try {
        admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(10, TimeUnit.SECONDS);
        return Optional.empty();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the test runner can still observe the interruption
            Thread.currentThread().interrupt();
        }
        Throwable failure = e.getCause() != null ? e.getCause() : e;
        String message = failure.getMessage();
        // Callers treat an empty Optional as success, so a failure without a message
        // (e.g. a timeout) must still yield a non-empty value
        return Optional.of(message != null ? message : failure.toString());
    }
}

private Map<String, String> getTopicConfig(Admin admin, String topic)
throws ExecutionException, InterruptedException, TimeoutException {
int maxRetries = 3;
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListVa
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

/**
* Topic type transition matrix (consolidation enabled):
*
* Topic types:
* CLASSIC (diskless.enable=false, remote.storage.enable=false) — local-only storage
* TIERED (diskless.enable=false, remote.storage.enable=true) — tiered storage
* DISKLESS (diskless.enable=true, remote.storage.enable=true) — diskless with remote storage
* Forbidden state: diskless.enable=true, remote.storage.enable=false
*
* Creation:
* diskless.enable=true, no remote.storage.enable → VALID (controller auto-enables)
* diskless.enable=true, remote.storage.enable=true → VALID
* diskless.enable=true, remote.storage.enable=false → REJECTED (mutual exclusion)
*
* Alter (allow-from-classic + consolidation):
* CLASSIC → DISKLESS (diskless.enable=true, remote.storage.enable=true) → VALID (switch)
* CLASSIC → DISKLESS (diskless.enable=true, no remote.storage.enable) → VALID (controller auto-enables)
* CLASSIC (remote.storage.enable=false) → DISKLESS (diskless.enable=true) → REJECTED (mutual exclusion)
* TIERED → DISKLESS (diskless.enable=true, remote.storage.enable=true) → VALID (switch)
* DISKLESS → set remote.storage.enable=false → REJECTED (mutual exclusion)
* DISKLESS → set diskless.enable=false → REJECTED (unsupported)
*
* See also: DisklessAndRemoteStorageConfigsTest (integration-level equivalent)
*/
class LogConfigTest {

@Test
Expand Down
Loading