Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class InitDisklessLogManager(
private[server] val initialRetryBackoffMs = 1000L
private[server] val maxRetryBackoffMs = 30000L

def trackedPartitions: Set[TopicPartition] = tracked.keySet().asScala.toSet
private[server] def getTrackedPartitions: Set[TopicPartition] = tracked.keySet().asScala.toSet

def initState(tp: TopicPartition): Option[InitState] =
private[server] def getInitState(tp: TopicPartition): Option[InitState] =
Option(tracked.get(tp)).map(_.state)

/**
Expand Down
26 changes: 23 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kafka.server.metadata.{InklessMetadataView, KRaftMetadataCache}
import kafka.server.share.DelayedShareFetch
import kafka.utils._
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
Expand All @@ -53,7 +54,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataDelta, MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
Expand Down Expand Up @@ -581,6 +582,25 @@ class ReplicaManager(val config: KafkaConfig,
}
}

def sealExistingLeadersOfTopicsMigratedToDiskless(delta: MetadataDelta, newImage: MetadataImage): Unit = {
Option(delta.configsDelta()).foreach { configsDelta =>
configsDelta.changes().forEach { (resource, _) =>
if (resource.`type`() == ConfigResource.Type.TOPIC) {
val topicName = resource.name()
val oldProps = delta.image().configs().configProperties(resource)
val wasDiskless = oldProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
val newProps = newImage.configs().configProperties(resource)
val isDiskless = newProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
val topicExistedBefore = delta.image().topics().getTopic(topicName) != null
if (topicExistedBefore && !wasDiskless && isDiskless) {
info(s"Topic $topicName transitioning from classic to diskless, sealing leader partitions")
sealTopicPartitions(topicName)
}
}
}
}
}

private def offlinePartitionCount: Int = {
allPartitions.values.asScala.iterator.count(_.getClass == HostedPartition.Offline.getClass)
}
Expand Down Expand Up @@ -2790,12 +2810,12 @@ class ReplicaManager(val config: KafkaConfig,
// so that no produce request can ever be processed by the new leader.
val partition = existingPartition.get
try {
partition.seal()
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.seal()
partition.makeLeader(info.partition, false, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)

initDisklessLogManager.foreach(_.registerPartition(partition, info.topicId()))
changedPartitions.add(partition)
initDisklessLogManager.foreach(_.registerPartition(partition, info.topicId))
} catch {
case e: KafkaStorageException =>
stateChangeLogger.info(s"Skipped the become-leader state change for transitioning partition $tp " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import kafka.server.share.SharePartitionManager
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage}
Expand Down Expand Up @@ -146,8 +145,7 @@ class BrokerMetadataPublisher(

// Seal existing leader partitions for topics transitioning from classic to diskless.
// New leaders elected are instead sealed inside ReplicaManager.applyLocalLeadersDelta.
sealExistingLeadersOfTopicsMigratedToDiskless(delta, newImage)

replicaManager.sealExistingLeadersOfTopicsMigratedToDiskless(delta, newImage)
// Apply topic deltas.
Option(delta.topicsDelta()).foreach { topicsDelta =>
try {
Expand Down Expand Up @@ -344,24 +342,6 @@ class BrokerMetadataPublisher(
}
}

private def sealExistingLeadersOfTopicsMigratedToDiskless(delta: MetadataDelta, newImage: MetadataImage): Unit = {
Option(delta.configsDelta()).foreach { configsDelta =>
configsDelta.changes().forEach { (resource, _) =>
if (resource.`type`() == ConfigResource.Type.TOPIC) {
val topicName = resource.name()
val oldProps = delta.image().configs().configProperties(resource)
val wasDiskless = oldProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
val newProps = newImage.configs().configProperties(resource)
val isDiskless = newProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
if (!wasDiskless && isDiskless) {
info(s"Topic $topicName transitioning from classic to diskless, sealing leader partitions")
replicaManager.sealTopicPartitions(topicName)
}
}
}
}
}

Comment on lines -347 to -364

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved all the logic inside ReplicaManager

private def initializeManagers(newImage: MetadataImage): Unit = {
try {
// Start log manager, which will perform (potentially lengthy)
Expand Down
Loading
Loading