Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ class BrokerServer(
retryTimeoutMs = 60000
)
initDisklessLogChannelManager.start()
maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { _ =>
maybeInitDisklessLogManager = sharedServer.inklessControlPlane.map { controlPlane =>
new InitDisklessLogManager(
controllerChannelManager = initDisklessLogChannelManager,
controlPlane = controlPlane,
scheduler = kafkaScheduler,
brokerId = config.brokerId,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch
Expand Down
121 changes: 80 additions & 41 deletions core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package kafka.server

import kafka.server.InitDisklessLogBatchQueue.ParsedResponse
import io.aiven.inkless.control_plane.ControlPlane
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.InitDisklessLogResponseData
import org.apache.kafka.common.requests.{InitDisklessLogRequest, InitDisklessLogResponse}
import org.apache.kafka.common.utils.ExponentialBackoff
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.util.Scheduler

Expand Down Expand Up @@ -65,7 +67,8 @@ abstract class RetriableInitDisklessLogBatchQueue[S <: InitDisklessLogState](

private case class Attempt(state: S, attemptNumber: Int)
private var queuedByTp = new java.util.LinkedHashMap[TopicPartition, Attempt]()
private val resultPromiseByTp = new java.util.HashMap[TopicPartition, Promise[Boolean]]()
private val resultPromiseByTp = new java.util.concurrent.ConcurrentHashMap[TopicPartition, Promise[Boolean]]()
private val retryBackoff = new ExponentialBackoff(retryPeriodMs, 2, maxRetryTimeMs, 0.0)

private sealed trait TaskStatus
private case object NoTask extends TaskStatus
Expand Down Expand Up @@ -97,32 +100,25 @@ abstract class RetriableInitDisklessLogBatchQueue[S <: InitDisklessLogState](

def enqueue(tp: TopicPartition, state: S): Future[Boolean] = {
val promise = withQueueLock {
val preservedAttemptNumber = Option(queuedByTp.get(tp)).map(_.attemptNumber).getOrElse(0)
// Keep retry progression for already queued partitions, but always refresh the state payload.
queuedByTp.put(tp, Attempt(state, attemptNumber = preservedAttemptNumber))
val currentPromise = resultPromiseByTp.computeIfAbsent(tp, _ => Promise[Boolean]())

taskStatus match {
case NoTask =>
// Schedule a new run, allowing linger for additional ready partitions.
scheduleNewTask(lingerMs)
case TaskScheduled(scheduledTask) =>
// Re-schedule using linger so newly ready partitions can batch together.
scheduledTask.cancel(false)
scheduleNewTask(lingerMs)
case TaskRunning =>
// A task is in-flight. Newly queued work is already in queuedByTp
// and will be picked up by finishTask.
Comment on lines -100 to -115

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small refactor so that duplicate requests that have not been sent yet can be handled.

val existing = Option(queuedByTp.get(tp))
val isDuplicate = existing.exists(_.state == state)
if (!isDuplicate) {
val attemptNum = existing.map(_.attemptNumber).getOrElse(0)
queuedByTp.put(tp, Attempt(state, attemptNumber = attemptNum))
}
maybeScheduleOrReschedule(lingerMs)
currentPromise
}

promise.future
}

def remove(tp: TopicPartition): Unit = withQueueLock {
queuedByTp.remove(tp)
Option(resultPromiseByTp.remove(tp)).foreach(_.trySuccess(false))
def remove(tp: TopicPartition): Unit = {
val promise = withQueueLock {
queuedByTp.remove(tp)
Option(resultPromiseByTp.remove(tp))
}
promise.foreach(_.trySuccess(false))
}

private def task(): Unit = {
Expand Down Expand Up @@ -207,12 +203,22 @@ abstract class RetriableInitDisklessLogBatchQueue[S <: InitDisklessLogState](
}

private def finishTask(): Unit = withQueueLock {
if (queuedByTp.isEmpty) {
taskStatus = NoTask
} else {
taskStatus = NoTask
if (!queuedByTp.isEmpty) {
val hasFreshAttempts = queuedByTp.values().asScala.exists(_.attemptNumber == 0)
val delayMs = if (hasFreshAttempts) lingerMs else computeRetryDelayMs()
scheduleNewTask(delayMs)
maybeScheduleOrReschedule(delayMs)
}
}

/**
 * Ensures a task run is scheduled `delayMs` from now, based on the current `taskStatus`.
 *
 *  - `NoTask`: a new task is scheduled.
 *  - `TaskScheduled`: the pending task is cancelled (without interrupting) and a new one
 *    is scheduled with the given delay.
 *  - `TaskRunning`: nothing is scheduled here.
 *
 * The visible callers (`enqueue` and `finishTask`) invoke this while inside `withQueueLock`.
 *
 * @param delayMs delay, in milliseconds, handed to `scheduleNewTask`
 */
private def maybeScheduleOrReschedule(delayMs: Long): Unit = {
  val needsNewTask = taskStatus match {
    case TaskRunning =>
      // A run is in flight; finishTask re-checks the queue once it completes.
      false
    case TaskScheduled(pendingTask) =>
      // Drop the pending run so the new delay takes effect.
      pendingTask.cancel(false)
      true
    case NoTask =>
      true
  }
  if (needsNewTask) {
    scheduleNewTask(delayMs)
  }
}

Expand All @@ -232,27 +238,16 @@ abstract class RetriableInitDisklessLogBatchQueue[S <: InitDisklessLogState](
}

private def completeAndRemovePromise(tp: TopicPartition, accepted: Boolean): Unit = withQueueLock {
Option(resultPromiseByTp.remove(tp)).foreach(_.trySuccess(accepted))
if (!queuedByTp.containsKey(tp)) {
Option(resultPromiseByTp.remove(tp)).foreach(_.trySuccess(accepted))
}
}

private def computeRetryDelayMs(): Long = {
val maxAttemptNumber = queuedByTp.values().asScala.map(_.attemptNumber).maxOption.getOrElse(0)
if (maxAttemptNumber <= 0) {
retryPeriodMs
} else {
// First retry waits retryPeriodMs, then doubles exponentially up to maxRetryTimeMs.
var delay = retryPeriodMs
var exponent = maxAttemptNumber - 1
while (exponent > 0 && delay < maxRetryTimeMs) {
if (delay >= (maxRetryTimeMs + 1L) / 2L) {
delay = maxRetryTimeMs
} else {
delay = delay * 2L
}
exponent -= 1
}
Math.min(delay, maxRetryTimeMs)
}
Comment on lines -240 to -255

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to use ExponentialBackoff instead.

// First retry waits retryPeriodMs, then doubles exponentially up to maxRetryTimeMs.
val attempts = Math.max(maxAttemptNumber - 1, 0)
retryBackoff.backoff(attempts)
}

}
Expand Down Expand Up @@ -317,3 +312,47 @@ class SendingToControllerBatchQueue(
}
}
}

/**
 * Retriable batch queue for partitions in the [[AwaitingMetadata]] state.
 *
 * Batches are delivered through `AwaitingMetadata.sendBatch` with the supplied
 * control plane as destination. Scheduling, linger and retry behaviour come from
 * [[RetriableInitDisklessLogBatchQueue]].
 *
 * @param controlPlane        destination passed to `AwaitingMetadata.sendBatch`
 * @param scheduler           forwarded to the base queue
 * @param brokerId            forwarded to the base queue
 * @param brokerEpochSupplier forwarded to the base queue
 * @param lingerMs            forwarded to the base queue
 * @param retryPeriodMs       forwarded to the base queue
 * @param maxRetryTimeMs      forwarded to the base queue
 */
class AwaitingMetadataBatchQueue(
  controlPlane: ControlPlane,
  scheduler: Scheduler,
  brokerId: Int,
  brokerEpochSupplier: () => Long,
  lingerMs: Long,
  retryPeriodMs: Long,
  maxRetryTimeMs: Long
) extends RetriableInitDisklessLogBatchQueue[AwaitingMetadata](
  scheduler = scheduler,
  brokerId = brokerId,
  brokerEpochSupplier = brokerEpochSupplier,
  lingerMs = lingerMs,
  retryPeriodMs = retryPeriodMs,
  maxRetryTimeMs = maxRetryTimeMs
) {
  logIdent = "[AwaitingMetadataBatchQueue] "

  /**
   * A state is sendable only when it carries a metadata payload; otherwise a warning
   * is logged through the state and the state is not sent.
   */
  override protected def shouldSend(state: AwaitingMetadata): Boolean = {
    val hasPayload = state.metadataPayload.isDefined
    if (!hasPayload) {
      state.warn(s"Skipping InitDisklessLog control-plane request because metadata payload is missing for ${state.tp}")
    }
    hasPayload
  }

  /** Delegates the batch to `AwaitingMetadata.sendBatch`, targeting the control plane. */
  override protected def sendBatch(
    states: Iterable[AwaitingMetadata],
    brokerId: Int,
    brokerEpoch: Long,
    onBatchComplete: Either[String, Iterable[ParsedResponse]] => Unit
  ): Unit =
    AwaitingMetadata.sendBatch(
      states = states,
      destination = controlPlane,
      brokerId = brokerId,
      brokerEpoch = brokerEpoch,
      onBatchComplete = onBatchComplete
    )
}

60 changes: 58 additions & 2 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server

import io.aiven.inkless.control_plane.{ControlPlane, InitDisklessLogProducerState => CpProducerState}
import kafka.cluster.Partition
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
Expand All @@ -28,6 +29,7 @@ import scala.jdk.CollectionConverters._

class InitDisklessLogManager(
controllerChannelManager: NodeToControllerChannelManager,
controlPlane: ControlPlane,
scheduler: Scheduler,
brokerId: Int,
brokerEpochSupplier: () => Long
Expand All @@ -53,13 +55,49 @@ class InitDisklessLogManager(
retryPeriodMs = retryPeriodMs,
maxRetryTimeMs = maxRetryTimeMs
)
private val awaitingMetadataQueue = new AwaitingMetadataBatchQueue(
controlPlane = controlPlane,
scheduler = scheduler,
brokerId = brokerId,
brokerEpochSupplier = brokerEpochSupplier,
lingerMs = lingerMs,
retryPeriodMs = retryPeriodMs,
maxRetryTimeMs = maxRetryTimeMs
)

private[server] def getTrackedPartitions: Set[TopicPartition] = tracked.keySet().asScala.toSet

private[server] def getInitState(tp: TopicPartition): Option[InitDisklessLogState] = Option(tracked.get(tp))

/**
 * Handles already-applied diskless init metadata for a partition.
 * This keeps/moves the partition to AwaitingMetadata and triggers a prompt
 * control-plane init send, since metadata is committed and visible.
 *
 * A negative `disklessStartOffset` is rejected: a warning is logged and nothing is
 * tracked or enqueued.
 *
 * @param partition           partition whose `topicPartition` is used as the tracking key
 * @param topicId             topic id stored in the new AwaitingMetadata state
 * @param topicName           topic name placed in the init payload
 * @param disklessStartOffset start offset placed in the init payload; must be >= 0
 * @param producerStates      producer states placed in the init payload
 */
def initOnControlPlane(
  partition: Partition,
  topicId: Uuid,
  topicName: String,
  disklessStartOffset: Long,
  producerStates: java.util.List[CpProducerState]
): Unit = {
  if (disklessStartOffset < 0) {
    warn(s"Received negative disklessStartOffset ($disklessStartOffset) for $topicName:${partition.topicPartition}, skipping control-plane init")
  } else {
    val tp = partition.topicPartition
    val payload = DisklessInitMetadata(topicName, disklessStartOffset, producerStates)
    val newState = AwaitingMetadata(partition, topicId, Some(payload))
    // Single upsert: a putIfAbsent followed by computeIfPresent could lose the entry if it
    // was removed between the two calls, leaving the partition enqueued but untracked.
    tracked.put(tp, newState)
    enqueueAwaitingMetadata(tp, newState)
  }
}

/**
* Register a sealed partition for migration. Registers this manager as a
* PartitionListener to receive HW advancement notifications. If HW already
* equals LEO, immediately marks the partition ready and schedules a batch
* send. Otherwise, waits for HW advancement notifications.
*/
Expand All @@ -81,6 +119,7 @@ class InitDisklessLogManager(
enqueueSendingToController(tp, sendingToController)
sendingToController
case awaitingMetadata: AwaitingMetadata => awaitingMetadata
case done: Done => done
case _: Failed => null
})

Expand Down Expand Up @@ -120,13 +159,30 @@ class InitDisklessLogManager(
}(ExecutionContext.parasitic)
}

/**
 * Enqueues `state` on the awaiting-metadata queue and reacts to the queue's result.
 *
 * When the result is `true`, a tracked AwaitingMetadata entry for `tp` is first replaced
 * by its `onSuccess` value; other tracked states are left as they are. In either case
 * the entry for `tp` is then removed from `tracked`.
 *
 * The callback runs on `ExecutionContext.parasitic`, and only if the future completes
 * successfully.
 *
 * NOTE(review): the entry is removed right after the `onSuccess` transition, so the
 * transitioned state is not observable through `tracked` afterwards; confirm this is intended.
 */
private def enqueueAwaitingMetadata(tp: TopicPartition, state: AwaitingMetadata): Unit = {
  val outcome = awaitingMetadataQueue.enqueue(tp, state)
  outcome.foreach { accepted =>
    if (accepted) {
      tracked.computeIfPresent(tp, (_, current) =>
        current match {
          case awaiting: AwaitingMetadata => awaiting.onSuccess
          case unchanged => unchanged
        }
      )
    }
    // Both the accepted and the rejected outcome stop tracking the partition.
    tracked.remove(tp)
  }(ExecutionContext.parasitic)
}

/**
* Remove a partition from tracking (e.g., when leadership is lost).
*/
def removePartition(tp: TopicPartition): Unit = {
if (tracked.remove(tp) != null) {
sendingToControllerQueue.remove(tp)
info(s"Removed partition $tp from diskless init log tracking")
awaitingMetadataQueue.remove(tp)
info(s"Removed partition $tp from diskless init tracking")
}
}
}
Loading
Loading