Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ class Partition(val topicPartition: TopicPartition,
@volatile private[cluster] var partitionState: PartitionState = new CommittedPartitionState(util.Set.of(), LeaderRecoveryState.RECOVERED)
@volatile var assignmentState: AssignmentState = new SimpleAssignmentState(util.List.of())

// When true, the partition is sealed for classic-to-diskless migration.
// A sealed partition rejects all produce requests via appendRecordsToLeader, which checks this
// flag while holding the read side of leaderIsrUpdateLock and throws ReplicaNotAvailableException.
// The flag is only set by seal(), under the write side of the same lock, to guarantee that once
// sealed, LEO cannot increase. It is @volatile so isSealed can read it without taking the lock.
@volatile private var _sealed: Boolean = false

// Logs belonging to this partition. Majority of time it will be only one log, but if log directory
// is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
// completes and a switch to new location is performed.
Expand Down Expand Up @@ -252,6 +257,15 @@ class Partition(val topicPartition: TopicPartition,
*/
def isAtMinIsr: Boolean = {
  // Evaluates to false when leaderLogIfLocal is empty; otherwise compares the current ISR size
  // against the effective min ISR computed for that log.
  leaderLogIfLocal.exists(leaderLog => partitionState.isr.size == effectiveMinIsr(leaderLog))
}

/** Whether this partition has been sealed via seal(); reads the volatile `_sealed` flag without locking. */
def isSealed: Boolean = _sealed

/**
 * Seals this partition for classic-to-diskless migration.
 *
 * The flag is flipped while holding the write side of `leaderIsrUpdateLock`, so it cannot
 * interleave with an append that holds the read side of the same lock. Calling this method
 * on an already sealed partition is a no-op (nothing is changed and nothing is logged).
 *
 * Sealing never fails because of logging: the log end offset is included in the state-change
 * log message on a best-effort basis only.
 */
def seal(): Unit = inWriteLock(leaderIsrUpdateLock) {
  if (!_sealed) {
    _sealed = true
    // The LEO is purely diagnostic. localLogOrException may throw when no local log is
    // available (as its name suggests); that must not turn an already-applied seal into an
    // exception that escapes to callers, so fall back to a placeholder in the message.
    val leoDescription = scala.util.Try(localLogOrException.logEndOffset)
      .map(_.toString)
      .getOrElse("unknown")
    stateChangeLogger.info(s"Sealed partition $topicPartition for diskless migration with LEO $leoDescription")
  }
}

/** True when the current assignment state is an OngoingReassignmentState. */
def isReassigning: Boolean = assignmentState match {
  case _: OngoingReassignmentState => true
  case _ => false
}

/** Asks the current assignment state whether `localBrokerId` is a replica being added. */
def isAddingLocalReplica: Boolean = assignmentState.isAddingReplica(localBrokerId)
Expand Down Expand Up @@ -1224,6 +1238,12 @@ class Partition(val topicPartition: TopicPartition,
transactionVersion: Short = TransactionVersion.TV_UNKNOWN
): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
if (_sealed) {
// Force metadata refresh and client retries while the migration from classic to diskless is still ongoing.
throw new ReplicaNotAvailableException(
s"Partition $topicPartition is sealed for diskless migration on broker $localBrokerId")
}

leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = effectiveMinIsr(leaderLog)
Expand Down
37 changes: 34 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,18 @@ class ReplicaManager(val config: KafkaConfig,
}
}

/**
 * Seals every hosted partition of `topic` that is online and currently reports itself as leader.
 * Partitions of other topics, non-online entries and non-leader partitions are left untouched.
 *
 * @param topic name of the topic whose local leader partitions should be sealed
 */
def sealTopicPartitions(topic: String): Unit = {
  allPartitions.forEach { (topicPartition, hosted) =>
    hosted match {
      case HostedPartition.Online(partition) if topicPartition.topic() == topic && partition.isLeader =>
        partition.seal()
      case _ => // different topic, not online, or not the leader: nothing to seal
    }
  }
}

/** Counts the entries of `allPartitions` whose runtime class equals `HostedPartition.Offline.getClass`. */
private def offlinePartitionCount: Int = {
  // NOTE(review): if HostedPartition.Offline is a case class rather than an object, this is the
  // companion object's class and no hosted value would ever match it — confirm against the
  // HostedPartition definition.
  val offlineClass = HostedPartition.Offline.getClass
  allPartitions.values.asScala.count(hosted => hosted.getClass == offlineClass)
}
Expand Down Expand Up @@ -2736,10 +2748,12 @@ class ReplicaManager(val config: KafkaConfig,
"local leaders.")
replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
localLeaders.foreachEntry { (tp, info) =>
if (!_inklessMetadataView.isDisklessTopic(tp.topic()))
val isDiskless = _inklessMetadataView.isDisklessTopic(tp.topic())
val existingPartition = onlinePartition(tp)
if (!isDiskless) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)

changedPartitions.add(partition)
Expand All @@ -2754,6 +2768,23 @@ class ReplicaManager(val config: KafkaConfig,
markPartitionOffline(tp)
}
}
} else if (existingPartition.isDefined) {
// Classic-to-diskless transition: seal the partition before making it leader
// so that no produce request can ever be processed by the new leader.
val partition = existingPartition.get
try {
partition.seal()
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(info.partition, false, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
Comment thread
giuseppelillo marked this conversation as resolved.

changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.info(s"Skipped the become-leader state change for transitioning partition $tp " +
s"with topic id ${info.topicId} due to a storage error ${e.getMessage}")
markPartitionOffline(tp)
}
}
}
}

