Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2949,6 +2949,22 @@ class ReplicaManager(val config: KafkaConfig,
s"with topic id ${info.topicId} due to a storage error ${e.getMessage}")
markPartitionOffline(tp)
}
} else if (logManager.getLog(tp).isDefined && !isConsolidatingDisklessTopic) {
// Post-restart: diskless topic still has classic data on local disk (offsets < disklessStartOffset).
// Create the Partition so classic data remains accessible for reads.
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
partition.seal()
changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.info(s"Skipped the become-leader state change for migrated partition $tp " +
s"with topic id ${info.topicId} due to a storage error ${e.getMessage}")
markPartitionOffline(tp)
}
}
}
}

Expand Down Expand Up @@ -3001,6 +3017,23 @@ class ReplicaManager(val config: KafkaConfig,
if (_inklessMetadataView.isDisklessTopic(tp.topic())) {
// Clean up migration tracking since only the leader drives classic -> diskless migration.
initDisklessLogManager.foreach(_.removePartition(tp))
// Post-restart: diskless topic still has classic data on local disk.
// Create the Partition so classic data remains accessible for reads.
if (logManager.getLog(tp).isDefined && !isConsolidatingDisklessTopic) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
Comment thread
viktorsomogyi marked this conversation as resolved.
try {
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeFollower(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
partition.seal()
changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Unable to create follower for migrated partition $tp " +
s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
markPartitionOffline(tp)
}
}
}
}
if (!_inklessMetadataView.isDisklessTopic(tp.topic()) || isConsolidatingDisklessTopic) {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
Expand Down
218 changes: 218 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7177,6 +7177,224 @@ class ReplicaManagerTest {
consolidationCtor.close()
}
}

// Leader path with a pre-existing local log: the log is present but no Partition object is hosted yet.
// Applying a delta that elects this broker as leader must produce an online, sealed leader partition.
// NOTE(review): createReplicaManager(List(topic)) presumably registers the topic as diskless - confirm.
@Test
def testApplyDeltaCreatesPartitionForDisklessTopicWithLocalLogOnLeader(): Unit = {
  val localBrokerId = 1
  val peerBrokerId = localBrokerId + 1
  val migratedTopic = "migrated-topic"
  val migratedTopicId = Uuid.randomUuid()
  val migratedTp = new TopicPartition(migratedTopic, 0)

  val manager = createReplicaManager(List(migratedTopic))
  try {
    // Simulate the post-restart state: the log is already known to the log manager,
    // while the replica manager does not host a Partition for it.
    manager.logManager.getOrCreateLog(migratedTp, isNew = true, topicId = Optional.of(migratedTopicId))
    assertTrue(manager.logManager.getLog(migratedTp).nonEmpty)
    assertEquals(HostedPartition.None, manager.getPartition(migratedTp))

    // Metadata records electing the local broker as leader of partition 0.
    val topicRecord = new TopicRecord().setName(migratedTopic).setTopicId(migratedTopicId)
    val leaderRecord = new PartitionRecord()
      .setPartitionId(0)
      .setTopicId(migratedTopicId)
      .setReplicas(util.Arrays.asList(localBrokerId, peerBrokerId))
      .setIsr(util.Arrays.asList(localBrokerId, peerBrokerId))
      .setLeader(localBrokerId)
      .setLeaderEpoch(0)
      .setPartitionEpoch(0)

    val topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
    topicsDelta.replay(topicRecord)
    topicsDelta.replay(leaderRecord)
    manager.applyDelta(topicsDelta, imageFromTopics(topicsDelta.apply()))

    // The partition must now be hosted online, lead by this broker, and sealed.
    val hosted = manager.getPartition(migratedTp)
    assertTrue(hosted.isInstanceOf[HostedPartition.Online], "Partition should be online")
    val hostedPartition = hosted.asInstanceOf[HostedPartition.Online].partition
    assertTrue(hostedPartition.isLeader, "Partition should be leader")
    assertTrue(hostedPartition.isSealed, "Partition should be sealed")
  } finally {
    manager.shutdown(checkpointHW = false)
  }
}

