Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3076,6 +3076,16 @@ class ReplicaManager(val config: KafkaConfig,
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(info.partition, isNew, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
partition.seal()
// makeLeader reloads HW from the on-disk checkpoint, which may be stale
// (unclean shutdown, or checkpoint interval hadn't fired). Since the
// partition is sealed, no produces or follower fetches can ever advance
// HW naturally — restore it to the seal offset so consumers can read.
val sealOffset = _inklessMetadataView.getClassicToDisklessStartOffset(tp)
if (sealOffset >= 0 && partition.localLogOrException.highWatermark < sealOffset) {
partition.localLogOrException.maybeUpdateHighWatermark(sealOffset)
stateChangeLogger.info(s"Advanced high watermark to seal offset $sealOffset for " +
s"switched leader partition $tp after restart")
Comment thread
jeqo marked this conversation as resolved.
}
changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
Expand Down
86 changes: 86 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9342,6 +9342,92 @@ class ReplicaManagerTest {
}
}

@Test
def testApplyDeltaAdvancesHwmToSealOffsetForSealedLeaderWithStaleCheckpoint(): Unit = {
  // After restart, makeLeader reloads HW from the on-disk checkpoint file.
  // If the checkpoint is stale (e.g. unclean shutdown, or the checkpoint interval
  // hadn't fired since HW advanced), HW will be below the seal offset.
  // Since the partition is sealed, no produces or follower fetches can ever
  // advance HW naturally, so consumers could not read classic data below the seal.
  // applyDelta must detect this and restore HW to the seal offset.
  val topicName = "switched-topic"
  val topicId = Uuid.randomUuid()
  val tp = new TopicPartition(topicName, 0)
  val brokerId = 1
  val sealOffset = 10L

  val replicaManager = spy(createReplicaManager(List(topicName)))
  try {
    val log = replicaManager.logManager.getOrCreateLog(tp, isNew = true, topicId = Optional.of(topicId))
    // Checkpointed HW (5) deliberately lags the log end offset / seal offset.
    populateLocalLogAtLeoAndCheckpointedHwm(replicaManager, tp, log, leo = sealOffset, hw = 5L)

    // Mark the partition as fully switched with classicToDisklessStartOffset = sealOffset.
    when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(sealOffset)

    // Apply a delta that makes this broker the leader (post-restart path).
    val delta = switchedPartitionLeaderDelta(topicName, topicId, brokerId)
    replicaManager.applyDelta(delta, imageFromTopics(delta.apply()))

    val onlinePartition = assertOnlineSealedLeader(replicaManager, tp)
    // HW must have been advanced to the seal offset.
    assertEquals(sealOffset, onlinePartition.localLogOrException.highWatermark,
      "High watermark should be advanced to the seal offset")
  } finally {
    replicaManager.shutdown(checkpointHW = false)
  }
}

@Test
def testApplyDeltaDoesNotAdvanceHwmForSealedLeaderWithCurrentCheckpoint(): Unit = {
  // Post-restart with a fresh checkpoint (HW == seal): no advancement is needed,
  // and the HW must be left exactly at the seal offset.
  val topicName = "switched-topic"
  val topicId = Uuid.randomUuid()
  val tp = new TopicPartition(topicName, 0)
  val brokerId = 1
  val sealOffset = 10L

  val replicaManager = spy(createReplicaManager(List(topicName)))
  try {
    val log = replicaManager.logManager.getOrCreateLog(tp, isNew = true, topicId = Optional.of(topicId))
    populateLocalLogAtLeoAndCheckpointedHwm(replicaManager, tp, log, leo = sealOffset, hw = sealOffset)

    when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(sealOffset)

    val delta = switchedPartitionLeaderDelta(topicName, topicId, brokerId)
    replicaManager.applyDelta(delta, imageFromTopics(delta.apply()))

    // Confirm the sealed-leader path was actually taken. Without this, the HW check
    // below would also pass for a partition that never became a sealed leader,
    // because the checkpointed HW already equals the expected value.
    val onlinePartition = assertOnlineSealedLeader(replicaManager, tp)
    // HW should remain at the seal offset (unchanged, since it is already caught up).
    assertEquals(sealOffset, onlinePartition.localLogOrException.highWatermark,
      "High watermark should remain at seal offset when already caught up")
  } finally {
    replicaManager.shutdown(checkpointHW = false)
  }
}

/**
 * Builds a delta that creates `topicName` (id `topicId`) with a single partition 0
 * led by `brokerId`, replicated on `brokerId` and `brokerId + 1` with both in the ISR,
 * at leader epoch 0 and partition epoch 0.
 */
private def switchedPartitionLeaderDelta(topicName: String, topicId: Uuid, brokerId: Int): TopicsDelta = {
  val delta = new TopicsDelta(TopicsImage.EMPTY)
  delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
  delta.replay(new PartitionRecord()
    .setPartitionId(0)
    .setTopicId(topicId)
    .setReplicas(util.Arrays.asList(brokerId, brokerId + 1))
    .setIsr(util.Arrays.asList(brokerId, brokerId + 1))
    .setLeader(brokerId)
    .setLeaderEpoch(0)
    .setPartitionEpoch(0)
  )
  delta
}

/**
 * Asserts that `tp` is hosted online on `replicaManager`, led by this broker, and sealed.
 * Returns the hosted partition so callers can make further assertions on it.
 */
private def assertOnlineSealedLeader(replicaManager: ReplicaManager, tp: TopicPartition) = {
  val onlinePartition = replicaManager.getPartition(tp) match {
    case HostedPartition.Online(partition) => partition
    case other => throw new AssertionError(s"Partition should be online, but was $other")
  }
  assertTrue(onlinePartition.isLeader, "Partition should be leader")
  assertTrue(onlinePartition.isSealed, "Partition should be sealed")
  onlinePartition
}

@Test
def testApplyDeltaSkipsPartitionForConsolidatingDisklessTopicWithLocalLogOnLeader(): Unit = {
val topicName = "consolidating-topic"
Expand Down
Loading