feat(inkless): Init diskless log on Control Plane #563

Merged: viktorsomogyi merged 5 commits into main from giuseppelillo/init-diskless-log-state on Apr 14, 2026

@giuseppelillo

Contributor

Complete the InitDisklessLogManager orchestration with the final transition from AwaitingMetadata to Done.

Copilot AI left a comment

Contributor

Pull request overview

This PR completes the classic-to-diskless init flow by triggering a control-plane initDisklessLog call once the committed metadata (diskless start offset + producer states) has been applied, finalizing the AwaitingMetadata portion of the orchestration.

Changes:

  • Wire ControlPlane into InitDisklessLogManager and add an AwaitingMetadata batch queue/protocol to initialize diskless logs on the control plane after metadata is committed.
  • Invoke the new control-plane init path from ReplicaManager.applyDelta when diskless fields appear in a PartitionRegistration.
  • Add/extend unit and cluster-level tests covering the sealing → controller init → metadata commit → control-plane init end-to-end path.
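As a rough illustration of the final transition summarized above, the sketch below models the last two states as a sealed trait. The field names (`disklessStartOffset`, `producerStates`), the `ProducerStateEntry` type, and the `onControlPlaneInit` helper are hypothetical stand-ins chosen for this example; the PR's actual definitions in InitDisklessLogState.scala may look different.

```scala
// Hypothetical sketch: names and fields are illustrative, not taken from the PR.
object InitStateSketch {
  final case class ProducerStateEntry(producerId: Long, epoch: Short, lastSequence: Int)

  sealed trait InitState

  // Carries the committed metadata that the control-plane call needs.
  final case class AwaitingMetadata(
    disklessStartOffset: Long,
    producerStates: Seq[ProducerStateEntry]
  ) extends InitState

  case object Done extends InitState

  // Move to Done only when the control-plane init succeeded;
  // otherwise keep the current state so the call can be retried.
  def onControlPlaneInit(state: InitState, succeeded: Boolean): InitState = state match {
    case _: AwaitingMetadata if succeeded => Done
    case other => other
  }
}
```

Keeping the payload on `AwaitingMetadata` means a retry can rebuild the control-plane request from the state alone, which is one plausible reason for extending it with a payload.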

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| core/src/main/scala/kafka/server/BrokerServer.scala | Passes ControlPlane into InitDisklessLogManager during broker startup. |
| core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala | Adds an AwaitingMetadataBatchQueue for control-plane calls. |
| core/src/main/scala/kafka/server/InitDisklessLogManager.scala | Adds initOnControlPlane and enqueues AwaitingMetadata states for control-plane init. |
| core/src/main/scala/kafka/server/InitDisklessLogState.scala | Extends AwaitingMetadata with payload + adds Done and a control-plane batch protocol. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Detects committed diskless fields in metadata deltas and triggers control-plane init. |
| core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala | Adds unit tests for control-plane init success/error/idempotency behavior. |
| core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala | Adds metadata-flow tests verifying control-plane init is triggered after PCR diskless fields commit. |
| core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java | Adds a Testcontainers-based cluster test for classic → diskless config migration while producing/consuming. |

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/InitDisklessLogState.scala Outdated
Comment thread core/src/main/scala/kafka/server/InitDisklessLogState.scala Outdated
Comment thread core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Comment thread core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java Outdated
Comment thread core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala Outdated
@giuseppelillo force-pushed the giuseppelillo/init-diskless-log-state branch 5 times, most recently from 7824c42 to 630cdab, April 8, 2026 14:34
Base automatically changed from giuseppelillo/refactor-init-diskless-log-manager to main, April 8, 2026 14:59
@giuseppelillo force-pushed the giuseppelillo/init-diskless-log-state branch 2 times, most recently from 7064e97 to 447e9ed, April 9, 2026 08:34
Complete the InitDisklessLogManager orchestration with the final
transition from AwaitingMetadata to Done.
@giuseppelillo force-pushed the giuseppelillo/init-diskless-log-state branch from 447e9ed to 7b16a9b, April 9, 2026 09:21
@giuseppelillo marked this pull request as ready for review, April 9, 2026 13:12
Comment on lines -240 to -255
    if (maxAttemptNumber <= 0) {
      retryPeriodMs
    } else {
      // First retry waits retryPeriodMs, then doubles exponentially up to maxRetryTimeMs.
      var delay = retryPeriodMs
      var exponent = maxAttemptNumber - 1
      while (exponent > 0 && delay < maxRetryTimeMs) {
        if (delay >= (maxRetryTimeMs + 1L) / 2L) {
          delay = maxRetryTimeMs
        } else {
          delay = delay * 2L
        }
        exponent -= 1
      }
      Math.min(delay, maxRetryTimeMs)
    }

Contributor Author

Refactored to use ExponentialBackoff instead.
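For readers who want the behavior of the removed lines in isolation, here is a minimal, dependency-free sketch of doubling with a cap. It does not call the `ExponentialBackoff` class the PR switched to; `BackoffSketch`, `delayMs`, and the parameter names are made up for this example, and jitter is left out.

```scala
// Hypothetical sketch of capped exponential backoff; not the PR's code.
object BackoffSketch {
  // Delay before the given retry attempt: attempt 1 waits initialMs,
  // each later attempt doubles the delay, and the result never exceeds maxMs.
  def delayMs(attempt: Int, initialMs: Long, maxMs: Long): Long = {
    require(initialMs > 0 && maxMs >= initialMs, "expected 0 < initialMs <= maxMs")
    var delay = initialMs
    var remaining = math.max(attempt - 1, 0)
    while (remaining > 0 && delay < maxMs) {
      // Jump straight to the cap when doubling would exceed it; this also avoids Long overflow.
      delay = if (delay > maxMs / 2) maxMs else delay * 2
      remaining -= 1
    }
    math.min(delay, maxMs)
  }
}
```

With `initialMs = 100` and `maxMs = 1000`, attempts 1 through 5 yield 100, 200, 400, 800, and 1000 ms.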

Comment on lines -100 to -115
    val preservedAttemptNumber = Option(queuedByTp.get(tp)).map(_.attemptNumber).getOrElse(0)
    // Keep retry progression for already queued partitions, but always refresh the state payload.
    queuedByTp.put(tp, Attempt(state, attemptNumber = preservedAttemptNumber))
    val currentPromise = resultPromiseByTp.computeIfAbsent(tp, _ => Promise[Boolean]())

    taskStatus match {
      case NoTask =>
        // Schedule a new run, allowing linger for additional ready partitions.
        scheduleNewTask(lingerMs)
      case TaskScheduled(scheduledTask) =>
        // Re-schedule using linger so newly ready partitions can batch together.
        scheduledTask.cancel(false)
        scheduleNewTask(lingerMs)
      case TaskRunning =>
        // A task is in-flight. Newly queued work is already in queuedByTp
        // and will be picked up by finishTask.

Contributor Author

A small refactor to handle duplicate requests that have not been sent yet.
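One way to picture duplicate handling for work that has not been sent yet is a keyed queue in which re-enqueuing replaces the payload but keeps the retry count, as the removed lines did with `preservedAttemptNumber`. The sketch below is an assumption-laden illustration: `DedupQueueSketch`, `Queue`, `markRetry`, and `pending` are invented names, and the real queue also manages promises and task scheduling.

```scala
import scala.collection.mutable

// Hypothetical sketch of a de-duplicating queue; not the PR's InitDisklessLogBatchQueue.
object DedupQueueSketch {
  final case class Attempt[S](state: S, attemptNumber: Int)

  final class Queue[K, S] {
    private val queued = mutable.LinkedHashMap.empty[K, Attempt[S]]

    // Re-enqueuing a key that is still waiting refreshes its payload
    // but preserves how many attempts it has already used.
    def enqueue(key: K, state: S): Unit = synchronized {
      val preserved = queued.get(key).map(_.attemptNumber).getOrElse(0)
      queued.put(key, Attempt(state, preserved))
    }

    // Record a failed send so that a later backoff computation can grow the delay.
    def markRetry(key: K): Unit = synchronized {
      queued.get(key).foreach { a =>
        queued.put(key, a.copy(attemptNumber = a.attemptNumber + 1))
      }
    }

    // Snapshot of the entries still waiting to be sent, in insertion order.
    def pending: Seq[(K, Attempt[S])] = synchronized { queued.toSeq }
  }
}
```

Because entries are keyed, a second request for the same partition never produces a second queued item; it only updates the one already waiting.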

@jeqo left a comment

Contributor

Looks good to me, only a few minor improvements suggested.

Comment thread core/src/main/scala/kafka/server/InitDisklessLogState.scala Outdated
Comment thread core/src/main/scala/kafka/server/InitDisklessLogState.scala
Comment thread core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala Outdated
jeqo previously approved these changes Apr 10, 2026
    Option(resultPromiseByTp.remove(tp)).foreach(_.trySuccess(false))
    def remove(tp: TopicPartition): Unit = {
      withQueueLock { queuedByTp.remove(tp) }
      completeAndRemovePromise(tp, accepted = false)

Contributor

Why did you move it outside the queueLock? It seems that resultPromiseByTp in completeAndRemovePromise is accessed without the lock, while in enqueue() it is accessed under the lock. That way a promise can be added legitimately, but another thread may remove it before it was supposed to be removed. Is this intended?

Contributor Author

You're right, this was the result of a refactor using a ConcurrentHashMap, but the race condition is still there. I've changed it now so the removal is done within the lock, while the promise completion can be done outside.
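The fix described here, removing under the lock and completing outside it, can be sketched roughly as follows. This is an illustration under assumptions rather than the PR's code: `PromiseRemovalSketch`, `Registry`, `register`, and `withLock` are hypothetical names, and a plain `mutable.Map` guarded by a `ReentrantLock` stands in for the real data structures.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable
import scala.concurrent.Promise

// Hypothetical sketch of lock scoping for promise removal; not the PR's code.
object PromiseRemovalSketch {
  final class Registry[K] {
    private val lock = new ReentrantLock()
    private val promises = mutable.Map.empty[K, Promise[Boolean]]

    private def withLock[T](body: => T): T = {
      lock.lock()
      try body finally lock.unlock()
    }

    // Registration happens under the lock, mirroring what the review says about enqueue().
    def register(key: K): Promise[Boolean] =
      withLock { promises.getOrElseUpdate(key, Promise[Boolean]()) }

    // Detach the promise while holding the lock so no other thread can observe
    // or remove it concurrently, then complete it after the lock is released
    // to keep completion work out of the critical section.
    def remove(key: K, accepted: Boolean): Unit = {
      val removed = withLock { promises.remove(key) }
      removed.foreach(_.trySuccess(accepted))
    }
  }
}
```

Once a promise has been detached under the lock, only the detaching thread holds a reference obtained through the map, so completing it later cannot race with a fresh registration for the same key.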

Comment thread core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala Outdated

@jeqo left a comment

Contributor

LGTM. Deferring to @viktorsomogyi for a final look and merge.

@viktorsomogyi merged commit e1bb414 into main, April 14, 2026
5 checks passed
@viktorsomogyi deleted the giuseppelillo/init-diskless-log-state branch, April 14, 2026 14:08
jeqo pushed a commit that referenced this pull request Apr 14, 2026
Complete the InitDisklessLogManager orchestration with the final
transition from AwaitingMetadata to Done.
jeqo pushed a commit that referenced this pull request Apr 14, 2026
Complete the InitDisklessLogManager orchestration with the final
transition from AwaitingMetadata to Done.

4 participants