Expand Down Expand Up @@ -2781,7 +2812,7 @@ class ReplicaManager(val config: KafkaConfig,
// - This also ensures that the local replica is created even if the leader
// is unavailable. This is required to ensure that we include the partition's
// high watermark in the checkpoint file (see KAFKA-1647).
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
val isNewLeaderEpoch = partition.makeFollower(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)

if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.server.share.SharePartitionManager
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage}
Expand Down Expand Up @@ -143,6 +144,10 @@ class BrokerMetadataPublisher(
debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
}

// Seal existing leader partitions for topics transitioning from classic to diskless.
// New leaders elected are instead sealed inside ReplicaManager.applyLocalLeadersDelta.
sealExistingLeadersOfTopicsMigratedToDiskless(delta, newImage)

// Apply topic deltas.
Option(delta.topicsDelta()).foreach { topicsDelta =>
try {
Expand Down Expand Up @@ -339,6 +344,24 @@ class BrokerMetadataPublisher(
}
}

/**
 * For every topic config resource changed in `delta`, compares the value of
 * TopicConfig.DISKLESS_ENABLE_CONFIG in the delta's base image with the value in `newImage`.
 * When it flips from false to true, asks the replica manager to seal that topic's partitions.
 * Does nothing when the delta carries no configs delta.
 *
 * @param delta    the metadata delta being published
 * @param newImage the metadata image that results from applying the delta
 */
private def sealExistingLeadersOfTopicsMigratedToDiskless(delta: MetadataDelta, newImage: MetadataImage): Unit = {
  // Reads the diskless flag for a resource from the given image; an unset property counts as "false".
  def disklessEnabled(image: MetadataImage, resource: ConfigResource): Boolean =
    image.configs().configProperties(resource)
      .getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
      .toBoolean

  val maybeConfigsDelta = Option(delta.configsDelta())
  maybeConfigsDelta.foreach { configsDelta =>
    configsDelta.changes().forEach { (resource, _) =>
      if (resource.`type`() == ConfigResource.Type.TOPIC) {
        // Old value first, then new value, matching the order in which the images are consulted.
        val enabledBefore = disklessEnabled(delta.image(), resource)
        val enabledAfter = disklessEnabled(newImage, resource)
        if (enabledAfter && !enabledBefore) {
          val topicName = resource.name()
          info(s"Topic $topicName transitioning from classic to diskless, sealing leader partitions")
          replicaManager.sealTopicPartitions(topicName)
        }
      }
    }
  }
}

private def initializeManagers(newImage: MetadataImage): Unit = {
try {
// Start log manager, which will perform (potentially lengthy)
Expand Down
100 changes: 99 additions & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
import kafka.log.LogManager
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException}
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, ReplicaNotAvailableException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
Expand Down Expand Up @@ -4073,4 +4073,102 @@ class PartitionTest extends AbstractPartitionTest {
partition.setLog(mockLog, false)
assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L))
}

// A freshly set-up leader partition starts unsealed and reports sealed after seal().
@Test
def testSealPartition(): Unit = {
  val epoch = 1
  partition = setupPartitionWithMocks(epoch, isLeader = true)

  val sealedBefore = partition.isSealed
  assertFalse(sealedBefore)

  partition.seal()

  val sealedAfter = partition.isSealed
  assertTrue(sealedAfter)
}

// Sealing twice in a row leaves the partition sealed after each call.
@Test
def testSealIsIdempotent(): Unit = {
  val epoch = 1
  partition = setupPartitionWithMocks(epoch, isLeader = true)

  (1 to 2).foreach { _ =>
    partition.seal()
    assertTrue(partition.isSealed)
  }
}

