feat(inkless): Init diskless log on Control Plane #563
Pull request overview
This PR completes the classic-to-diskless init flow by triggering a control-plane initDisklessLog call once the committed metadata (diskless start offset + producer states) has been applied, finalizing the AwaitingMetadata portion of the orchestration.
Changes:
- Wire ControlPlane into InitDisklessLogManager and add an AwaitingMetadata batch queue/protocol to initialize diskless logs on the control plane after metadata is committed.
- Invoke the new control-plane init path from ReplicaManager.applyDelta when diskless fields appear in a PartitionRegistration.
- Add/extend unit and cluster-level tests covering the sealing → controller init → metadata commit → control-plane init end-to-end path.
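To make the final transition concrete, here is a minimal sketch of the AwaitingMetadata to Done step. It is not the PR's code: the payload field `disklessStartOffset`, the `ControlPlaneInit` function type, and `onMetadataCommitted` are hypothetical names chosen for illustration, and the real state also carries producer states.

```scala
object InitStateSketch {
  sealed trait InitState
  // Assumption: the payload is reduced to a single offset for brevity.
  final case class AwaitingMetadata(disklessStartOffset: Long) extends InitState
  case object Done extends InitState

  /** Hypothetical control-plane hook: returns true when the init call is accepted. */
  type ControlPlaneInit = Long => Boolean

  /** Called once the committed metadata carrying the diskless fields has been applied. */
  def onMetadataCommitted(state: InitState, init: ControlPlaneInit): InitState = state match {
    case AwaitingMetadata(offset) if init(offset) => Done
    // A rejected init stays in AwaitingMetadata so it can be retried; Done is left alone.
    case other => other
  }
}
```

Because Done never matches the first case, a repeated metadata delta does not call the control plane again, which is one simple way to get the idempotency the unit tests are described as covering.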
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| core/src/main/scala/kafka/server/BrokerServer.scala | Passes ControlPlane into InitDisklessLogManager during broker startup. |
| core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala | Adds an AwaitingMetadataBatchQueue for control-plane calls. |
| core/src/main/scala/kafka/server/InitDisklessLogManager.scala | Adds initOnControlPlane and enqueues AwaitingMetadata states for control-plane init. |
| core/src/main/scala/kafka/server/InitDisklessLogState.scala | Extends AwaitingMetadata with payload + adds Done and a control-plane batch protocol. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Detects committed diskless fields in metadata deltas and triggers control-plane init. |
| core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala | Adds unit tests for control-plane init success/error/idempotency behavior. |
| core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala | Adds metadata-flow tests verifying control-plane init is triggered after PCR diskless fields commit. |
| core/src/test/java/kafka/server/InklessTopicTypeSwitcherClusterTest.java | Adds a Testcontainers-based cluster test for classic → diskless config migration while producing/consuming. |
Complete the InitDisklessLogManager orchestration with the final transition from AwaitingMetadata to Done.
    if (maxAttemptNumber <= 0) {
      retryPeriodMs
    } else {
      // First retry waits retryPeriodMs, then doubles exponentially up to maxRetryTimeMs.
      var delay = retryPeriodMs
      var exponent = maxAttemptNumber - 1
      while (exponent > 0 && delay < maxRetryTimeMs) {
        if (delay >= (maxRetryTimeMs + 1L) / 2L) {
          delay = maxRetryTimeMs
        } else {
          delay = delay * 2L
        }
        exponent -= 1
      }
      Math.min(delay, maxRetryTimeMs)
    }
Refactored to use ExponentialBackoff instead.
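For readers following the thread, the capped doubling that the hand-written loop computes can be expressed in a few lines. The sketch below is self-contained and is not the refactored code; `BackoffSketch` and `retryDelayMs` are illustrative names. The reply presumably refers to Kafka's `org.apache.kafka.common.utils.ExponentialBackoff`, which can also add jitter; this sketch leaves jitter out.

```scala
object BackoffSketch {
  /**
   * Delay before retry number `attempt` (1-based):
   * retryPeriodMs * 2^(attempt - 1), capped at maxRetryTimeMs.
   * Non-positive attempts fall back to retryPeriodMs, as in the original loop.
   */
  def retryDelayMs(attempt: Int, retryPeriodMs: Long, maxRetryTimeMs: Long): Long = {
    if (attempt <= 0) {
      retryPeriodMs
    } else {
      // Compute in Double so a large exponent saturates instead of overflowing a Long.
      val uncapped = retryPeriodMs.toDouble * math.pow(2.0, (attempt - 1).toDouble)
      math.min(uncapped, maxRetryTimeMs.toDouble).toLong
    }
  }
}
```

With `retryPeriodMs = 100` and `maxRetryTimeMs = 1000`, attempts 1 through 4 wait 100, 200, 400 and 800 ms, and every later attempt waits 1000 ms.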
    val preservedAttemptNumber = Option(queuedByTp.get(tp)).map(_.attemptNumber).getOrElse(0)
    // Keep retry progression for already queued partitions, but always refresh the state payload.
    queuedByTp.put(tp, Attempt(state, attemptNumber = preservedAttemptNumber))
    val currentPromise = resultPromiseByTp.computeIfAbsent(tp, _ => Promise[Boolean]())

    taskStatus match {
      case NoTask =>
        // Schedule a new run, allowing linger for additional ready partitions.
        scheduleNewTask(lingerMs)
      case TaskScheduled(scheduledTask) =>
        // Re-schedule using linger so newly ready partitions can batch together.
        scheduledTask.cancel(false)
        scheduleNewTask(lingerMs)
      case TaskRunning =>
        // A task is in-flight. Newly queued work is already in queuedByTp
        // and will be picked up by finishTask.
Small refactor to handle duplicate requests that have not been sent yet.
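The deduplication idea can be shown in isolation: a second enqueue for the same key replaces the queued payload but keeps the retry counter, so backoff progression is not reset. This is a simplified sketch, not the PR's queue; `DedupQueue`, `markRetry` and `peek` are hypothetical names, and the scheduling and linger logic is left out.

```scala
import scala.collection.mutable

object DedupQueueSketch {
  // Stand-in for the PR's Attempt type, generic over the state payload.
  final case class Attempt[S](state: S, attemptNumber: Int)

  final class DedupQueue[K, S] {
    private val queued = mutable.LinkedHashMap.empty[K, Attempt[S]]

    /** Enqueue or refresh: the newest state wins, the retry counter is preserved. */
    def enqueue(key: K, state: S): Unit = synchronized {
      val preserved = queued.get(key).map(_.attemptNumber).getOrElse(0)
      queued.put(key, Attempt(state, preserved))
    }

    /** Record a failed send so that the next retry delay grows. */
    def markRetry(key: K): Unit = synchronized {
      queued.get(key).foreach { a =>
        queued.put(key, a.copy(attemptNumber = a.attemptNumber + 1))
      }
    }

    def peek(key: K): Option[Attempt[S]] = synchronized { queued.get(key) }

    def size: Int = synchronized { queued.size }
  }
}
```

Keying the queue by partition means a duplicate request never produces a second entry, so a batch built from the queue contains each partition at most once.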
jeqo left a comment
Looks good to me, only a few minor improvements suggested.
    Option(resultPromiseByTp.remove(tp)).foreach(_.trySuccess(false))
    def remove(tp: TopicPartition): Unit = {
      withQueueLock { queuedByTp.remove(tp) }
      completeAndRemovePromise(tp, accepted = false)
Why did you move it outside the queueLock? resultPromiseByTp is accessed without the lock in completeAndRemovePromise, while enqueue() accesses it under the lock. That way a promise can be added legitimately, but another thread may remove it before it was supposed to be removed. Is this intended?
You're right, this was the result of a refactor using a ConcurrentHashMap, but the race condition is still there. I've changed it now so the removal is done within the lock, while the promise completion can be done outside.
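The pattern described in this reply, removing the map entry under the lock and completing the promise afterwards, can be sketched as follows. This is an illustration under assumptions, not the PR's code: `PromiseRegistry` and its methods are hypothetical names, and a plain monitor stands in for the PR's queue lock.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

object PromiseRegistrySketch {
  final class PromiseRegistry[K] {
    private val lock = new Object
    private val promises = mutable.HashMap.empty[K, Promise[Boolean]]

    /** Registration and removal both touch the map only while holding the lock. */
    def register(key: K): Promise[Boolean] = lock.synchronized {
      promises.getOrElseUpdate(key, Promise[Boolean]())
    }

    def remove(key: K): Unit = {
      // Detach the promise atomically with respect to register().
      val removed = lock.synchronized { promises.remove(key) }
      // Complete it outside the lock so completion work never runs under the lock.
      removed.foreach(_.trySuccess(false))
    }
  }
}
```

Once the entry is detached under the lock, a concurrent `register` either sees the old promise before removal or creates a fresh one after it, so a newly added promise is never completed by an unrelated `remove`.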
jeqo left a comment
LGTM. Deferring to @viktorsomogyi for a final look and merge.