Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,21 @@ class Partition(val topicPartition: TopicPartition,
* Seal this Partition so that no more records can be appended locally.
* @throws KafkaStorageException If transaction abortion fails due to an I/O error.
*/
def seal(): Unit = inWriteLock(leaderIsrUpdateLock) {
if (!_sealed) {
val leaderLog = localLogOrException
if (leaderLog.producerStateManager().firstUndecidedOffset().isPresent) {
abortOngoingTransactions(leaderLog)
def seal(): Unit = {
// Fast path: `_sealed` is @volatile, so reading it without holding the lock is sufficient
// to skip the write lock on repeated (idempotent) calls: sealTopicPartitions,
// applyLocalLeadersDelta and applyLocalFollowersDelta all call seal(), and the write lock
// contends with appendRecordsToLeader / appendRecordsToFollowerOrFutureReplica.
// `_sealed` is checked again under the write lock before any state is changed.
if (_sealed) return
inWriteLock(leaderIsrUpdateLock) {
if (!_sealed) {
val leaderLog = localLogOrException
if (leaderLog.producerStateManager().firstUndecidedOffset().isPresent) {
abortOngoingTransactions(leaderLog)
}
_sealed = true
stateChangeLogger.info(s"Sealed partition $topicPartition for diskless switch with LEO ${leaderLog.logEndOffset}")
}
_sealed = true
stateChangeLogger.info(s"Sealed partition $topicPartition for diskless switch with LEO ${leaderLog.logEndOffset}")
}
}

Expand Down
44 changes: 28 additions & 16 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ class InitDisklessLogManager(
* send. Otherwise, waits for HW advancement notifications.
*/
def registerPartition(partition: Partition, topicId: Uuid): Unit = {
val tp = partition.topicPartition

def redriveTrackedState(): Unit = {
tracked.computeIfPresent(tp, (_, currentState) => currentState match {
case waitingForReplication: WaitingForReplication =>
handleWaitingOutcome(tp, waitingForReplication.maybeAdvanceState())
case sendingToController: SendingToController =>
enqueueSendingToController(tp, sendingToController)
sendingToController
case awaitingMetadata: AwaitingMetadata => awaitingMetadata
case done: Done => done
case _: Failed => null
})
refreshMetrics(tp)
}

if (tracked.get(tp) != null) {
redriveTrackedState()
info(s"Partition $tp already tracked in state ${Option(tracked.get(tp))}")
return
}

val waitingState = WaitingForReplication(partition, topicId, onPartitionUpdate = (tp, outcome) => {
tracked.computeIfPresent(tp, (_, currentState) => currentState match {
case _: WaitingForReplication => handleWaitingOutcome(tp, outcome)
Expand All @@ -124,23 +146,11 @@ class InitDisklessLogManager(
refreshMetrics(tp)
})

val tp = partition.topicPartition
val inserted = tracked.putIfAbsent(tp, waitingState) == null

// Evaluate immediately for both new and already tracked entries:
// - new entries may already be ready
// - duplicate registrations can re-drive a waiting or queued state
tracked.computeIfPresent(tp, (_, currentState) => currentState match {
case waitingForReplication: WaitingForReplication =>
handleWaitingOutcome(tp, waitingForReplication.maybeAdvanceState())
case sendingToController: SendingToController =>
enqueueSendingToController(tp, sendingToController)
sendingToController
case awaitingMetadata: AwaitingMetadata => awaitingMetadata
case done: Done => done
case _: Failed => null
})
refreshMetrics(tp)
// New entries may already be ready. If another caller inserted the same partition
// concurrently, this also re-drives the existing state without replacing its listener.
redriveTrackedState()

if (inserted) {
Option(tracked.get(tp)).foreach {
Expand All @@ -155,8 +165,10 @@ class InitDisklessLogManager(
refreshMetrics(tp)
case _ =>
}
info(s"Registered new partition $tp in state ${Option(tracked.get(tp))}")
} else {
info(s"Partition $tp already tracked in state ${Option(tracked.get(tp))}")
}
info(s"Registered new partition $tp in state ${Option(tracked.get(tp))}")
}

private def handleWaitingOutcome(tp: TopicPartition, outcome: WaitingForReplicationOutcome): InitDisklessLogState = {
Expand Down
19 changes: 10 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2976,9 +2976,9 @@ class ReplicaManager(val config: KafkaConfig,
val consolidatingDisklessPartitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
localLeaders.foreachEntry { (tp, info) =>
val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic())
val isConsolidatingDisklessTopic =
val isConsolidatingDisklessTopic = isDiskless &&
config.disklessRemoteStorageConsolidationEnabled &&
_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)
_inklessMetadataView.isRemoteStorageEnabled(tp.topic)
val existingPartition = onlinePartition(tp)
if (!isDiskless || isConsolidatingDisklessTopic) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
Expand Down Expand Up @@ -3109,10 +3109,11 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String]
localFollowers.foreachEntry { (tp, info) =>
val isConsolidatingDisklessTopic =
val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic())
val isConsolidatingDisklessTopic = isDiskless &&
config.disklessRemoteStorageConsolidationEnabled &&
_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)
if (_inklessMetadataView.isDisklessTopic(tp.topic())) {
_inklessMetadataView.isRemoteStorageEnabled(tp.topic)
if (isDiskless) {
// Clean up classic-to-diskless switch tracking, since only the leader drives the classic-to-diskless switch.
initDisklessLogManager.foreach(_.removePartition(tp))
val seal = _inklessMetadataView.getClassicToDisklessStartOffset(tp)
Expand All @@ -3127,7 +3128,7 @@ class ReplicaManager(val config: KafkaConfig,
// progress without a committed seal (seal == -2), a missing local log means
// there is no classic data to expose on this broker, so we leave the partition
// unmanaged here.
if (!isConsolidatingDisklessTopic && (logManager.getLog(tp).isDefined || seal >= 0)) {
if (!isConsolidatingDisklessTopic && (seal >= 0 || logManager.getLog(tp).isDefined)) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
Expand Down Expand Up @@ -3158,7 +3159,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}
}
if (!_inklessMetadataView.isDisklessTopic(tp.topic()) || isConsolidatingDisklessTopic) {
if (!isDiskless || isConsolidatingDisklessTopic) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
followerTopicSet.add(tp.topic)
Expand Down Expand Up @@ -3188,7 +3189,7 @@ class ReplicaManager(val config: KafkaConfig,
case e: KafkaStorageException =>
stateChangeLogger.error(s"Unable to start fetching $tp " +
s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
if (_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic))
if (isConsolidatingDisklessTopic)
consolidationFetcherManager.foreach(_.addFailedPartition(tp))
else
replicaFetcherManager.addFailedPartition(tp)
Expand All @@ -3201,7 +3202,7 @@ class ReplicaManager(val config: KafkaConfig,
case e: Throwable =>
stateChangeLogger.error(s"Unable to start fetching $tp " +
s"with topic ID ${info.topicId} due to ${e.getClass.getSimpleName}", e)
if (_inklessMetadataView.isConsolidatingDisklessTopic(tp.topic))
if (isConsolidatingDisklessTopic)
consolidationFetcherManager.foreach(_.addFailedPartition(tp))
else
replicaFetcherManager.addFailedPartition(tp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
}

def getClassicToDisklessStartOffset(topicPartition: TopicPartition): Long = {
Option(metadataCache.currentImage().topics().getTopic(topicPartition.topic()))
.flatMap(topicImage => Option(topicImage.partitions().get(topicPartition.partition())))
.map(_.classicToDisklessStartOffset)
.getOrElse(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)
// Hot path: called for every fetch request, every replica fetch response and every
// ListOffsets request. Avoid Option/Some allocations by going through nullable
// intermediates and inlining the lookup chain.
val topic = metadataCache.currentImage().topics().getTopic(topicPartition.topic())
if (topic == null) return PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET
val partReg = topic.partitions().get(topicPartition.partition())
if (partReg == null) PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET
else partReg.classicToDisklessStartOffset
}

override def getTopicConfig(topicName: String): LogConfig = topicConfigs.computeIfAbsent(topicName, t => {
Expand Down
Loading