// An append succeeds before sealing and fails with ReplicaNotAvailableException afterwards.
@Test
def testSealedPartitionRejectsAppends(): Unit = {
  val epoch = 1
  partition = setupPartitionWithMocks(epoch, isLeader = true)

  val local = RequestLocal.withThreadConfinedCaching
  val firstBatch = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes)))

  // Unsealed: the append must go through without throwing.
  partition.appendRecordsToLeader(firstBatch, origin = AppendOrigin.CLIENT, requiredAcks = 0, local)

  partition.seal()

  // Sealed: the same kind of append must now be rejected.
  val secondBatch = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes)))
  assertThrows(
    classOf[ReplicaNotAvailableException],
    () => partition.appendRecordsToLeader(secondBatch, origin = AppendOrigin.CLIENT, requiredAcks = 0, local)
  )
}

// The log end offset observed right before sealing stays the same after sealing and after a rejected append.
@Test
def testSealedPartitionStabilizesLeo(): Unit = {
  val epoch = 1
  partition = setupPartitionWithMocks(epoch, isLeader = true)

  // Re-reads the log end offset from the partition's local log on every call.
  def currentLeo = partition.localLogOrException.logEndOffset

  val local = RequestLocal.withThreadConfinedCaching
  val firstBatch = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes)))
  partition.appendRecordsToLeader(firstBatch, origin = AppendOrigin.CLIENT, requiredAcks = 0, local)

  val leoAtSeal = currentLeo

  partition.seal()

  // Sealing itself must not move the LEO.
  assertEquals(leoAtSeal, currentLeo)

  // A further append is rejected ...
  val secondBatch = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes)))
  assertThrows(
    classOf[ReplicaNotAvailableException],
    () => partition.appendRecordsToLeader(secondBatch, origin = AppendOrigin.CLIENT, requiredAcks = 0, local)
  )

  // ... and leaves the LEO where it was.
  assertEquals(leoAtSeal, currentLeo)
}

// A sealed partition stays sealed, and keeps rejecting appends, after makeLeader with a newer leader epoch.
@Test
def testSealedPartitionPreservedAcrossMakeLeader(): Unit = {
  val initialEpoch = 1
  partition = setupPartitionWithMocks(initialEpoch, isLeader = true)

  partition.seal()
  assertTrue(partition.isSealed)

  // Re-elect the local broker as leader with a bumped epoch; this must not clear the sealed flag.
  val replicaIds = Array(brokerId, remoteReplicaId)
  val registrationBuilder = new PartitionRegistration.Builder()
    .setReplicas(replicaIds)
    .setIsr(replicaIds)
    .setDirectories(DirectoryId.unassignedArray(replicaIds.length))
    .setLeader(brokerId)
    .setLeaderEpoch(initialEpoch + 1)
    .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
    .setPartitionEpoch(2)
  val bumpedRegistration = registrationBuilder.build()

  partition.makeLeader(bumpedRegistration, isNew = false, offsetCheckpoints, None)

  assertTrue(partition.isSealed, "Sealed flag should be preserved across makeLeader")

  val local = RequestLocal.withThreadConfinedCaching
  val batch = TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes)))
  assertThrows(
    classOf[ReplicaNotAvailableException],
    () => partition.appendRecordsToLeader(batch, origin = AppendOrigin.CLIENT, requiredAcks = 0, local)
  )
}
}
120 changes: 120 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6861,6 +6861,126 @@ class ReplicaManagerTest {

// TODO: Add more fetch test combinations; edge cases are not covered yet.

// Verifies that ReplicaManager.sealTopicPartitions seals only the partitions of the requested
// topic for which this broker is leader: a follower partition of the same topic and a leader
// partition of a different topic must both stay unsealed.
@Test
def testSealTopicPartitionsSealsOnlyLeadersOfTargetTopic(): Unit = {
val props = TestUtils.createBrokerConfig(0)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
val rm = new ReplicaManager(
metrics = metrics,
config = config,
time = time,
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
metadataCache = metadataCache,
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager)

try {
val topicToSeal = "topic-to-seal"
val otherTopic = "other-topic"
val sealTopicId = Uuid.randomUuid()
val otherTopicId = Uuid.randomUuid()
setupMetadataCacheWithTopicIds(Map(topicToSeal -> sealTopicId, otherTopic -> otherTopicId), metadataCache)

// Build a topics delta that creates both topics and their partitions, then apply it so the
// replica manager hosts all three partitions before sealing is attempted.
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName(topicToSeal).setTopicId(sealTopicId))
delta.replay(new TopicRecord().setName(otherTopic).setTopicId(otherTopicId))
// topicToSeal: partition 0 is leader, partition 1 is follower
delta.replay(new PartitionRecord()
.setPartitionId(0).setTopicId(sealTopicId)
.setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1))
.setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0))
delta.replay(new PartitionRecord()
.setPartitionId(1).setTopicId(sealTopicId)
.setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1))
.setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0))
// otherTopic: partition 0 is leader
delta.replay(new PartitionRecord()
.setPartitionId(0).setTopicId(otherTopicId)
.setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1))
.setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0))
val image = imageFromTopics(delta.apply())
rm.applyDelta(delta, image)