// Leader path for a consolidating diskless topic that has a local log.
// "Skips" refers to the sealed-partition handling only: the assertions below require that the
// partition IS hosted online as leader, but that it is NOT sealed.
@Test
def testApplyDeltaSkipsPartitionForConsolidatingDisklessTopicWithLocalLogOnLeader(): Unit = {
  val localBrokerId = 1
  val peerBrokerId = localBrokerId + 1
  val consolidatingTopic = "consolidating-topic"
  val consolidatingTopicId = Uuid.randomUuid()
  val consolidatingTp = new TopicPartition(consolidatingTopic, 0)

  // Every ConsolidationFetcherManager constructed while the mock is open is replaced by a stub
  // whose removeFetcherForPartitions returns an empty map.
  val fetcherManagerInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] =
    (fetcherManagerMock, _) =>
      when(fetcherManagerMock.removeFetcherForPartitions(any()))
        .thenReturn(Map.empty[TopicPartition, PartitionFetchState])
  val mockedFetcherManagers = mockConstruction(classOf[ConsolidationFetcherManager], fetcherManagerInit)
  try {
    val manager = createReplicaManager(
      List(consolidatingTopic),
      disklessRemoteStorageConsolidationEnabled = true,
      consolidatingDisklessTopics = Set(consolidatingTopic)
    )
    try {
      // A local log exists, but no Partition is hosted before the delta is applied.
      manager.logManager.getOrCreateLog(consolidatingTp, isNew = true, topicId = Optional.of(consolidatingTopicId))
      assertTrue(manager.logManager.getLog(consolidatingTp).nonEmpty)
      assertEquals(HostedPartition.None, manager.getPartition(consolidatingTp))

      // Metadata records electing the local broker as leader of partition 0.
      val topicRecord = new TopicRecord().setName(consolidatingTopic).setTopicId(consolidatingTopicId)
      val leaderRecord = new PartitionRecord()
        .setPartitionId(0)
        .setTopicId(consolidatingTopicId)
        .setReplicas(util.Arrays.asList(localBrokerId, peerBrokerId))
        .setIsr(util.Arrays.asList(localBrokerId, peerBrokerId))
        .setLeader(localBrokerId)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)

      val topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
      topicsDelta.replay(topicRecord)
      topicsDelta.replay(leaderRecord)
      manager.applyDelta(topicsDelta, imageFromTopics(topicsDelta.apply()))

      // The consolidating topic must not end up with a sealed partition even though a local log
      // exists; it is still hosted online with this broker as leader.
      val hosted = manager.getPartition(consolidatingTp)
      assertTrue(hosted.isInstanceOf[HostedPartition.Online], "Partition should be online")
      val hostedPartition = hosted.asInstanceOf[HostedPartition.Online].partition
      assertTrue(hostedPartition.isLeader, "Partition should be leader")
      assertFalse(hostedPartition.isSealed, "Partition should NOT be sealed")
    } finally {
      manager.shutdown(checkpointHW = false)
    }
  } finally {
    mockedFetcherManagers.close()
  }
}

// Follower path for a consolidating diskless topic that has a local log.
// "Skips" refers to the sealed-partition handling only: the assertions below require that the
// partition IS hosted online as a follower, but that it is NOT sealed.
@Test
def testApplyDeltaSkipsPartitionForConsolidatingDisklessTopicWithLocalLogOnFollower(): Unit = {
  val localBrokerId = 1
  val remoteLeaderId = 2
  val consolidatingTopic = "consolidating-topic"
  val consolidatingTopicId = Uuid.randomUuid()
  val consolidatingTp = new TopicPartition(consolidatingTopic, 0)

  // Every ConsolidationFetcherManager constructed while the mock is open is replaced by a stub
  // whose removeFetcherForPartitions returns an empty map.
  val fetcherManagerInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] =
    (fetcherManagerMock, _) =>
      when(fetcherManagerMock.removeFetcherForPartitions(any()))
        .thenReturn(Map.empty[TopicPartition, PartitionFetchState])
  val mockedFetcherManagers = mockConstruction(classOf[ConsolidationFetcherManager], fetcherManagerInit)
  try {
    val manager = createReplicaManager(
      List(consolidatingTopic),
      disklessRemoteStorageConsolidationEnabled = true,
      consolidatingDisklessTopics = Set(consolidatingTopic)
    )
    try {
      // A local log exists, but no Partition is hosted before the delta is applied.
      manager.logManager.getOrCreateLog(consolidatingTp, isNew = true, topicId = Optional.of(consolidatingTopicId))
      assertTrue(manager.logManager.getLog(consolidatingTp).nonEmpty)
      assertEquals(HostedPartition.None, manager.getPartition(consolidatingTp))

      // Metadata records making another broker the leader, so the local broker is a follower.
      val topicRecord = new TopicRecord().setName(consolidatingTopic).setTopicId(consolidatingTopicId)
      val followerRecord = new PartitionRecord()
        .setPartitionId(0)
        .setTopicId(consolidatingTopicId)
        .setReplicas(util.Arrays.asList(localBrokerId, remoteLeaderId))
        .setIsr(util.Arrays.asList(localBrokerId, remoteLeaderId))
        .setLeader(remoteLeaderId)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)

      val topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
      topicsDelta.replay(topicRecord)
      topicsDelta.replay(followerRecord)
      manager.applyDelta(topicsDelta, imageFromTopics(topicsDelta.apply()))

      // The consolidating topic must not end up with a sealed partition on a follower even though
      // a local log exists; it is still hosted online as a follower.
      val hosted = manager.getPartition(consolidatingTp)
      assertTrue(hosted.isInstanceOf[HostedPartition.Online], "Partition should be online")
      val hostedPartition = hosted.asInstanceOf[HostedPartition.Online].partition
      assertFalse(hostedPartition.isLeader, "Partition should be follower")
      assertFalse(hostedPartition.isSealed, "Partition should NOT be sealed")
    } finally {
      manager.shutdown(checkpointHW = false)
    }
  } finally {
    mockedFetcherManagers.close()
  }
}

