Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class Partition(val topicPartition: TopicPartition,
@volatile private[cluster] var partitionState: PartitionState = new CommittedPartitionState(util.Set.of(), LeaderRecoveryState.RECOVERED)
@volatile var assignmentState: AssignmentState = new SimpleAssignmentState(util.List.of())

// When true, the partition is sealed for classic-to-diskless migration.
// When true, the partition is sealed for classic-to-diskless switch.
// A sealed partition rejects all produce requests via appendRecordsToLeader.
// Set under the write lock to guarantee that once sealed, LEO cannot increase.
@volatile private var _sealed: Boolean = false
Expand Down Expand Up @@ -272,14 +272,14 @@ class Partition(val topicPartition: TopicPartition,
abortOngoingTransactions(leaderLog)
}
_sealed = true
stateChangeLogger.info(s"Sealed partition $topicPartition for diskless migration with LEO ${leaderLog.logEndOffset}")
stateChangeLogger.info(s"Sealed partition $topicPartition for diskless switch with LEO ${leaderLog.logEndOffset}")
}
}

/**
* Abort all ongoing transactions by appending ABORT markers directly to the log.
* Diskless topics do not support transactions, so any in-flight transaction must
* be resolved before migration proceeds.
* be resolved before the switch from classic to diskless proceeds.
* @throws KafkaStorageException If transaction abortion fails due to an I/O error.
*/
private def abortOngoingTransactions(leaderLog: UnifiedLog): Unit = {
Expand All @@ -304,7 +304,7 @@ class Partition(val topicPartition: TopicPartition,
RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_0.featureLevel())
stateChangeLogger.info(s"Aborted ongoing transaction for producer $producerId " +
s"(epoch=$producerEpoch, txnFirstOffset=$txnFirstOffset) " +
s"on partition $topicPartition before sealing for diskless migration")
s"on partition $topicPartition before sealing for diskless switch")
}
}

Expand Down Expand Up @@ -1281,9 +1281,9 @@ class Partition(val topicPartition: TopicPartition,
): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
if (_sealed) {
// Force metadata refresh and client retries while the migration from classic to diskless is still ongoing.
// Force metadata refresh and client retries while the switch from classic to diskless is still ongoing.
throw new ReplicaNotAvailableException(
s"Partition $topicPartition is sealed for diskless migration on broker $localBrokerId")
s"Partition $topicPartition is sealed for diskless switch on broker $localBrokerId")
}