// Grab the partition objects up front so the assertions inspect the same instances that
// sealTopicPartitions operates on.
val sealLeader = rm.getPartitionOrException(new TopicPartition(topicToSeal, 0))
val sealFollower = rm.getPartitionOrException(new TopicPartition(topicToSeal, 1))
val otherLeader = rm.getPartitionOrException(new TopicPartition(otherTopic, 0))

rm.sealTopicPartitions(topicToSeal)

assertTrue(sealLeader.isSealed, "Leader of target topic should be sealed")
assertFalse(sealFollower.isSealed, "Follower of target topic should not be sealed")
assertFalse(otherLeader.isSealed, "Leader of other topic should not be sealed")
} finally {
// Always shut the replica manager down, even when an assertion fails.
rm.shutdown(checkpointHW = false)
}
}

// Verifies that applyDelta seals an already hosted partition when this broker becomes its leader
// while the InklessMetadataView reports the topic as diskless (follower -> leader transition
// during a classic-to-diskless migration).
@Test
def testApplyDeltaSealsMigratedLeaderPartitions(): Unit = {
val props = TestUtils.createBrokerConfig(0)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
val kraftMetadataCache: KRaftMetadataCache = mock(classOf[KRaftMetadataCache])
when(kraftMetadataCache.topicConfig(anyString())).thenReturn(new Properties())
mockGetAliveBrokerFunctions(kraftMetadataCache, aliveBrokers)
when(kraftMetadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
val topicName = "transitioning-topic"
val transTopicId = Uuid.randomUuid()
setupMetadataCacheWithTopicIds(Map(topicName -> transTopicId), kraftMetadataCache)

// The topic starts out as classic: the view reports every topic as non-diskless until the
// stubbing is changed further down.
val inklessMetadata = mock(classOf[InklessMetadataView])
when(inklessMetadata.isDisklessTopic(any())).thenReturn(false)

val rm = new ReplicaManager(
metrics = metrics,
config = config,
time = time,
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
metadataCache = kraftMetadataCache,
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
inklessMetadataView = Some(inklessMetadata))

try {
// First, create the partition as a classic follower (leader is broker 1)
val createDelta = new TopicsDelta(TopicsImage.EMPTY)
createDelta.replay(new TopicRecord().setName(topicName).setTopicId(transTopicId))
createDelta.replay(new PartitionRecord()
.setPartitionId(0).setTopicId(transTopicId)
.setReplicas(util.Arrays.asList(0, 1)).setIsr(util.Arrays.asList(0, 1))
.setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0))
val createImage = imageFromTopics(createDelta.apply())
rm.applyDelta(createDelta, createImage)

// Sanity check the starting point: hosted, not leader, not sealed.
val partition = rm.getPartitionOrException(new TopicPartition(topicName, 0))
assertFalse(partition.isLeader)
assertFalse(partition.isSealed)

// Now simulate a follower->leader transition while the topic becomes diskless
when(inklessMetadata.isDisklessTopic(topicName)).thenReturn(true)

// The change delta is built on top of the image produced by the create delta and moves
// leadership of partition 0 to broker 0.
val changeDelta = new TopicsDelta(createDelta.apply())
changeDelta.replay(new PartitionChangeRecord()
.setPartitionId(0).setTopicId(transTopicId)
.setLeader(0).setIsr(util.Arrays.asList(0, 1)))
val changeImage = imageFromTopics(changeDelta.apply())
rm.applyDelta(changeDelta, changeImage)

assertTrue(partition.isSealed, "Transitioning leader partition should be sealed after applyDelta")
} finally {
// Always shut the replica manager down, even when an assertion fails.
rm.shutdown(checkpointHW = false)
}
}

private def mockFetchHandler(disklessResponse: Map[TopicIdPartition, FetchPartitionData]) = {
// We use constructor mocking here to inject a FetchHandler mock into ReplicaManager,
// because ReplicaManager internally constructs its own FetchHandler instance and does not
Expand Down
Loading
Loading