// Follower path with a pre-existing local log: the log is present but no Partition object is hosted yet.
// Applying a delta that names another broker as leader must produce an online, sealed follower partition.
// NOTE(review): createReplicaManager(List(topic)) presumably registers the topic as diskless - confirm.
@Test
def testApplyDeltaCreatesPartitionForDisklessTopicWithLocalLogOnFollower(): Unit = {
  val localBrokerId = 1
  val remoteLeaderId = 2
  val migratedTopic = "migrated-topic"
  val migratedTopicId = Uuid.randomUuid()
  val migratedTp = new TopicPartition(migratedTopic, 0)

  val manager = createReplicaManager(List(migratedTopic))
  try {
    // A local log exists, but no Partition is hosted before the delta is applied.
    manager.logManager.getOrCreateLog(migratedTp, isNew = true, topicId = Optional.of(migratedTopicId))
    assertTrue(manager.logManager.getLog(migratedTp).nonEmpty)
    assertEquals(HostedPartition.None, manager.getPartition(migratedTp))

    // Metadata records making another broker the leader, so the local broker is a follower.
    val topicRecord = new TopicRecord().setName(migratedTopic).setTopicId(migratedTopicId)
    val followerRecord = new PartitionRecord()
      .setPartitionId(0)
      .setTopicId(migratedTopicId)
      .setReplicas(util.Arrays.asList(localBrokerId, remoteLeaderId))
      .setIsr(util.Arrays.asList(localBrokerId, remoteLeaderId))
      .setLeader(remoteLeaderId)
      .setLeaderEpoch(0)
      .setPartitionEpoch(0)

    val topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
    topicsDelta.replay(topicRecord)
    topicsDelta.replay(followerRecord)
    manager.applyDelta(topicsDelta, imageFromTopics(topicsDelta.apply()))

    // The partition must now be hosted online as a follower and be sealed.
    val hosted = manager.getPartition(migratedTp)
    assertTrue(hosted.isInstanceOf[HostedPartition.Online], "Partition should be online")
    val hostedPartition = hosted.asInstanceOf[HostedPartition.Online].partition
    assertFalse(hostedPartition.isLeader, "Partition should be follower")
    assertTrue(hostedPartition.isSealed, "Partition should be sealed")
  } finally {
    manager.shutdown(checkpointHW = false)
  }
}

// Diskless topic with no local log: applying a delta that elects this broker as leader must leave
// the replica manager without any hosted Partition for it.
// NOTE(review): createReplicaManager(List(topic)) presumably registers the topic as diskless - confirm.
@Test
def testApplyDeltaSkipsPartitionForDisklessTopicWithoutLocalLog(): Unit = {
  val localBrokerId = 1
  val peerBrokerId = localBrokerId + 1
  val disklessTopic = "fully-diskless-topic"
  val disklessTopicId = Uuid.randomUuid()
  val disklessTp = new TopicPartition(disklessTopic, 0)

  val manager = createReplicaManager(List(disklessTopic))
  try {
    // Precondition: the log manager knows no log for this partition (no classic data on disk).
    assertTrue(manager.logManager.getLog(disklessTp).isEmpty)

    // Metadata records electing the local broker as leader of partition 0.
    val topicRecord = new TopicRecord().setName(disklessTopic).setTopicId(disklessTopicId)
    val leaderRecord = new PartitionRecord()
      .setPartitionId(0)
      .setTopicId(disklessTopicId)
      .setReplicas(util.Arrays.asList(localBrokerId, peerBrokerId))
      .setIsr(util.Arrays.asList(localBrokerId, peerBrokerId))
      .setLeader(localBrokerId)
      .setLeaderEpoch(0)
      .setPartitionEpoch(0)

    val topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
    topicsDelta.replay(topicRecord)
    topicsDelta.replay(leaderRecord)
    manager.applyDelta(topicsDelta, imageFromTopics(topicsDelta.apply()))

    // Without local data there is nothing to serve, so no Partition may be hosted.
    assertEquals(HostedPartition.None, manager.getPartition(disklessTp))
  } finally {
    manager.shutdown(checkpointHW = false)
  }
}

private def mockFetchHandler(disklessResponse: Map[TopicIdPartition, FetchPartitionData]) = {
// We use constructor mocking here to inject a FetchHandler mock into ReplicaManager,
Expand Down
Loading