Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,46 @@
package io.aiven.inkless.consolidation

import io.aiven.inkless.control_plane.{ControlPlane, PruneDisklessLogsError, PruneDisklessLogsRequest}
import kafka.cluster.Partition
import kafka.server.ReplicaManager
import kafka.server.metadata.InklessMetadataView
import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.metadata.PartitionRegistration

import scala.jdk.CollectionConverters.{CollectionHasAsScala, SeqHasAsJava}

class ConsolidatedDisklessLogPruner(replicaManager: ReplicaManager,
inklessMetadataView: InklessMetadataView,
controlPlane: ControlPlane) extends Runnable with Logging {

/**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
override def run(): Unit = {
val disklessTopicIdPartitions = inklessMetadataView.getConsolidatingDisklessTopicPartitions.asScala
val eitherErrorOrLog = disklessTopicIdPartitions
.map(tip => replicaManager.getPartitionOrError(tip.topicPartition))
.partition(either => either.isLeft)
eitherErrorOrLog._1
.flatMap {
case Left(error) => Some(error)
case _ => None
}
// Read the classic-to-diskless start offset once per partition and thread it through, so the
// eligibility check and the per-partition prune decision always see the same value (no TOCTOU
// between dropping SWITCH_PENDING and computing the safe prune offset).
val eligiblePartitionsWithSeal = inklessMetadataView.getConsolidatingDisklessTopicPartitions.asScala
.map(tip => (tip, inklessMetadataView.getClassicToDisklessStartOffset(tip.topicPartition)))
.filter { case (_, seal) => seal != PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING }
.map { case (tip, seal) => (replicaManager.getPartitionOrError(tip.topicPartition), seal) }
eligiblePartitionsWithSeal
.collect { case (Left(error), _) => error }
.foreach(error => logger.warn("Got error during pruning consolidated diskless logs: {}", error.message))
val requests = eitherErrorOrLog._2
.flatMap {
case Right(partition) =>
partition.topicId.flatMap { topicId =>
partition.log.flatMap { log =>
val highestRemoteOffset = log.highestOffsetInRemoteStorage
if (highestRemoteOffset < 0) {
None
} else {
val requests = eligiblePartitionsWithSeal
.collect { case (Right(partition), seal) => (partition, seal) }
.flatMap { case (partition, seal) =>
partition.topicId.flatMap { topicId =>
partition.log.flatMap { log =>
val highestRemoteOffset = log.highestOffsetInRemoteStorage
if (highestRemoteOffset < 0) {
None
} else {
safePruneOffset(partition, seal, highestRemoteOffset).map { safeHighestRemoteOffset =>
val topicIdPartition = new TopicIdPartition(topicId, partition.topicPartition)
Some(new PruneDisklessLogsRequest(topicIdPartition, highestRemoteOffset))
new PruneDisklessLogsRequest(topicIdPartition, safeHighestRemoteOffset)
}
}
}
case _ => None
}
}.toSeq.asJava
if (!requests.isEmpty) {
controlPlane.pruneDisklessLogs(requests).asScala.foreach { pruneDisklessLogsResponse =>
Expand All @@ -70,7 +70,7 @@ class ConsolidatedDisklessLogPruner(replicaManager: ReplicaManager,
replicaManager.getPartitionOrError(pruneDisklessLogsResponse.topicIdPartition.topicPartition) match {
case Right(partition) =>
val newDisklessLogStart = pruneDisklessLogsResponse.disklessLogStartOffset
partition.maybeAdvanceLastAppliedDisklessLogStartOffset(newDisklessLogStart)
partition.maybeAdvanceConsolidationPruneFloor(newDisklessLogStart)
case Left(error) => logger.warn("Couldn't update diskless start offset for {} due to: {}",
pruneDisklessLogsResponse.topicIdPartition.topicPartition,
error.message
Expand All @@ -80,4 +80,18 @@ class ConsolidatedDisklessLogPruner(replicaManager: ReplicaManager,
}
}
}

/**
 * Computes how far a consolidating diskless partition may be pruned in this round.
 *
 * @param partition           the local partition being considered
 * @param seal                the classic-to-diskless start offset read for this partition
 * @param highestRemoteOffset the highest offset already present in remote (tiered) storage
 * @return the offset to prune up to, or None when pruning must be skipped for now
 */
private def safePruneOffset(partition: Partition, seal: Long, highestRemoteOffset: Long): Option[Long] =
  if (seal == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
    // No classic prefix to protect: everything already tiered may be pruned.
    Some(highestRemoteOffset)
  } else if (seal >= 0) {
    // A real seal exists: defer to the partition's consolidation prune floor.
    partition.getSafeConsolidatedDisklessPruneOffset(highestRemoteOffset)
  } else {
    // Any other negative sentinel is not understood here; skip rather than guess.
    logger.warn("Skipping pruning for {} due to unexpected classic-to-diskless start offset {}",
      partition.topicPartition,
      seal)
    None
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class ConsolidationFetcherThread(name: String,
}
}

// Disabled for the consolidation fetcher: it intentionally fetches for partitions that have
// already completed the classic-to-diskless switch, so reaching the seal must not mark them
// for eviction the way the classic ReplicaFetcherThread does.
override protected def shouldEvictFullySwitchedDisklessPartitions: Boolean = false

override def processPartitionData(
topicPartition: TopicPartition,
fetchOffset: Long,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Inkless
* Copyright (C) 2024 - 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package io.aiven.inkless.consolidation

import kafka.cluster.Partition
import kafka.server.{InitialFetchState, ReplicaManager}
import kafka.server.metadata.InklessMetadataView
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.storage.internals.log.UnifiedLog

import scala.collection.{Set, mutable}
import scala.jdk.OptionConverters.RichOptional

/**
 * Result of reconciling one partition before the consolidation fetcher may take it over.
 *
 * The reconciler turns each variant into an action:
 *  - [[ConsolidationStartState.Ready]]: hand the partition to the consolidation fetcher,
 *    starting at `offset`.
 *  - [[ConsolidationStartState.Retry]]: not consolidatable yet (e.g. the seal is still pending or
 *    the local LEO is below it). `reason` is logged and the partition is skipped this round; a
 *    later metadata delta or classic-fetcher catch-up re-triggers it.
 *  - [[ConsolidationStartState.Failed]]: unrecoverable. `reason` is logged and the partition is
 *    marked as failed in the fetcher manager.
 */
private sealed trait ConsolidationStartState

private object ConsolidationStartState {

  /** The partition can be consolidated, beginning at `offset`. */
  final case class Ready(offset: Long) extends ConsolidationStartState

  /** The partition is not ready yet; `reason` is a human-readable explanation for the log. */
  final case class Retry(reason: String) extends ConsolidationStartState

  /** Reconciliation failed; `reason` is the error to report. */
  final case class Failed(reason: Throwable) extends ConsolidationStartState
}

/** Raised when a partition's consolidation state cannot be reconciled; carries only a message. */
private class ReconciliationException(message: String)
  extends RuntimeException(message)

/**
 * Decides, per partition, whether a consolidating diskless partition can be handed over to the
 * consolidation fetcher and at which offset, then starts fetchers for those that can.
 */
class ConsolidationReconciler(replicaManager: ReplicaManager,
                              stateChangeLogger: StateChangeLogger,
                              consolidationMetrics: ConsolidationMetrics,
                              inklessMetadataView: InklessMetadataView,
                              initialFetchOffset: UnifiedLog => Long,
                              consolidationFetcherManager: ConsolidationFetcherManager) {

  import scala.util.control.NonFatal

  /**
   * Starts consolidation fetchers for the given partitions.
   *
   * Each partition is reconciled first and started only if reconciliation reports it ready:
   * either it has no classic-to-diskless seal, or its local log end offset has reached (or
   * passed) the seal offset.
   *
   * Other outcomes:
   *  - seal still pending, or local LEO below the seal: logged and skipped;
   *  - reconciliation fails: logged, and the partition is marked as failed in the fetcher manager.
   *
   * @param consolidatingPartitions the consolidating partitions to start fetching.
   */
  def startConsolidationFetchers(consolidatingPartitions: mutable.HashMap[TopicPartition, Partition]): Unit = {
    if (consolidatingPartitions.nonEmpty) {
      val consolidatingPartitionAndOffsets: mutable.HashMap[TopicPartition, InitialFetchState] =
        initConsolidatingPartitionFetching(consolidatingPartitions)

      consolidationFetcherManager.addFetcherForPartitions(consolidatingPartitionAndOffsets)
      consolidatingPartitionAndOffsets.keys.foreach(tp => consolidationMetrics.registerPartition(tp))
    }
  }

  /**
   * Starts consolidation fetchers for the given partitions.
   *
   * Only partitions that belong to a consolidating diskless topic and are online on this broker
   * are considered.
   *
   * @param topicPartitions candidate partitions; others are ignored.
   */
  def startConsolidationFetchersForCaughtUpClassicPartitions(topicPartitions: Set[TopicPartition]): Unit = {
    val consolidatingDisklessPartitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
    topicPartitions.foreach { tp =>
      if (inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)) {
        replicaManager.onlinePartition(tp).foreach(partition => consolidatingDisklessPartitionsToStartFetching.put(tp, partition))
      }
    }
    startConsolidationFetchers(consolidatingDisklessPartitionsToStartFetching)
  }

  /**
   * Reconciles every given partition and builds the initial fetch state for the ready ones.
   *
   * Reconciliation is isolated per partition. A non-fatal exception while reconciling one
   * partition (for example when its local log cannot be obtained) is reported as a failure for
   * that partition only, instead of aborting the whole batch.
   *
   * @param consolidatingDisklessPartitionsToStartFetching the partitions to reconcile.
   * @return initial fetch states, keyed by partition, for the partitions that are ready.
   */
  def initConsolidatingPartitionFetching(consolidatingDisklessPartitionsToStartFetching: mutable.HashMap[TopicPartition, Partition]
                                        ): mutable.HashMap[TopicPartition, InitialFetchState] = {
    val consolidatingPartitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]

    consolidatingDisklessPartitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
      val startState =
        try reconcileSwitchedConsolidatingDisklessPartition(partition)
        catch {
          case NonFatal(e) => ConsolidationStartState.Failed(e)
        }

      startState match {
        case ConsolidationStartState.Ready(offset) =>
          // Reconciliation has just obtained the local log successfully on every Ready path.
          consolidatingPartitionAndOffsets.put(topicPartition, InitialFetchState(
            partition.localLogOrException.topicId.toScala,
            new BrokerEndPoint(-1, "diskless", -1),
            partition.getLeaderEpoch,
            offset
          ))
        case ConsolidationStartState.Retry(reason) =>
          stateChangeLogger.info(reason)
        case ConsolidationStartState.Failed(reason) =>
          stateChangeLogger.error("Error happened during consolidating log reconciliation before initial fetch from diskless control plane", reason)
          consolidationFetcherManager.addFailedPartition(topicPartition)
      }
    }
    consolidatingPartitionAndOffsets
  }

  /**
   * Determines whether `partition` can start consolidating, based on its classic-to-diskless
   * start offset (the seal) and its local log.
   *
   * May throw if the partition's local log cannot be obtained; the caller converts such errors
   * into [[ConsolidationStartState.Failed]].
   */
  private def reconcileSwitchedConsolidatingDisklessPartition(partition: Partition): ConsolidationStartState = {
    val tp = partition.topicPartition
    inklessMetadataView.getClassicToDisklessStartOffset(tp) match {
      case PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET =>
        // Born-diskless/born-consolidated topics don't have a classic seal boundary to reconcile.
        ConsolidationStartState.Ready(initialFetchOffset(partition.localLogOrException))
      case PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING =>
        ConsolidationStartState.Retry(s"Skipping consolidation for $tp because classic-to-diskless migration is still pending")
      case seal if seal >= 0 =>
        val log = partition.localLogOrException
        // Read the LEO once so the comparison, the log message and the start offset agree.
        val logEndOffset = log.logEndOffset
        if (logEndOffset < seal) {
          // The classic prefix hasn't been fully replicated locally yet; a classic catch-up
          // fetcher must finish bringing the local log up to the seal before consolidation
          // can take over.
          ConsolidationStartState.Retry(
            s"Skipping consolidation for $tp because local LEO $logEndOffset is below " +
              s"classic-to-diskless start offset $seal")
        } else {
          // LEO >= seal. This covers both the initial switch (LEO == seal, nothing consolidated
          // yet) and resuming an already-progressed partition after a restart, leadership
          // failover, or reassignment (the local log either kept its consolidated frontier or
          // was rehydrated from tiered storage). In every case we resume from the current local
          // LEO so we never re-consolidate or skip data the local log already holds.
          //
          // TODO(follow-up): a review concern about this resume path was acknowledged as valid
          // and deferred; addressing it would possibly require tracking the leader epoch or
          // persisting the consolidation frontier.
          //
          // The prune floor is the higher of the seal and the current log start offset:
          // - at first switch logStartOffset is still the classic prefix start, so the floor is
          //   the seal, which blocks pruning the diskless region until consolidation has tiered
          //   past the boundary;
          // - on resume logStartOffset has advanced past the seal as consolidated segments were
          //   tiered and deleted, so it reflects real pruning progress.
          val pruneFloor = math.max(seal, log.logStartOffset)
          partition.ensureConsolidationPruneFloorAtLeast(pruneFloor)
          ConsolidationStartState.Ready(logEndOffset)
        }
      case unexpected =>
        ConsolidationStartState.Failed(new ReconciliationException(s"Skipping consolidation for $tp due to unexpected classic-to-diskless start offset: $unexpected"))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ class DisklessLeaderEndPoint(
val job = fetchOffsetHandler.createJob()
val futures = mutable.Map.empty[TopicPartition, CompletableFuture[FileRecordsOrError]]

// TODO: POD-2419, offsets for leader epoch needs to be implemented for proper truncation
partitions.forEach { (tp, epochData) =>
if (epochData.leaderEpoch != OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH && job.mustHandle(tp.topic)) {
val partitionRequest = new ListOffsetsPartition()
Expand Down
56 changes: 45 additions & 11 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,14 @@ class Partition(val topicPartition: TopicPartition,
}
}

// Broker-local watermark for the latest diskless log start offset applied from pruning responses.
// The control plane owns the persisted log start offset; this value only catches stale or
// out-of-order pruner responses before they can regress local partition state.
// NOTE(review): this field appears to be the pre-change side of the diff; its guarding role is
// also described for safeConsolidationPruningFloor below. Confirm whether both are meant to coexist.
@volatile private var lastAppliedDisklessLogStartOffset: Long = PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET
// Monotonic prune watermark for a consolidating diskless partition. It plays two roles:
// - gate: the consolidated diskless region must not be pruned until tiered (remote) storage has
//   caught up to this offset, so pruning is only allowed once highestRemoteOffset >= floor;
// - progress tracker: it advances to the latest applied diskless log start offset as pruning
//   proceeds, guarding against stale or out-of-order pruner responses regressing local state.
// None means the partition has no consolidation prune boundary established yet (so it must not be
// pruned on the switched-partition path).
// Concurrency: expected to be written only via raiseConsolidationPruneFloor while holding
// leaderIsrUpdateLock. getSafeConsolidatedDisklessPruneOffset reads it without the lock, relying
// on @volatile for visibility.
@volatile private var safeConsolidationPruningFloor: Option[Long] = None

this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "

Expand Down Expand Up @@ -264,18 +268,48 @@ class Partition(val topicPartition: TopicPartition,
*/
def isAtMinIsr: Boolean =
  leaderLogIfLocal.exists(leaderLog => partitionState.isr.size == effectiveMinIsr(leaderLog))

// NOTE(review): this region appears to interleave the removed
// maybeAdvanceLastAppliedDisklessLogStartOffset (its header and if/else body) with its
// replacement ensureConsolidationPruneFloorAtLeast. Only one of the two definitions should
// survive; verify against the merged file.
def maybeAdvanceLastAppliedDisklessLogStartOffset(newDisklessLogStartOffset: Long): Unit = {
/**
 * Establishes a lower bound on the consolidation prune floor when consolidation (re)starts.
 * Keeping an already-higher floor (e.g. the pruner advanced it while logStartOffset still points
 * at the classic prefix) is the expected steady state, so this is silent on a no-op.
 *
 * Acquires leaderIsrUpdateLock for writing and delegates to raiseConsolidationPruneFloor, which
 * never moves an existing floor backwards.
 *
 * @param floor the minimum prune floor to establish
 */
def ensureConsolidationPruneFloorAtLeast(floor: Long): Unit = {
inWriteLock(leaderIsrUpdateLock) {
if (newDisklessLogStartOffset >= lastAppliedDisklessLogStartOffset) {
lastAppliedDisklessLogStartOffset = newDisklessLogStartOffset
} else {
warn(s"Ignoring stale diskless log start offset for $topicPartition. " +
s"The new value ($newDisklessLogStartOffset) is less than the last locally applied value " +
s"($lastAppliedDisklessLogStartOffset).")
raiseConsolidationPruneFloor(floor)
}
}

/**
 * Advances the consolidation prune floor as pruning progresses. A value below the current floor
 * indicates a stale or out-of-order pruner response, which is unexpected and surfaced as a warning.
 *
 * Acquires leaderIsrUpdateLock for writing. The floor is never moved backwards.
 *
 * @param newFloor the proposed floor, typically the diskless log start offset returned by a
 *                 prune response
 */
def maybeAdvanceConsolidationPruneFloor(newFloor: Long): Unit = {
  inWriteLock(leaderIsrUpdateLock) {
    // Snapshot the floor before attempting the raise so the warning can report it without an
    // unchecked `.get` on the Option (a rejected raise implies the snapshot is defined).
    val previousFloor = safeConsolidationPruningFloor
    if (!raiseConsolidationPruneFloor(newFloor)) {
      warn(s"Ignoring stale consolidation prune floor for $topicPartition. " +
        s"The new value ($newFloor) is less than the current floor " +
        s"(${previousFloor.fold("none")(_.toString)}).")
    }
  }
}

/**
 * Sets the consolidation prune floor to `floor` unless that would lower an already-established
 * floor. Callers must hold leaderIsrUpdateLock.
 *
 * @return true if the floor was set, false if it was left unchanged
 */
private def raiseConsolidationPruneFloor(floor: Long): Boolean =
  safeConsolidationPruningFloor match {
    case Some(current) if floor < current =>
      // Would regress an existing floor: refuse.
      false
    case _ =>
      safeConsolidationPruningFloor = Some(floor)
      true
  }

/**
 * Returns `highestRemoteOffset` as the offset that may be pruned up to. This happens only when a
 * consolidation prune floor has been established and remote storage has reached it; otherwise
 * returns None.
 *
 * Reads the volatile floor once, without taking leaderIsrUpdateLock.
 */
def getSafeConsolidatedDisklessPruneOffset(highestRemoteOffset: Long): Option[Long] = {
  val remoteReachedFloor = safeConsolidationPruningFloor.exists(floor => highestRemoteOffset >= floor)
  if (remoteReachedFloor) Some(highestRemoteOffset) else None
}

/** Returns the current value of this partition's `_sealed` flag. */
def isSealed: Boolean = _sealed

/**
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ class ReplicaFetcherThread(name: String,
evictFullySwitchedDisklessPartitions()
}

/**
 * Whether processPartitionData should mark a partition for eviction from this fetcher once its
 * classic-to-diskless start offset is set (>= 0) and the local log end offset has reached it.
 *
 * The classic ReplicaFetcherThread enables this, so it self-evicts at the seal and hands the
 * partition off to consolidation. The ConsolidationFetcherThread disables it because it
 * intentionally fetches for already-switched partitions.
 */
protected def shouldEvictFullySwitchedDisklessPartitions: Boolean = true
Comment thread
viktorsomogyi marked this conversation as resolved.

// process fetched data
override def processPartitionData(
topicPartition: TopicPartition,
Expand Down Expand Up @@ -157,7 +166,9 @@ class ReplicaFetcherThread(name: String,
// has committed a classicToDisklessStartOffset for this partition AND our local LEO has reached it,
// the follower is fully caught up to the leader's frozen classic log and must not keep fetching.
val classicToDisklessStartOffset = replicaMgr.inklessMetadataView().getClassicToDisklessStartOffset(topicPartition)
if (classicToDisklessStartOffset >= 0 && log.logEndOffset >= classicToDisklessStartOffset) {
if (shouldEvictFullySwitchedDisklessPartitions &&
classicToDisklessStartOffset >= 0 &&
log.logEndOffset >= classicToDisklessStartOffset) {
partitionsToEvictAfterDisklessSwitch += topicPartition
}

Expand All @@ -178,6 +189,7 @@ class ReplicaFetcherThread(name: String,
info(s"Evicting partitions from this replica fetcher because they have completed the " +
s"classic-to-diskless switch and the local log has caught up to the seal offset: $toEvict")
replicaMgr.replicaFetcherManager.removeFetcherForPartitions(toEvict)
replicaMgr.startConsolidationFetchersForCaughtUpClassicPartitions(toEvict)
}
}

Expand Down
Loading
Loading