From 387912f3e5ab2c7529e35bf30dcb241bc10609b1 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 20 May 2026 16:54:00 +0200 Subject: [PATCH 1/5] perf(inkless): add `Partition.seal` fast path when already sealed `seal()` is invoked from multiple paths (sealTopicPartitions, applyLocalLeadersDelta and applyLocalFollowersDelta) and is intentionally idempotent, but it always grabbed `leaderIsrUpdateLock` in write mode -- contending with the read lock taken on every append. Read the @volatile `_sealed` flag first and only take the lock on the first transition. --- .../main/scala/kafka/cluster/Partition.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c987e07628..7f1c38269d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -265,14 +265,21 @@ class Partition(val topicPartition: TopicPartition, * Seal this Partition so that no more records can be appended locally. * @throws KafkaStorageException If transaction abortion fails due to an I/O error. */ - def seal(): Unit = inWriteLock(leaderIsrUpdateLock) { - if (!_sealed) { - val leaderLog = localLogOrException - if (leaderLog.producerStateManager().firstUndecidedOffset().isPresent) { - abortOngoingTransactions(leaderLog) + def seal(): Unit = { + // Fast path: `_sealed` is @volatile, so a relaxed read is sufficient to avoid taking + // the write lock on idempotent calls (sealTopicPartitions, applyLocalLeadersDelta and + // applyLocalFollowersDelta all call seal() and the write lock contends with + // appendRecordsToLeader / appendRecordsToFollowerOrFutureReplica). 
+ if (_sealed) return + inWriteLock(leaderIsrUpdateLock) { + if (!_sealed) { + val leaderLog = localLogOrException + if (leaderLog.producerStateManager().firstUndecidedOffset().isPresent) { + abortOngoingTransactions(leaderLog) + } + _sealed = true + stateChangeLogger.info(s"Sealed partition $topicPartition for diskless switch with LEO ${leaderLog.logEndOffset}") } - _sealed = true - stateChangeLogger.info(s"Sealed partition $topicPartition for diskless switch with LEO ${leaderLog.logEndOffset}") } } From 285b3beb6a0cd58453e9f002a4f43b3ad3868567 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 20 May 2026 17:02:44 +0200 Subject: [PATCH 2/5] perf(inkless): avoid allocations in switch offset lookup Rewrite `getClassicToDisklessStartOffset` to use nullable lookups instead of wrapping each step in `Option`, since this method is used on fetch and offset-listing paths. --- .../kafka/server/metadata/InklessMetadataView.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala index ef754b6b1c..c22726c244 100644 --- a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala +++ b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala @@ -88,10 +88,14 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf } def getClassicToDisklessStartOffset(topicPartition: TopicPartition): Long = { - Option(metadataCache.currentImage().topics().getTopic(topicPartition.topic())) - .flatMap(topicImage => Option(topicImage.partitions().get(topicPartition.partition()))) - .map(_.classicToDisklessStartOffset) - .getOrElse(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) + // Hot path: called for every fetch request, every replica fetch response and every + // ListOffsets request. 
Avoid Option/Some allocations by going through nullable + // intermediates and inlining the lookup chain. + val topic = metadataCache.currentImage().topics().getTopic(topicPartition.topic()) + if (topic == null) return PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET + val partReg = topic.partitions().get(topicPartition.partition()) + if (partReg == null) PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET + else partReg.classicToDisklessStartOffset } override def getTopicConfig(topicName: String): LogConfig = topicConfigs.computeIfAbsent(topicName, t => { From f94c98c537e029dbaf28fec648831ae3875f060e Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 20 May 2026 17:05:16 +0200 Subject: [PATCH 3/5] perf(inkless): reuse diskless topic checks in replica transitions Cache the diskless-topic decision in leader and follower transition loops and derive consolidating-diskless status from it. This avoids repeated topic config lookups while applying metadata deltas. --- .../scala/kafka/server/ReplicaManager.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 02441d79e5..369b004f58 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2976,9 +2976,9 @@ class ReplicaManager(val config: KafkaConfig, val consolidatingDisklessPartitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition] localLeaders.foreachEntry { (tp, info) => val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic()) - val isConsolidatingDisklessTopic = + val isConsolidatingDisklessTopic = isDiskless && config.disklessRemoteStorageConsolidationEnabled && - _inklessMetadataView.isConsolidatingDisklessTopic(tp.topic) + _inklessMetadataView.isRemoteStorageEnabled(tp.topic) val existingPartition = onlinePartition(tp) if (!isDiskless || 
isConsolidatingDisklessTopic) { getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => @@ -3109,10 +3109,11 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean] val followerTopicSet = new mutable.HashSet[String] localFollowers.foreachEntry { (tp, info) => - val isConsolidatingDisklessTopic = + val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic()) + val isConsolidatingDisklessTopic = isDiskless && config.disklessRemoteStorageConsolidationEnabled && - _inklessMetadataView.isConsolidatingDisklessTopic(tp.topic) - if (_inklessMetadataView.isDisklessTopic(tp.topic())) { + _inklessMetadataView.isRemoteStorageEnabled(tp.topic) + if (isDiskless) { // Clean up classic-to-diskless switch tracking since only the leader drives classic-to-diskless switch. initDisklessLogManager.foreach(_.removePartition(tp)) val seal = _inklessMetadataView.getClassicToDisklessStartOffset(tp) @@ -3158,7 +3159,7 @@ class ReplicaManager(val config: KafkaConfig, } } } - if (!_inklessMetadataView.isDisklessTopic(tp.topic()) || isConsolidatingDisklessTopic) { + if (!isDiskless || isConsolidatingDisklessTopic) { getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { followerTopicSet.add(tp.topic) @@ -3188,7 +3189,7 @@ class ReplicaManager(val config: KafkaConfig, case e: KafkaStorageException => stateChangeLogger.error(s"Unable to start fetching $tp " + s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e) - if (_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)) + if (isConsolidatingDisklessTopic) consolidationFetcherManager.foreach(_.addFailedPartition(tp)) else replicaFetcherManager.addFailedPartition(tp) @@ -3201,7 +3202,7 @@ class ReplicaManager(val config: KafkaConfig, case e: Throwable => stateChangeLogger.error(s"Unable to start fetching $tp " + s"with topic ID ${info.topicId} due to ${e.getClass.getSimpleName}", e) - 
if (_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)) + if (isConsolidatingDisklessTopic) consolidationFetcherManager.foreach(_.addFailedPartition(tp)) else replicaFetcherManager.addFailedPartition(tp) From 33df7a9af29b8ae626fe135430bf23109cc87a0a Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 20 May 2026 17:07:54 +0200 Subject: [PATCH 4/5] perf(inkless): check committed seal before local log lookup In the diskless follower transition path, test `seal >= 0` before calling `logManager.getLog`. Fully switched partitions can then skip the local-log map lookup entirely. --- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 369b004f58..71087564bf 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -3128,7 +3128,7 @@ class ReplicaManager(val config: KafkaConfig, // progress without a committed seal (seal == -2), a missing local log means // there is no classic data to expose on this broker, so we leave the partition // unmanaged here. - if (!isConsolidatingDisklessTopic && (logManager.getLog(tp).isDefined || seal >= 0)) { + if (!isConsolidatingDisklessTopic && (seal >= 0 || logManager.getLog(tp).isDefined)) { getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) From 24b8f009fc098581b0bbda23dda7b69024422e71 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 20 May 2026 17:12:11 +0200 Subject: [PATCH 5/5] perf(inkless): avoid allocating listener on duplicate init registration When a partition is already tracked, re-drive the existing state and return early instead of creating a new WaitingForReplication listener.
This keeps duplicate registrations cheap while preserving the existing re-evaluation behavior. --- .../kafka/server/InitDisklessLogManager.scala | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala index 282dcde052..9034442adf 100644 --- a/core/src/main/scala/kafka/server/InitDisklessLogManager.scala +++ b/core/src/main/scala/kafka/server/InitDisklessLogManager.scala @@ -116,6 +116,28 @@ class InitDisklessLogManager( * send. Otherwise, waits for HW advancement notifications. */ def registerPartition(partition: Partition, topicId: Uuid): Unit = { + val tp = partition.topicPartition + + def redriveTrackedState(): Unit = { + tracked.computeIfPresent(tp, (_, currentState) => currentState match { + case waitingForReplication: WaitingForReplication => + handleWaitingOutcome(tp, waitingForReplication.maybeAdvanceState()) + case sendingToController: SendingToController => + enqueueSendingToController(tp, sendingToController) + sendingToController + case awaitingMetadata: AwaitingMetadata => awaitingMetadata + case done: Done => done + case _: Failed => null + }) + refreshMetrics(tp) + } + + if (tracked.get(tp) != null) { + redriveTrackedState() + info(s"Partition $tp already tracked in state ${Option(tracked.get(tp))}") + return + } + val waitingState = WaitingForReplication(partition, topicId, onPartitionUpdate = (tp, outcome) => { tracked.computeIfPresent(tp, (_, currentState) => currentState match { case _: WaitingForReplication => handleWaitingOutcome(tp, outcome) @@ -124,23 +146,11 @@ class InitDisklessLogManager( refreshMetrics(tp) }) - val tp = partition.topicPartition val inserted = tracked.putIfAbsent(tp, waitingState) == null - // Evaluate immediately for both new and already tracked entries: - // - new entries may already be ready - // - duplicate registrations can re-drive a waiting or queued state - 
tracked.computeIfPresent(tp, (_, currentState) => currentState match { - case waitingForReplication: WaitingForReplication => - handleWaitingOutcome(tp, waitingForReplication.maybeAdvanceState()) - case sendingToController: SendingToController => - enqueueSendingToController(tp, sendingToController) - sendingToController - case awaitingMetadata: AwaitingMetadata => awaitingMetadata - case done: Done => done - case _: Failed => null - }) - refreshMetrics(tp) + // New entries may already be ready. If another caller inserted the same partition + // concurrently, this also re-drives the existing state without replacing its listener. + redriveTrackedState() if (inserted) { Option(tracked.get(tp)).foreach { @@ -155,8 +165,10 @@ class InitDisklessLogManager( refreshMetrics(tp) case _ => } + info(s"Registered new partition $tp in state ${Option(tracked.get(tp))}") + } else { + info(s"Partition $tp already tracked in state ${Option(tracked.get(tp))}") } - info(s"Registered new partition $tp in state ${Option(tracked.get(tp))}") } private def handleWaitingOutcome(tp: TopicPartition, outcome: WaitingForReplicationOutcome): InitDisklessLogState = {