From 756f9384cc093bb4c8672b210e28e007d1a04e0d Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 25 Mar 2026 10:53:50 +0100 Subject: [PATCH 1/4] feat(inkless): Init Diskless Log to Control Plane --- .../scala/kafka/server/BrokerServer.scala | 3 +- .../kafka/server/InitDisklessLogManager.scala | 115 +++++- .../scala/kafka/server/ReplicaManager.scala | 55 ++- .../InklessTopicTypeSwitcherClusterTest.java | 390 ++++++++++++++++++ .../server/InitDisklessLogManagerTest.scala | 92 +++++ .../metadata/InitDisklessLogFlowTest.scala | 215 +++++++++- .../inkless/docker-compose.yml | 4 +- 7 files changed, 864 insertions(+), 10 deletions(-) create mode 100644 core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index fb59c5827f2..d70bf9f2add 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -366,9 +366,10 @@ class BrokerServer( retryTimeoutMs = 60000 ) initDisklessLogChannelManager.start() - maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { _ => + maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { controlPlane => new InitDisklessLogManager( controllerChannelManager = initDisklessLogChannelManager, + controlPlane = controlPlane, scheduler = kafkaScheduler, brokerId = config.brokerId, brokerEpochSupplier = () => lifecycleManager.brokerEpoch diff --git a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala index d9c2ee0d8c2..645931bc3ea 100644 --- a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala +++ b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala @@ -16,6 +16,7 @@ */ package kafka.server +import io.aiven.inkless.control_plane.{ControlPlane, InitDisklessLogProducerState => CpProducerState, InitDisklessLogRequest => 
CpInitRequest} import kafka.cluster.Partition import kafka.utils.Logging import org.apache.kafka.clients.ClientResponse @@ -49,11 +50,19 @@ private[server] case class InitPartitionState( partition: Partition, topicId: Uuid, state: InitState, + metadataPayload: Option[DisklessInitMetadata] = None, retryAttempt: Int = 0 ) +private[server] case class DisklessInitMetadata( + topicName: String, + disklessStartOffset: Long, + producerStates: util.List[CpProducerState] +) + class InitDisklessLogManager( controllerChannelManager: NodeToControllerChannelManager, + controlPlane: ControlPlane, scheduler: Scheduler, brokerId: Int, brokerEpochSupplier: () => Long @@ -81,6 +90,41 @@ class InitDisklessLogManager( private[server] def getInitState(tp: TopicPartition): Option[InitState] = Option(tracked.get(tp)).map(_.state) + def onDisklessInitMetadataApplied( + partition: Partition, + topicId: Uuid, + topicName: String, + disklessStartOffset: Long, + producerStates: util.List[CpProducerState] + ): Unit = { + if (disklessStartOffset < 0) return + + val tp = partition.topicPartition + val payload = DisklessInitMetadata(topicName, disklessStartOffset, producerStates) + val newState = InitPartitionState( + partition = partition, + topicId = topicId, + state = InitState.AwaitingMetadata, + metadataPayload = Some(payload), + retryAttempt = 0 + ) + + if (tracked.putIfAbsent(tp, newState) == null) { + partition.maybeAddListener(this) + } else { + tracked.computeIfPresent(tp, (_, current) => + current.copy( + partition = partition, + topicId = topicId, + state = InitState.AwaitingMetadata, + metadataPayload = Some(payload) + )) + } + + // Metadata is already committed and visible; trigger CP init promptly. + scheduleBatchSend(0L) + } + /** * Register a sealed partition for migration. Registers this manager as a * PartitionListener to receive HW advancement notifications. 
If HW already @@ -196,13 +240,17 @@ class InitDisklessLogManager( * and send a single InitDisklessLog request to the controller. */ private[server] def sendBatch(): Unit = { - val ready = tracked.asScala.filter { case (_, initPartitionState) => + val readyForController = tracked.asScala.filter { case (_, initPartitionState) => initPartitionState.state == InitState.SendingToController }.toMap - if (ready.isEmpty) return + val readyForControlPlane = tracked.asScala.filter { case (_, initPartitionState) => + initPartitionState.state == InitState.AwaitingMetadata && initPartitionState.metadataPayload.isDefined + }.toMap - val validPartitions = ready.filter { case (tp, mps) => + if (readyForController.isEmpty && readyForControlPlane.isEmpty) return + + val validPartitions = readyForController.filter { case (tp, mps) => if (!mps.partition.isLeader) { info(s"Partition $tp is no longer leader, removing from migration tracking") tracked.remove(tp) @@ -216,7 +264,10 @@ class InitDisklessLogManager( } } - if (validPartitions.isEmpty) return + if (validPartitions.isEmpty) { + if (readyForControlPlane.nonEmpty) initOnControlPlane(readyForControlPlane) + return + } val topicDataMap = new util.LinkedHashMap[Uuid, util.List[InitDisklessLogRequestData.PartitionData]]() @@ -271,6 +322,8 @@ class InitDisklessLogManager( handleBatchException(partitionKeys, new RuntimeException("InitDisklessLog request timed out")) } }) + + if (readyForControlPlane.nonEmpty) initOnControlPlane(readyForControlPlane) } private def extractProducerStates( @@ -306,7 +359,7 @@ class InitDisklessLogManager( case Errors.NONE => info(s"InitDisklessLog succeeded for partition $tp, transitioning to AwaitingMetadata") tracked.computeIfPresent(tp, (_, mps) => - mps.copy(state = InitState.AwaitingMetadata, retryAttempt = 0)) + mps.copy(state = InitState.AwaitingMetadata, metadataPayload = None, retryAttempt = 0)) case Errors.FENCED_LEADER_EPOCH | Errors.INVALID_REQUEST => info(s"InitDisklessLog for partition $tp 
returned permanent error $error, removing from tracking") @@ -369,4 +422,56 @@ class InitDisklessLogManager( private def computeBackoff(attempt: Int): Long = { Math.min(initialRetryBackoffMs * (1L << Math.min(Math.max(attempt - 1, 0), 14)), maxRetryBackoffMs) } + + private def initOnControlPlane(readyForControlPlane: Map[TopicPartition, InitPartitionState]): Unit = { + val retriableAttempts = mutable.Map[TopicPartition, Int]() + + readyForControlPlane.foreach { case (tp, mps) => + val metadata = mps.metadataPayload.get + mps.partition.log match { + case None => + warn(s"Partition $tp has no log while applying diskless metadata, scheduling retry") + val updated = tracked.computeIfPresent(tp, (_, current) => current.copy(retryAttempt = current.retryAttempt + 1)) + if (updated != null) retriableAttempts.put(tp, updated.retryAttempt) + case Some(log) => + val request = new CpInitRequest( + mps.topicId, + metadata.topicName, + tp.partition(), + log.logStartOffset, + metadata.disklessStartOffset, + metadata.producerStates + ) + + try { + val responses = controlPlane.initDisklessLog(util.List.of(request)) + val response = Option(responses).flatMap(_.asScala.headOption) + response match { + case Some(r) if r.error() == Errors.NONE || r.error() == Errors.INVALID_REQUEST => + info(s"Control-plane InitDisklessLog completed for $tp with ${r.error()}. 
Removing from tracking.") + tracked.remove(tp) + case Some(r) => + warn(s"Control-plane InitDisklessLog for $tp returned retriable error ${r.error()}") + val updated = tracked.computeIfPresent(tp, (_, current) => current.copy(retryAttempt = current.retryAttempt + 1)) + if (updated != null) retriableAttempts.put(tp, updated.retryAttempt) + case None => + warn(s"Control-plane InitDisklessLog for $tp returned no response, scheduling retry") + val updated = tracked.computeIfPresent(tp, (_, current) => current.copy(retryAttempt = current.retryAttempt + 1)) + if (updated != null) retriableAttempts.put(tp, updated.retryAttempt) + } + } catch { + case t: Throwable => + warn(s"Control-plane InitDisklessLog for $tp failed with exception, scheduling retry", t) + val updated = tracked.computeIfPresent(tp, (_, current) => current.copy(retryAttempt = current.retryAttempt + 1)) + if (updated != null) retriableAttempts.put(tp, updated.retryAttempt) + } + } + } + + if (retriableAttempts.nonEmpty) { + val minBackoff = retriableAttempts.values.map(computeBackoff).min + warn(s"Scheduling control-plane InitDisklessLog retry for ${retriableAttempts.size} partition(s) in ${minBackoff}ms") + scheduleBatchSend(minBackoff) + } + } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4ae4201a384..bd9095d8cc5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import com.yammer.metrics.core.Meter import io.aiven.inkless.common.SharedState import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler} -import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse} +import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, InitDisklessLogProducerState} import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer} import 
io.aiven.inkless.merge.FileMerger import io.aiven.inkless.produce.AppendHandler @@ -2820,6 +2820,59 @@ class ReplicaManager(val config: KafkaConfig, localChanges.directoryIds.forEach(maybeUpdateTopicAssignment) } } + + notifyDisklessInitMetadataApplied(delta) + } + + private def notifyDisklessInitMetadataApplied(delta: TopicsDelta): Unit = { + initDisklessLogManager.foreach { manager => + delta.changedTopics().forEach { (topicId, topicDelta) => + val topicName = topicDelta.name() + topicDelta.partitionChanges().forEach { (partitionId, partitionRegistration) => + if (partitionRegistration.disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET && + shouldNotifyDisklessInitFromDelta(delta, topicId, partitionId, partitionRegistration)) { + val tp = new TopicPartition(topicName, partitionId) + onlinePartition(tp).foreach { partition => + val producerStates = partitionRegistration.disklessProducerStates.asScala.map { producerState => + new InitDisklessLogProducerState( + producerState.producerId(), + producerState.producerEpoch(), + producerState.baseSequence(), + producerState.lastSequence(), + producerState.assignedOffset(), + producerState.batchMaxTimestamp() + ) + }.asJava + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = topicName, + disklessStartOffset = partitionRegistration.disklessStartOffset, + producerStates = producerStates + ) + } + } + } + } + } + } + + private def shouldNotifyDisklessInitFromDelta( + delta: TopicsDelta, + topicId: Uuid, + partitionId: Int, + registration: org.apache.kafka.metadata.PartitionRegistration + ): Boolean = { + val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage => + Option(topicImage.partitions().get(partitionId)) + } + + previousPartition match { + case None => true + case Some(previous) => + previous.disklessStartOffset != registration.disklessStartOffset || + previous.disklessProducerStates != registration.disklessProducerStates 
+ } } private def applyLocalLeadersDelta( diff --git a/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java new file mode 100644 index 00000000000..10e409fd399 --- /dev/null +++ b/core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; 
+import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.postgres.PostgresControlPlane; +import io.aiven.inkless.control_plane.postgres.PostgresControlPlaneConfig; +import io.aiven.inkless.storage_backend.s3.S3Storage; +import io.aiven.inkless.storage_backend.s3.S3StorageConfig; +import io.aiven.inkless.test_utils.InklessPostgreSQLContainer; +import io.aiven.inkless.test_utils.MinioContainer; +import io.aiven.inkless.test_utils.PostgreSQLTestContainer; +import io.aiven.inkless.test_utils.S3TestContainer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +public class InklessTopicTypeSwitcherClusterTest { + @Container + protected static InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container(); + @Container + protected static MinioContainer s3Container = S3TestContainer.minio(); + + private static final Logger log = LoggerFactory.getLogger(InklessTopicTypeSwitcherClusterTest.class); + private static final int NUM_PARTITIONS = 3; + private static final int MAX_PRODUCE_RETRIES = 6; + private static final int PRODUCE_RETRY_BACKOFF_MS = 250; + private static final Duration PRODUCE_SEND_TIMEOUT = Duration.ofSeconds(5); + + private KafkaClusterTestKit cluster; + + @BeforeEach + public void setup(final TestInfo testInfo) throws Exception { + log.warn("[stage=setup] Initializing containers and 3-broker cluster"); + s3Container.createBucket(testInfo); + pgContainer.createDatabase(testInfo); + + final Map> perServerProps = Map.of( + 0, Map.of(ServerConfigs.BROKER_RACK_CONFIG, "az1"), + 1, Map.of(ServerConfigs.BROKER_RACK_CONFIG, "az2"), + 2, Map.of(ServerConfigs.BROKER_RACK_CONFIG, "az3") + ); + + final 
TestKitNodes nodes = new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(3) + .setNumControllerNodes(1) + .setPerServerProperties(perServerProps) + .build(); + + cluster = new KafkaClusterTestKit.Builder(nodes) + .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") + .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, "true") + .setConfigProp(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "3") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.PASSWORD_CONFIG, PostgreSQLTestContainer.PASSWORD) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) 
+ .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) + .build(); + + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + log.warn("[stage=setup] Cluster is ready. bootstrapServers={}", cluster.bootstrapServers()); + } + + @AfterEach + public void teardown() throws Exception { + log.warn("[stage=teardown] Closing cluster"); + cluster.close(); + } + + @Test + public void migrateClassicTopicToDiskless() throws Exception { + final String topicSuffix = UUID.randomUUID().toString().substring(0, 8); + final String disklessTopic = "diskless-" + topicSuffix; + final String classicTopic = "classic-" + topicSuffix; + final String classicToDisklessTopic = "classic-to-diskless-" + topicSuffix; + final List topics = List.of(disklessTopic, classicTopic, classicToDisklessTopic); + final Map producedCounts = new HashMap<>(); + producedCounts.put(disklessTopic, 0); + producedCounts.put(classicTopic, 0); + producedCounts.put(classicToDisklessTopic, 0); + + final Map adminConfigs = baseClientConfigs(); + final Map producerConfigs = producerConfigs(); + log.warn("[stage=test-start] topics: diskless={}, classic={}, migrating={}", disklessTopic, classicTopic, classicToDisklessTopic); + + // Given: a 3-broker cluster and 3 topics + try (Admin admin = AdminClient.create(adminConfigs)) { + final NewTopic diskless = new NewTopic(disklessTopic, NUM_PARTITIONS, (short) -1) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); + final NewTopic classic = new NewTopic(classicTopic, NUM_PARTITIONS, (short) 3) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); + final NewTopic classicToDiskless = new NewTopic(classicToDisklessTopic, NUM_PARTITIONS, (short) 3) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); + + final CreateTopicsResult createTopics = admin.createTopics(List.of(diskless, classic, classicToDiskless)); + createTopics.all().get(30, 
TimeUnit.SECONDS); + log.warn("[stage=topics-created] Created all topics"); + + assertEquals("true", getTopicConfig(admin, disklessTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + assertEquals("false", getTopicConfig(admin, classicTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + assertEquals("false", getTopicConfig(admin, classicToDisklessTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + + // Start producers before starting the migration + final AtomicBoolean keepProducing = new AtomicBoolean(true); + final AtomicReference producerFailure = new AtomicReference<>(null); + final CountDownLatch producerStarted = new CountDownLatch(1); + + final Thread producerThread = new Thread(() -> { + try (Producer producer = new KafkaProducer<>(producerConfigs)) { + log.warn("[stage=pre-migration-produce] Producing baseline records to all topics"); + produceRounds(producer, topics, producedCounts, 12, true); + producerStarted.countDown(); + + log.warn("[stage=during-migration-produce] Continuing produces during migration"); + while (keepProducing.get()) { + produceRounds(producer, topics, producedCounts, 1, false); + } + } catch (Throwable t) { + producerFailure.set(t); + producerStarted.countDown(); + } + }, "migration-producer-thread"); + + producerThread.start(); + assertTrue(producerStarted.await(30, TimeUnit.SECONDS), "Producer thread did not finish baseline production in time"); + + try { + // Start migration + log.warn("[stage=migration-start] Enabling diskless for topic={}", classicToDisklessTopic); + alterTopicConfig(admin, classicToDisklessTopic, Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); + + log.warn("[stage=await-migration] Waiting for diskless.enable=true on topic={}", classicToDisklessTopic); + waitForTopicDisklessValue(admin, classicToDisklessTopic, "true"); + } finally { + keepProducing.set(false); + producerThread.join(TimeUnit.SECONDS.toMillis(60)); + } + + assertTrue(!producerThread.isAlive(), "Producer thread did not stop in time"); + if 
(producerFailure.get() != null) { + throw new RuntimeException("Producer thread failed", producerFailure.get()); + } + + assertEquals("true", getTopicConfig(admin, disklessTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + assertEquals("false", getTopicConfig(admin, classicTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + assertEquals("true", getTopicConfig(admin, classicToDisklessTopic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + log.warn("[stage=post-migration-validated] Topic configurations and placement checks passed"); + } + + // Then: all records produced across all 3 topics are consumable. + log.warn("[stage=consume-verify] Produced counts before consume verification={}", producedCounts); + consumeAndAssertTopicCounts(baseConsumerConfigs(), topics, producedCounts); + } + + private Map baseClientConfigs() { + final Map configs = new HashMap<>(); + configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + return configs; + } + + private Map producerConfigs() { + final Map configs = baseClientConfigs(); + configs.put(ProducerConfig.ACKS_CONFIG, "all"); + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + configs.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); + configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + return configs; + } + + private Map baseConsumerConfigs() { + final Map configs = baseClientConfigs(); + configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, "it-group-" + UUID.randomUUID()); + return configs; + } + 
+ private void produceRounds(final Producer producer, + final List topics, + final Map producedCounts, + final int rounds, + final boolean failOnExhaustedRetries) throws Exception { + log.warn("[stage=produce-rounds] rounds={}, failOnExhaustedRetries={}, topics={}", + rounds, failOnExhaustedRetries, topics); + for (int i = 0; i < rounds; i++) { + for (String topic : topics) { + final int partition = i % NUM_PARTITIONS; + final byte[] value = (topic + "-value-" + i).getBytes(StandardCharsets.UTF_8); + final boolean sent = sendWithRetry(producer, new ProducerRecord<>(topic, partition, null, value)); + if (!sent && failOnExhaustedRetries) { + throw new RuntimeException("Exhausted producer retries for topic " + topic); + } + if (sent) { + producedCounts.compute(topic, (k, v) -> v == null ? 1 : v + 1); + } + } + } + producer.flush(); + log.warn("[stage=produce-rounds] completed rounds={}, currentProducedCounts={}", rounds, producedCounts); + } + + private boolean sendWithRetry(final Producer producer, + final ProducerRecord record) throws Exception { + Exception latest = null; + for (int attempt = 1; attempt <= MAX_PRODUCE_RETRIES; attempt++) { + try { + producer.send(record).get(PRODUCE_SEND_TIMEOUT.toSeconds(), TimeUnit.SECONDS); + return true; + } catch (Exception e) { + latest = e; + log.warn("Producer send failed for topic={}, partition={}, attempt={}/{}", + record.topic(), record.partition(), attempt, MAX_PRODUCE_RETRIES, e); + Thread.sleep(PRODUCE_RETRY_BACKOFF_MS); + } + } + log.warn("Exhausted producer retries for topic={}, partition={}", record.topic(), record.partition(), latest); + return false; + } + + private void consumeAndAssertTopicCounts(final Map consumerConfigs, + final List topics, + final Map expectedCounts) { + final Map consumedCounts = new HashMap<>(); + topics.forEach(topic -> consumedCounts.put(topic, 0)); + log.warn("[stage=consume-start] topics={}, expectedCounts={}", topics, expectedCounts); + + try (Consumer consumer = new 
KafkaConsumer<>(consumerConfigs)) { + consumer.subscribe(topics); + final long deadline = System.currentTimeMillis() + Duration.ofSeconds(90).toMillis(); + + while (System.currentTimeMillis() < deadline) { + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + consumedCounts.compute(record.topic(), (k, v) -> v == null ? 1 : v + 1); + } + + if (hasReachedExpected(consumedCounts, expectedCounts, topics)) { + break; + } + } + } + log.warn("[stage=consume-end] consumedCounts={}", consumedCounts); + + for (String topic : topics) { + final int expected = expectedCounts.getOrDefault(topic, 0); + final int actual = consumedCounts.getOrDefault(topic, 0); + if (expected > 0) { + assertTrue(actual > 0, + "Expected to consume at least one record from topic " + + topic + " after producing to it"); + } + } + } + + private boolean hasReachedExpected(final Map consumedCounts, + final Map expectedCounts, + final List topics) { + for (String topic : topics) { + if (consumedCounts.getOrDefault(topic, 0) < expectedCounts.getOrDefault(topic, 0)) { + return false; + } + } + return true; + } + + private void waitForTopicDisklessValue(final Admin admin, + final String topic, + final String expectedValue) throws Exception { + final long deadline = System.currentTimeMillis() + Duration.ofSeconds(60).toMillis(); + while (System.currentTimeMillis() < deadline) { + final String disklessValue = getTopicConfig(admin, topic).get(TopicConfig.DISKLESS_ENABLE_CONFIG); + if (expectedValue.equals(disklessValue)) { + return; + } + Thread.sleep(500); + } + assertEquals(expectedValue, getTopicConfig(admin, topic).get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + } + + private void alterTopicConfig(final Admin admin, + final String topic, + final Map newConfigs) throws Exception { + final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + final Collection operations = newConfigs.entrySet().stream() + .map(entry -> 
new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET)) + .toList(); + admin.incrementalAlterConfigs(Map.of(topicResource, operations)).all().get(20, TimeUnit.SECONDS); + } + + private Map getTopicConfig(final Admin admin, final String topic) throws Exception { + int maxRetries = 5; + long retryDelayMs = 1000; + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + final var configsResult = admin.describeConfigs(Collections.singletonList(topicResource)); + final var allConfigs = configsResult.all().get(10, TimeUnit.SECONDS); + final Map topicConfigs = new HashMap<>(); + allConfigs.get(topicResource).entries().forEach(entry -> topicConfigs.put(entry.name(), entry.value())); + return topicConfigs; + } catch (ExecutionException e) { + if (attempt == maxRetries) { + throw e; + } + Thread.sleep(retryDelayMs); + } + } + throw new IllegalStateException("Exited retry loop unexpectedly."); + } + +} diff --git a/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala b/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala index 292d4d2dce1..8b632c8306a 100644 --- a/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala @@ -16,6 +16,7 @@ */ package kafka.server +import io.aiven.inkless.control_plane.{ControlPlane, InitDisklessLogProducerState => CpProducerState, InitDisklessLogResponse => CpInitResponse} import kafka.cluster.Partition import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.{TopicPartition, Uuid} @@ -45,6 +46,7 @@ class InitDisklessLogManagerTest { private val tp0 = new TopicPartition("test-topic", 0) private var channelManager: MockInitDisklessLogChannelManager = _ + private var controlPlane: ControlPlane = _ private var mockTime: MockTime = _ private var scheduler: 
MockScheduler = _ private var manager: InitDisklessLogManager = _ @@ -52,10 +54,12 @@ class InitDisklessLogManagerTest { @BeforeEach def setUp(): Unit = { channelManager = new MockInitDisklessLogChannelManager() + controlPlane = mock(classOf[ControlPlane]) mockTime = new MockTime() scheduler = new MockScheduler(mockTime) manager = new InitDisklessLogManager( controllerChannelManager = channelManager, + controlPlane = controlPlane, scheduler = scheduler, brokerId = brokerId, brokerEpochSupplier = () => brokerEpoch @@ -813,6 +817,94 @@ class InitDisklessLogManagerTest { )) )) } + + @Test + def testMetadataAppliedCallsControlPlaneAndRemovesTracking(): Unit = { + val partition = mockPartition(hw = 100, leo = 100) + when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.success())) + + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = tp0.topic(), + disklessStartOffset = 100L, + producerStates = util.List.of(new CpProducerState(1L, 0.toShort, 0, 1, 100L, 1000L)) + ) + + scheduler.tick() + + verify(controlPlane).initDisklessLog(any()) + assertTrue(manager.getTrackedPartitions.isEmpty) + } + + @Test + def testMetadataAppliedAlreadyInitializedIsTerminalSuccess(): Unit = { + val partition = mockPartition(hw = 100, leo = 100) + when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.alreadyInitialized())) + + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = tp0.topic(), + disklessStartOffset = 100L, + producerStates = util.List.of() + ) + + scheduler.tick() + + verify(controlPlane).initDisklessLog(any()) + assertTrue(manager.getTrackedPartitions.isEmpty) + } + + @Test + def testMetadataAppliedRetriableErrorSchedulesRetry(): Unit = { + val partition = mockPartition(hw = 100, leo = 100) + when(controlPlane.initDisklessLog(any())) + .thenReturn(util.List.of(new CpInitResponse(Errors.NOT_CONTROLLER))) + 
.thenReturn(util.List.of(CpInitResponse.success())) + + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = tp0.topic(), + disklessStartOffset = 100L, + producerStates = util.List.of() + ) + + scheduler.tick() + assertEquals(Some(InitState.AwaitingMetadata), manager.getInitState(tp0)) + verify(controlPlane, times(1)).initDisklessLog(any()) + + fireRetry() + verify(controlPlane, times(2)).initDisklessLog(any()) + assertTrue(manager.getTrackedPartitions.isEmpty) + } + + @Test + def testMetadataAppliedRepeatedCallbackIsIdempotent(): Unit = { + val partition = mockPartition(hw = 100, leo = 100) + when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.success())) + + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = tp0.topic(), + disklessStartOffset = 100L, + producerStates = util.List.of() + ) + manager.onDisklessInitMetadataApplied( + partition = partition, + topicId = topicId, + topicName = tp0.topic(), + disklessStartOffset = 100L, + producerStates = util.List.of() + ) + + scheduler.tick() + + verify(controlPlane, atLeastOnce()).initDisklessLog(any()) + assertTrue(manager.getTrackedPartitions.isEmpty) + } } /** diff --git a/core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala b/core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala index d3fa26fc0a7..fe7e392e34b 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala @@ -17,6 +17,7 @@ package kafka.server.metadata +import io.aiven.inkless.control_plane.{ControlPlane, InitDisklessLogResponse => CpInitResponse} import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager import kafka.server.QuotaFactory.QuotaManagers @@ -28,6 +29,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import 
org.apache.kafka.common.metadata.{ConfigRecord, PartitionChangeRecord, PartitionRecord, TopicRecord} import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsDelta, TopicsImage} import org.apache.kafka.image.loader.LogDeltaManifest +import org.apache.kafka.metadata.InitDisklessLogFields import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.MetadataVersion @@ -37,7 +39,7 @@ import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers.{any, anyString} -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{mock, times, verify, when} import java.util import scala.jdk.CollectionConverters._ @@ -51,11 +53,82 @@ class InitDisklessLogFlowTest { replicaManager: ReplicaManager, initDisklessLogManager: InitDisklessLogManager, metadataPublisher: BrokerMetadataPublisher, + controlPlane: ControlPlane, channelManager: MockInitDisklessLogChannelManager, time: MockTime, scheduler: MockScheduler ) + @Test + def testEndToEndFlowFromSealingToControlPlaneInit(): Unit = { + val ctx = newContext() + val topicName = "integration-e2e-seal-to-control-plane-init" + val topicId = Uuid.randomUuid() + val tp = new TopicPartition(topicName, 0) + + when(ctx.replicaManager.inklessMetadataView().isDisklessTopic(anyString())).thenReturn(false) + + try { + val createDelta = new TopicsDelta(TopicsImage.EMPTY) + createDelta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + createDelta.replay(new PartitionRecord() + .setTopicId(topicId).setPartitionId(0).setReplicas(util.Arrays.asList(0, 
1)) + .setIsr(util.Arrays.asList(0, 1)).setLeader(ctx.config.brokerId) + .setLeaderEpoch(0).setPartitionEpoch(0)) + val createImage = imageFromTopics(createDelta.apply()) + ctx.replicaManager.applyDelta(createDelta, createImage) + + val partition = ctx.replicaManager.getPartitionOrException(tp) + assertFalse(partition.isSealed) + + // Step 1: enable diskless and trigger sealing + controller init request path. + ctx.metadataPublisher._firstPublish = false + when(ctx.replicaManager.inklessMetadataView().isDisklessTopic(topicName)).thenReturn(true) + val enableDisklessDelta = new MetadataDelta(createImage) + enableDisklessDelta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topicName) + .setName(TopicConfig.DISKLESS_ENABLE_CONFIG) + .setValue("true")) + val disklessImage = withClusterBrokers(enableDisklessDelta.apply(MetadataProvenance.EMPTY)) + ctx.metadataPublisher.onMetadataUpdate(enableDisklessDelta, disklessImage, metadataManifest()) + + ctx.time.sleep(ctx.initDisklessLogManager.lingerMs) + ctx.scheduler.tick() + assertTrue(partition.isSealed) + assertEquals(1, ctx.channelManager.requests.size()) + + // Simulate successful controller response. + ctx.channelManager.requests.poll().complete(new org.apache.kafka.common.message.InitDisklessLogResponseData().setTopics(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.TopicResponse() + .setTopicId(topicId) + .setPartitions(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.PartitionResponse() + .setPartitionId(0) + .setErrorCode(org.apache.kafka.common.protocol.Errors.NONE.code()) + )) + ))) + + // Step 2: apply committed PartitionChangeRecord with diskless fields. 
+ val pcrDelta = new MetadataDelta(disklessImage) + val pcr = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setIsr(util.Arrays.asList(0, 1)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of())) + pcrDelta.replay(pcr) + val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY)) + ctx.metadataPublisher.onMetadataUpdate(pcrDelta, pcrImage, metadataManifest()) + ctx.scheduler.tick() + + // Final check: metadata-triggered control-plane init executed. + verify(ctx.controlPlane, times(1)).initDisklessLog(any()) + } finally { + shutdown(ctx) + } + } + @Test def testOnMetadataUpdateSealsAndRegistersExistingClassicLeader(): Unit = { // Given a classic topic where this broker is already the leader. @@ -477,6 +550,141 @@ class InitDisklessLogFlowTest { } } + @Test + def testOnMetadataUpdatePartitionChangeRecordWithDisklessFieldsTriggersControlPlaneInit(): Unit = { + val ctx = newContext() + val topicName = "integration-diskless-pcr-triggers-control-plane" + val topicId = Uuid.randomUuid() + val tp = new TopicPartition(topicName, 0) + + when(ctx.replicaManager.inklessMetadataView().isDisklessTopic(anyString())).thenReturn(false) + + try { + val createDelta = new TopicsDelta(TopicsImage.EMPTY) + createDelta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + createDelta.replay(new PartitionRecord() + .setTopicId(topicId).setPartitionId(0).setReplicas(util.Arrays.asList(0, 1)) + .setIsr(util.Arrays.asList(0, 1)).setLeader(ctx.config.brokerId) + .setLeaderEpoch(0).setPartitionEpoch(0)) + val createImage = imageFromTopics(createDelta.apply()) + ctx.replicaManager.applyDelta(createDelta, createImage) + + ctx.metadataPublisher._firstPublish = false + when(ctx.replicaManager.inklessMetadataView().isDisklessTopic(topicName)).thenReturn(true) + val disklessDelta = new MetadataDelta(createImage) + 
disklessDelta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topicName) + .setName(TopicConfig.DISKLESS_ENABLE_CONFIG) + .setValue("true")) + val disklessImage = withClusterBrokers(disklessDelta.apply(MetadataProvenance.EMPTY)) + ctx.metadataPublisher.onMetadataUpdate(disklessDelta, disklessImage, metadataManifest()) + + ctx.time.sleep(ctx.initDisklessLogManager.lingerMs) + ctx.scheduler.tick() + assertEquals(1, ctx.channelManager.requests.size()) + ctx.channelManager.requests.poll().complete(new org.apache.kafka.common.message.InitDisklessLogResponseData().setTopics(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.TopicResponse() + .setTopicId(topicId) + .setPartitions(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.PartitionResponse() + .setPartitionId(0) + .setErrorCode(org.apache.kafka.common.protocol.Errors.NONE.code()) + )) + ))) + assertEquals(Some(InitState.AwaitingMetadata), ctx.initDisklessLogManager.getInitState(tp)) + + val pcrDelta = new MetadataDelta(disklessImage) + val pcr = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setIsr(util.Arrays.asList(0, 1)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of())) + pcrDelta.replay(pcr) + val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY)) + ctx.metadataPublisher.onMetadataUpdate(pcrDelta, pcrImage, metadataManifest()) + + ctx.scheduler.tick() + + verify(ctx.controlPlane, times(1)).initDisklessLog(any()) + assertTrackedStates(ctx, Map.empty) + assertEquals(None, ctx.initDisklessLogManager.getInitState(tp)) + } finally { + shutdown(ctx) + } + } + + @Test + def testOnMetadataUpdateFollowerCanAlsoInvokeControlPlaneAfterCommittedMetadata(): Unit = { + val broker0Ctx = newContext(brokerId = 0) + val broker1Ctx = 
newContext(brokerId = 1) + val topicName = "integration-follower-can-init-control-plane" + val topicId = Uuid.randomUuid() + + when(broker0Ctx.replicaManager.inklessMetadataView().isDisklessTopic(anyString())).thenReturn(false) + when(broker1Ctx.replicaManager.inklessMetadataView().isDisklessTopic(anyString())).thenReturn(false) + + try { + val createDelta = new TopicsDelta(TopicsImage.EMPTY) + createDelta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + createDelta.replay(new PartitionRecord() + .setTopicId(topicId).setPartitionId(0).setReplicas(util.Arrays.asList(0, 1)) + .setIsr(util.Arrays.asList(0, 1)).setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0)) + val createImage = imageFromTopics(createDelta.apply()) + broker0Ctx.replicaManager.applyDelta(createDelta, createImage) + broker1Ctx.replicaManager.applyDelta(createDelta, createImage) + + broker0Ctx.metadataPublisher._firstPublish = false + broker1Ctx.metadataPublisher._firstPublish = false + when(broker0Ctx.replicaManager.inklessMetadataView().isDisklessTopic(topicName)).thenReturn(true) + when(broker1Ctx.replicaManager.inklessMetadataView().isDisklessTopic(topicName)).thenReturn(true) + val disklessDelta = new MetadataDelta(createImage) + disklessDelta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topicName) + .setName(TopicConfig.DISKLESS_ENABLE_CONFIG) + .setValue("true")) + val disklessImage = withClusterBrokers(disklessDelta.apply(MetadataProvenance.EMPTY)) + broker0Ctx.metadataPublisher.onMetadataUpdate(disklessDelta, disklessImage, metadataManifest()) + broker1Ctx.metadataPublisher.onMetadataUpdate(disklessDelta, disklessImage, metadataManifest()) + + broker0Ctx.time.sleep(broker0Ctx.initDisklessLogManager.lingerMs) + broker0Ctx.scheduler.tick() + assertEquals(1, broker0Ctx.channelManager.requests.size()) + broker0Ctx.channelManager.requests.poll().complete(new 
org.apache.kafka.common.message.InitDisklessLogResponseData().setTopics(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.TopicResponse() + .setTopicId(topicId) + .setPartitions(util.List.of( + new org.apache.kafka.common.message.InitDisklessLogResponseData.PartitionResponse() + .setPartitionId(0) + .setErrorCode(org.apache.kafka.common.protocol.Errors.NONE.code()) + )) + ))) + + val pcrDelta = new MetadataDelta(disklessImage) + val pcr = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setIsr(util.Arrays.asList(0, 1)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L)) + pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of())) + pcrDelta.replay(pcr) + val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY)) + broker0Ctx.metadataPublisher.onMetadataUpdate(pcrDelta, pcrImage, metadataManifest()) + broker1Ctx.metadataPublisher.onMetadataUpdate(pcrDelta, pcrImage, metadataManifest()) + + broker0Ctx.scheduler.tick() + broker1Ctx.scheduler.tick() + + verify(broker0Ctx.controlPlane, times(1)).initDisklessLog(any()) + verify(broker1Ctx.controlPlane, times(1)).initDisklessLog(any()) + } finally { + shutdown(broker0Ctx) + shutdown(broker1Ctx) + } + } + private def newContext(brokerId: Int = 0): TestContext = { val config = kafka.server.KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId)) val metadataCache = mock(classOf[KRaftMetadataCache]) @@ -488,8 +696,11 @@ class InitDisklessLogFlowTest { val time = new MockTime() val scheduler = new MockScheduler(time) val channelManager = new MockInitDisklessLogChannelManager() + val controlPlane = mock(classOf[ControlPlane]) + when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.success())) val initDisklessLogManager = new InitDisklessLogManager( controllerChannelManager = channelManager, + controlPlane = controlPlane, scheduler = scheduler, brokerId = 
config.brokerId, brokerEpochSupplier = () => 1L @@ -531,7 +742,7 @@ class InitDisklessLogFlowTest { faultHandler ) - TestContext(config, metadataCache, logManager, replicaManager, initDisklessLogManager, metadataPublisher, channelManager, time, scheduler) + TestContext(config, metadataCache, logManager, replicaManager, initDisklessLogManager, metadataPublisher, controlPlane, channelManager, time, scheduler) } private def shutdown(ctx: TestContext): Unit = { diff --git a/docker/examples/docker-compose-files/inkless/docker-compose.yml b/docker/examples/docker-compose-files/inkless/docker-compose.yml index 32edb613668..feaa7449d14 100644 --- a/docker/examples/docker-compose-files/inkless/docker-compose.yml +++ b/docker/examples/docker-compose-files/inkless/docker-compose.yml @@ -26,8 +26,10 @@ services: # Inkless KAFKA_DISKLESS_STORAGE_SYSTEM_ENABLE: "true" - KAFKA_LOG_DISKLESS_ENABLE: "true" +# KAFKA_LOG_DISKLESS_ENABLE: "true" KAFKA_INKLESS_CONSUME_CACHE_MAX_COUNT: "100" + KAFKA_DISKLESS_ALLOW_FROM_CLASSIC_ENABLE: "true" + KAFKA_DISKLESS_MANAGED_RF_ENABLE: "true" ## Control Plane ### Postgresql From 810e98874d36d03f354e20f043d53162db8dc029 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Tue, 31 Mar 2026 16:54:40 +0200 Subject: [PATCH 2/4] wip --- .../kafka/server/InitDisklessLogManager.scala | 19 ++++++++++++++++--- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../server/InitDisklessLogManagerTest.scala | 10 +++++----- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala index 645931bc3ea..1d696431ad1 100644 --- a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala +++ b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala @@ -90,14 +90,28 @@ class InitDisklessLogManager( private[server] def getInitState(tp: TopicPartition): Option[InitState] = Option(tracked.get(tp)).map(_.state) - def 
onDisklessInitMetadataApplied( + /** + * Handles already-applied diskless init metadata for a partition. + * This moves/keeps the partition in AwaitingMetadata and triggers a prompt + * control-plane init send, since metadata is committed and visible. + * + * @param partition Partition instance for the topic-partition to initialize. + * @param topicId Topic ID currently assigned to the partition. + * @param topicName Topic name used in the control-plane init payload. + * @param disklessStartOffset Start offset for diskless initialization; negative means absent. + * @param producerStates Producer state snapshots to seed the destination replica state. + */ + def initOnControlPlane( partition: Partition, topicId: Uuid, topicName: String, disklessStartOffset: Long, producerStates: util.List[CpProducerState] ): Unit = { - if (disklessStartOffset < 0) return + if (disklessStartOffset < 0) { + warn(s"Received a negative disklessStartOffset ($disklessStartOffset) from the Controller for ${partition.topicPartition}, skipping init on Diskless Coordinator.") + return + } val tp = partition.topicPartition val payload = DisklessInitMetadata(topicName, disklessStartOffset, producerStates) @@ -106,7 +120,6 @@ class InitDisklessLogManager( topicId = topicId, state = InitState.AwaitingMetadata, metadataPayload = Some(payload), - retryAttempt = 0 ) if (tracked.putIfAbsent(tp, newState) == null) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index bd9095d8cc5..4b4eaf977ab 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2843,7 +2843,7 @@ class ReplicaManager(val config: KafkaConfig, producerState.batchMaxTimestamp() ) }.asJava - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = topicName, diff --git 
a/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala b/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala index 8b632c8306a..62182b70c99 100644 --- a/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala @@ -823,7 +823,7 @@ class InitDisklessLogManagerTest { val partition = mockPartition(hw = 100, leo = 100) when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.success())) - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = tp0.topic(), @@ -842,7 +842,7 @@ class InitDisklessLogManagerTest { val partition = mockPartition(hw = 100, leo = 100) when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.alreadyInitialized())) - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = tp0.topic(), @@ -863,7 +863,7 @@ class InitDisklessLogManagerTest { .thenReturn(util.List.of(new CpInitResponse(Errors.NOT_CONTROLLER))) .thenReturn(util.List.of(CpInitResponse.success())) - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = tp0.topic(), @@ -885,14 +885,14 @@ class InitDisklessLogManagerTest { val partition = mockPartition(hw = 100, leo = 100) when(controlPlane.initDisklessLog(any())).thenReturn(util.List.of(CpInitResponse.success())) - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = tp0.topic(), disklessStartOffset = 100L, producerStates = util.List.of() ) - manager.onDisklessInitMetadataApplied( + manager.initOnControlPlane( partition = partition, topicId = topicId, topicName = tp0.topic(), From 98f9e1c977e301d26cfd9d9bb272d78c6ee14714 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 1 Apr 2026 14:41:21 
+0200 Subject: [PATCH 3/4] wip --- .../scala/kafka/server/ReplicaManager.scala | 87 +++++++++++-------- .../inkless/docker-compose.yml | 4 +- 2 files changed, 50 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4b4eaf977ab..f02f8c0e76b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2829,27 +2829,56 @@ class ReplicaManager(val config: KafkaConfig, delta.changedTopics().forEach { (topicId, topicDelta) => val topicName = topicDelta.name() topicDelta.partitionChanges().forEach { (partitionId, partitionRegistration) => - if (partitionRegistration.disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET && - shouldNotifyDisklessInitFromDelta(delta, topicId, partitionId, partitionRegistration)) { - val tp = new TopicPartition(topicName, partitionId) - onlinePartition(tp).foreach { partition => - val producerStates = partitionRegistration.disklessProducerStates.asScala.map { producerState => - new InitDisklessLogProducerState( - producerState.producerId(), - producerState.producerEpoch(), - producerState.baseSequence(), - producerState.lastSequence(), - producerState.assignedOffset(), - producerState.batchMaxTimestamp() + // Notify only on the specific transition from "not diskless-initialized" + // to "diskless-initialized" in metadata. 
+ val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage => + Option(topicImage.partitions().get(partitionId)) + } + val shouldNotifyDisklessInit = previousPartition match { + case None => false + case Some(previous) => + previous.disklessStartOffset == PartitionRegistration.NO_DISKLESS_START_OFFSET && + partitionRegistration.disklessStartOffset >= 0 + } + val tp = new TopicPartition(topicName, partitionId) + if (shouldNotifyDisklessInit) { + // Send init only for partitions that currently have a local Partition instance. + onlinePartition(tp) match { + case Some(partition) => + val producerStates = partitionRegistration.disklessProducerStates.asScala.map { producerState => + new InitDisklessLogProducerState( + producerState.producerId(), + producerState.producerEpoch(), + producerState.baseSequence(), + producerState.lastSequence(), + producerState.assignedOffset(), + producerState.batchMaxTimestamp() + ) + }.asJava + manager.initOnControlPlane( + partition = partition, + topicId = topicId, + topicName = topicName, + disklessStartOffset = partitionRegistration.disklessStartOffset, + producerStates = producerStates + ) + case None => + stateChangeLogger.info( + s"Skipping diskless init on control plane for $tp because the partition is not online locally." + ) + } + } else { + previousPartition match { + case None => + stateChangeLogger.trace( + s"Skipping diskless init on control plane for $tp because no previous partition registration was found." + ) + case Some(previous) => + stateChangeLogger.trace( + s"Skipping diskless init on control plane for $tp because transition did not match " + + s"${PartitionRegistration.NO_DISKLESS_START_OFFSET} -> >=0 " + + s"(previous=${previous.disklessStartOffset}, current=${partitionRegistration.disklessStartOffset})." 
) - }.asJava - manager.initOnControlPlane( - partition = partition, - topicId = topicId, - topicName = topicName, - disklessStartOffset = partitionRegistration.disklessStartOffset, - producerStates = producerStates - ) } } } @@ -2857,24 +2886,6 @@ class ReplicaManager(val config: KafkaConfig, } } - private def shouldNotifyDisklessInitFromDelta( - delta: TopicsDelta, - topicId: Uuid, - partitionId: Int, - registration: org.apache.kafka.metadata.PartitionRegistration - ): Boolean = { - val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage => - Option(topicImage.partitions().get(partitionId)) - } - - previousPartition match { - case None => true - case Some(previous) => - previous.disklessStartOffset != registration.disklessStartOffset || - previous.disklessProducerStates != registration.disklessProducerStates - } - } - private def applyLocalLeadersDelta( changedPartitions: mutable.Set[Partition], delta: TopicsDelta, diff --git a/docker/examples/docker-compose-files/inkless/docker-compose.yml b/docker/examples/docker-compose-files/inkless/docker-compose.yml index feaa7449d14..32edb613668 100644 --- a/docker/examples/docker-compose-files/inkless/docker-compose.yml +++ b/docker/examples/docker-compose-files/inkless/docker-compose.yml @@ -26,10 +26,8 @@ services: # Inkless KAFKA_DISKLESS_STORAGE_SYSTEM_ENABLE: "true" -# KAFKA_LOG_DISKLESS_ENABLE: "true" + KAFKA_LOG_DISKLESS_ENABLE: "true" KAFKA_INKLESS_CONSUME_CACHE_MAX_COUNT: "100" - KAFKA_DISKLESS_ALLOW_FROM_CLASSIC_ENABLE: "true" - KAFKA_DISKLESS_MANAGED_RF_ENABLE: "true" ## Control Plane ### Postgresql From cda0a6e1de67b01be9f5469ba4aa2039246ecac0 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 1 Apr 2026 14:51:13 +0200 Subject: [PATCH 4/4] wip --- core/src/main/scala/kafka/server/ReplicaManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala index f02f8c0e76b..650d3be22bd 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2821,10 +2821,10 @@ class ReplicaManager(val config: KafkaConfig, } } - notifyDisklessInitMetadataApplied(delta) + initDisklessLogOnControlPlane(delta) } - private def notifyDisklessInitMetadataApplied(delta: TopicsDelta): Unit = { + private def initDisklessLogOnControlPlane(delta: TopicsDelta): Unit = { initDisklessLogManager.foreach { manager => delta.changedTopics().forEach { (topicId, topicDelta) => val topicName = topicDelta.name() @@ -2834,14 +2834,14 @@ class ReplicaManager(val config: KafkaConfig, val previousPartition = Option(delta.image().getTopic(topicId)).flatMap { topicImage => Option(topicImage.partitions().get(partitionId)) } - val shouldNotifyDisklessInit = previousPartition match { + val shouldInitDisklessLogOnControlPlane = previousPartition match { case None => false case Some(previous) => previous.disklessStartOffset == PartitionRegistration.NO_DISKLESS_START_OFFSET && partitionRegistration.disklessStartOffset >= 0 } val tp = new TopicPartition(topicName, partitionId) - if (shouldNotifyDisklessInit) { + if (shouldInitDisklessLogOnControlPlane) { // Send init only for partitions that currently have a local Partition instance. onlinePartition(tp) match { case Some(partition) =>