leaderLogIfLocal match {
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ import scala.jdk.OptionConverters.RichOptional
* 2. Hybrid diskless partition (`classicToDisklessStartOffset > 0` and managed replicas enabled,
* or a consolidating diskless topic): fetch offset in classic and/or diskless path, depending
* on the requested timestamp. Consolidating topics also allow followers on the classic leg
* (same rationale as sealed migrated replicas: clients may be routed to any ISR replica).
* 3. Partition that is being migrated from classic to diskless
* (`classicToDisklessStartOffset == CLASSIC_TO_DISKLESS_MIGRATION_PENDING` and managed
* (same rationale as sealed switched replicas: clients may be routed to any ISR replica).
* 3. Partition that is being switched from classic to diskless
* (`classicToDisklessStartOffset == CLASSIC_TO_DISKLESS_SWITCH_PENDING` and managed
* replicas enabled, and not a consolidating diskless topic): fetch offset only in the
* classic path.
* 4. Follower requests on a migrated partition: ListOffsets requests (used by ReplicaFetcher
* 4. Follower requests on a switched partition: ListOffsets requests (used by ReplicaFetcher
* for truncation / initial offset bootstrap) must never see diskless offsets, otherwise the
* follower would try to truncate to or fetch from offsets that live only in object storage:
* fetch offset only in the classic path.
Expand Down Expand Up @@ -93,34 +93,34 @@ class DisklessFetchOffsetRouter(
classicFetchOffset: (TopicPartition, ListOffsetsPartition, Boolean) => ListOffsetsPartitionStatus
): ListOffsetsPartitionStatus = {
val classicToDisklessStartOffset = inklessMetadataView.getClassicToDisklessStartOffset(topicPartition)
val migrationPending = classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING
val isMigratedWithClassicAccess = (classicToDisklessStartOffset > 0 && disklessManagedReplicasEnabled)
val switchPending = classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING
val isSwitchedWithClassicAccess = (classicToDisklessStartOffset > 0 && disklessManagedReplicasEnabled)
val isConsolidatingPartition = disklessConsolidationEnabled && inklessMetadataView.isConsolidatingDisklessTopic(topicPartition.topic)
Comment thread
giuseppelillo marked this conversation as resolved.

// Migrated partitions seal their classic local log: once classicToDisklessStartOffset is
// Switched partitions seal their classic local log: once classicToDisklessStartOffset is
// committed the LEO can no longer grow and every ISR replica has the same data on disk.
// Any replica can therefore safely answer ListOffsets from its local log, so we let
// followers serve the classic-side query as well.
// Since consolidating partitions contain only data that has been stored in the diskless
// coordinator and its offsets won't change, we can allow follower requests.
val allowFromFollower = isMigratedWithClassicAccess || isConsolidatingPartition
val allowFromFollower = isSwitchedWithClassicAccess || isConsolidatingPartition
val isFollowerRequest = replicaId >= 0

def classicLookup(): ListOffsetsPartitionStatus = classicFetchOffset(topicPartition, partition, allowFromFollower)
def disklessLookup: Lookup = disklessLookupOnJob(job, topicPartition, partition)
def disklessLookupOnNewJob: Lookup = disklessLookupOnJob(newJob(), topicPartition, partition, startNow = true)

// Consolidating diskless topics can still carry CLASSIC_TO_DISKLESS_MIGRATION_PENDING in
// Consolidating diskless topics can still carry CLASSIC_TO_DISKLESS_SWITCH_PENDING in
// metadata while the leader serves (or is catching up) from object storage; ListOffsets must
// not be forced down the classic-only path in that case (see ReplicaManager diskless fetch
// routing for the analogous consolidating carve-out).
if (migrationPending && disklessManagedReplicasEnabled && !isConsolidatingPartition) {
if (switchPending && disklessManagedReplicasEnabled && !isConsolidatingPartition) {
// Case 3.
classicFetchOffset(topicPartition, partition, false)
} else if (isMigratedWithClassicAccess && isFollowerRequest) {
} else if (isSwitchedWithClassicAccess && isFollowerRequest) {
// Case 4.
classicLookup()
} else if (isMigratedWithClassicAccess || isConsolidatingPartition) {
} else if (isSwitchedWithClassicAccess || isConsolidatingPartition) {
// Case 2: hybrid, route by timestamp.
partition.timestamp() match {
case ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP | ListOffsetsRequest.LATEST_TIERED_TIMESTAMP =>
Expand Down
56 changes: 28 additions & 28 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class InitDisklessLogManager(
}

/**
* Register a sealed partition for migration. Registers this manager as a
* Register a sealed partition for the init diskless log operation. Registers this manager as a
* PartitionListener to receive HW advancement notifications. If HW already
* equals LEO, immediately marks the partition ready and schedules a batch
* send. Otherwise, waits for HW advancement notifications.
Expand Down Expand Up @@ -165,7 +165,7 @@ class InitDisklessLogManager(
enqueueSendingToController(tp, sendingToController)
sendingToController
case _: Failed =>
// remove from tracking and count as a failed migration.
// remove from tracking and count as a failed init diskless log operation.
metrics.markFailed()
null
case waitingForReplication: WaitingForReplication => waitingForReplication
Expand All @@ -184,7 +184,7 @@ class InitDisklessLogManager(
} else {
// PermanentFailure on controller call (e.g. FENCED_LEADER_EPOCH /
// INVALID_REQUEST) or local pre-flight rejection (leader/log lost):
// the migration won't progress.
// the init diskless log operation won't progress.
// Only count a failure when this callback observed `tp` as still
// tracked. `removePartition` completes the queue promise with `false`
// (via `queue.remove`) AFTER clearing `tracked`, so the cancellation
Expand All @@ -209,7 +209,7 @@ class InitDisklessLogManager(
)
// Only count completion when this callback observed `tp` as still
// tracked. If `removePartition` cleared `tracked` (and/or the queue
// promise) concurrently, the migration was cancelled externally and
// promise) concurrently, the init diskless log operation was cancelled externally and
// we must not inflate the completed meter.
if (tracked.remove(tp) != null) {
metrics.markCompleted()
Expand Down Expand Up @@ -248,34 +248,34 @@ object InitDisklessLogManager {
private val MetricsPackage = "kafka.server"
private val MetricsClassName = "InitDisklessLogManager"

private[server] val MigrationsInFlightMetricName = "ClassicToDisklessMigrationsInFlight"
private[server] val WaitingForReplicationCountMetricName = "ClassicToDisklessMigrationsWaitingForReplicationCount"
private[server] val SendingToControllerCountMetricName = "ClassicToDisklessMigrationsSendingToControllerCount"
private[server] val AwaitingMetadataCountMetricName ="ClassicToDisklessMigrationsAwaitingMetadataCount"
private[server] val InFlightPartitionsMetricName = "InFlightPartitions"
private[server] val WaitingForReplicationPartitionsMetricName = "WaitingForReplicationPartitions"
private[server] val SendingToControllerPartitionsMetricName = "SendingToControllerPartitions"
private[server] val AwaitingMetadataPartitionsMetricName = "AwaitingMetadataPartitions"

// Oldest-age-per-state gauges. Reads 0 when no partition is currently in the corresponding state.
private[server] val OldestWaitingForReplicationAgeMsMetricName = "ClassicToDisklessMigrationOldestWaitingForReplicationAgeMs"
private[server] val OldestSendingToControllerAgeMsMetricName = "ClassicToDisklessMigrationOldestSendingToControllerAgeMs"
private[server] val OldestAwaitingMetadataAgeMsMetricName = "ClassicToDisklessMigrationOldestAwaitingMetadataAgeMs"
private[server] val OldestWaitingForReplicationAgeMsMetricName = "OldestWaitingForReplicationAgeMs"
private[server] val OldestSendingToControllerAgeMsMetricName = "OldestSendingToControllerAgeMs"
private[server] val OldestAwaitingMetadataAgeMsMetricName = "OldestAwaitingMetadataAgeMs"

private[server] val MigrationsCompletedPerSecMetricName = "ClassicToDisklessMigrationsCompletedPerSec"
private[server] val MigrationsFailedPerSecMetricName = "ClassicToDisklessMigrationsFailedPerSec"
private[server] val MigrationsRetriedPerSecMetricName = "ClassicToDisklessMigrationsRetriedPerSec"
private[server] val InitCompletedPerSecMetricName = "InitCompletedPerSec"
private[server] val InitFailedPerSecMetricName = "InitFailedPerSec"
private[server] val InitRetriedPerSecMetricName = "InitRetriedPerSec"

private[server] val GaugeMetricNames = Set(
MigrationsInFlightMetricName,
WaitingForReplicationCountMetricName,
SendingToControllerCountMetricName,
AwaitingMetadataCountMetricName,
InFlightPartitionsMetricName,
WaitingForReplicationPartitionsMetricName,
SendingToControllerPartitionsMetricName,
AwaitingMetadataPartitionsMetricName,
OldestWaitingForReplicationAgeMsMetricName,
OldestSendingToControllerAgeMsMetricName,
OldestAwaitingMetadataAgeMsMetricName,
)

private[server] val MeterMetricNames = Set(
MigrationsCompletedPerSecMetricName,
MigrationsFailedPerSecMetricName,
MigrationsRetriedPerSecMetricName,
InitCompletedPerSecMetricName,
InitFailedPerSecMetricName,
InitRetriedPerSecMetricName,
)

private[server] val MetricNames: Set[String] = GaugeMetricNames union MeterMetricNames
Expand All @@ -302,17 +302,17 @@ object InitDisklessLogManager {
classOf[AwaitingMetadata] -> new ConcurrentHashMap[TopicPartition, java.lang.Long]()
)

metricsGroup.newGauge(MigrationsInFlightMetricName, () => trackedSize())
metricsGroup.newGauge(WaitingForReplicationCountMetricName, () => counters(classOf[WaitingForReplication]).get)
metricsGroup.newGauge(SendingToControllerCountMetricName, () => counters(classOf[SendingToController]).get)
metricsGroup.newGauge(AwaitingMetadataCountMetricName, () => counters(classOf[AwaitingMetadata]).get)
metricsGroup.newGauge(InFlightPartitionsMetricName, () => trackedSize())
metricsGroup.newGauge(WaitingForReplicationPartitionsMetricName, () => counters(classOf[WaitingForReplication]).get)
metricsGroup.newGauge(SendingToControllerPartitionsMetricName, () => counters(classOf[SendingToController]).get)
metricsGroup.newGauge(AwaitingMetadataPartitionsMetricName, () => counters(classOf[AwaitingMetadata]).get)
metricsGroup.newGauge(OldestWaitingForReplicationAgeMsMetricName, () => oldestAgeMs(classOf[WaitingForReplication]))
metricsGroup.newGauge(OldestSendingToControllerAgeMsMetricName, () => oldestAgeMs(classOf[SendingToController]))
metricsGroup.newGauge(OldestAwaitingMetadataAgeMsMetricName, () => oldestAgeMs(classOf[AwaitingMetadata]))

private val completedMeter = metricsGroup.newMeter(MigrationsCompletedPerSecMetricName, "migrations", TimeUnit.SECONDS)
private val failedMeter = metricsGroup.newMeter(MigrationsFailedPerSecMetricName, "migrations", TimeUnit.SECONDS)
private val retriedMeter = metricsGroup.newMeter(MigrationsRetriedPerSecMetricName, "retries", TimeUnit.SECONDS)
private val completedMeter = metricsGroup.newMeter(InitCompletedPerSecMetricName, "inits", TimeUnit.SECONDS)
private val failedMeter = metricsGroup.newMeter(InitFailedPerSecMetricName, "inits", TimeUnit.SECONDS)
private val retriedMeter = metricsGroup.newMeter(InitRetriedPerSecMetricName, "inits", TimeUnit.SECONDS)

def markCompleted(): Unit = completedMeter.mark()
def markFailed(): Unit = failedMeter.mark()
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val partitionsWithNewHighWatermark = mutable.Buffer[TopicPartition]()

// Partitions that have caught up to a fully-migrated diskless topic's classicToDisklessStartOffset
// Partitions that have caught up to a fully-switched diskless topic's classicToDisklessStartOffset
// and should be evicted from this fetcher.
private[server] val partitionsToEvictAfterDisklessMigration = mutable.Buffer[TopicPartition]()
private[server] val partitionsToEvictAfterDisklessSwitch = mutable.Buffer[TopicPartition]()

override protected def latestEpoch(topicPartition: TopicPartition): Optional[Integer] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
Expand Down Expand Up @@ -99,7 +99,7 @@ class ReplicaFetcherThread(name: String,
override def doWork(): Unit = {
super.doWork()
completeDelayedFetchRequests()
evictFullyMigratedDisklessPartitions()
evictFullySwitchedDisklessPartitions()
}

// process fetched data
Expand Down Expand Up @@ -153,12 +153,12 @@ class ReplicaFetcherThread(name: String,

brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)

// Stop fetching after the migration from classic to diskless is completed: once the controller
// Stop fetching after the switch from classic to diskless is completed: once the controller
// has committed a classicToDisklessStartOffset for this partition AND our local LEO has reached it,
// the follower is fully caught up to the leader's frozen classic log and must not keep fetching.
val classicToDisklessStartOffset = replicaMgr.inklessMetadataView().getClassicToDisklessStartOffset(topicPartition)
if (classicToDisklessStartOffset >= 0 && log.logEndOffset >= classicToDisklessStartOffset) {
partitionsToEvictAfterDisklessMigration += topicPartition
partitionsToEvictAfterDisklessSwitch += topicPartition
}

logAppendInfo
Expand All @@ -171,12 +171,12 @@ class ReplicaFetcherThread(name: String,
}
}

private def evictFullyMigratedDisklessPartitions(): Unit = {
if (partitionsToEvictAfterDisklessMigration.nonEmpty) {
val toEvict = partitionsToEvictAfterDisklessMigration.toSet
partitionsToEvictAfterDisklessMigration.clear()
private def evictFullySwitchedDisklessPartitions(): Unit = {
if (partitionsToEvictAfterDisklessSwitch.nonEmpty) {
val toEvict = partitionsToEvictAfterDisklessSwitch.toSet
partitionsToEvictAfterDisklessSwitch.clear()
info(s"Evicting partitions from this replica fetcher because they have completed the " +
s"classic-to-diskless migration and the local log has caught up to the seal offset: $toEvict")
s"classic-to-diskless switch and the local log has caught up to the seal offset: $toEvict")
replicaMgr.replicaFetcherManager.removeFetcherForPartitions(toEvict)
}
}
Expand Down
Loading
Loading