refactor(inkless): Refactor InitDisklessLog flow and add integration tests#549

Merged
jeqo merged 2 commits into main from giuseppelillo/refactor-sealing-and-register on Mar 25, 2026

@giuseppelillo giuseppelillo commented Mar 23, 2026

Contributor

Moved all the logic into ReplicaManager and added new tests that cover the integration between BrokerMetadataPublisher, ReplicaManager, and InitDisklessLogManager.

@giuseppelillo giuseppelillo force-pushed the giuseppelillo/refactor-sealing-and-register branch from a9520d7 to 2bdfee6 Compare March 24, 2026 12:08
@giuseppelillo giuseppelillo changed the title from "MINOR: refactor InitDisklessLog orchestration calls" to "refactor(inkless): Refactor InitDisklessLog flow and add integration tests" Mar 24, 2026
Comment on lines -347 to -364
  private def sealExistingLeadersOfTopicsMigratedToDiskless(delta: MetadataDelta, newImage: MetadataImage): Unit = {
    Option(delta.configsDelta()).foreach { configsDelta =>
      configsDelta.changes().forEach { (resource, _) =>
        if (resource.`type`() == ConfigResource.Type.TOPIC) {
          val topicName = resource.name()
          val oldProps = delta.image().configs().configProperties(resource)
          val wasDiskless = oldProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
          val newProps = newImage.configs().configProperties(resource)
          val isDiskless = newProps.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
          if (!wasDiskless && isDiskless) {
            info(s"Topic $topicName transitioning from classic to diskless, sealing leader partitions")
            replicaManager.sealTopicPartitions(topicName)
          }
        }
      }
    }
  }

Contributor Author

Moved all the logic inside ReplicaManager
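
For readers following the move, the transition check in the removed method can be isolated as a small standalone sketch. It is not the code that now lives in ReplicaManager; it only restates the `!wasDiskless && isDiskless` condition from the excerpt above using plain `java.util.Properties`. The object name `DisklessTransitionSketch`, the helper `becameDiskless`, and the literal key `"diskless.enable"` are assumptions made for illustration (the PR only refers to `TopicConfig.DISKLESS_ENABLE_CONFIG`).

```scala
import java.util.Properties

object DisklessTransitionSketch {
  // Assumed key name, for illustration only; the PR references
  // TopicConfig.DISKLESS_ENABLE_CONFIG without showing its value.
  val DisklessEnableKey = "diskless.enable"

  // A topic counts as diskless only when the flag is explicitly "true";
  // a missing property defaults to "false", as in the removed method.
  private def isDiskless(props: Properties): Boolean =
    props.getProperty(DisklessEnableKey, "false").toBoolean

  // True only for the classic -> diskless transition, the one case in which
  // the removed method called replicaManager.sealTopicPartitions(topicName).
  def becameDiskless(oldProps: Properties, newProps: Properties): Boolean =
    !isDiskless(oldProps) && isDiskless(newProps)

  def main(args: Array[String]): Unit = {
    val classic = new Properties()
    val diskless = new Properties()
    diskless.setProperty(DisklessEnableKey, "true")

    println(becameDiskless(classic, diskless))  // true: classic -> diskless
    println(becameDiskless(diskless, diskless)) // false: already diskless
    println(becameDiskless(diskless, classic))  // false: not a migration to diskless
  }
}
```

Keeping the check as a pure function of the old and new properties makes it straightforward to unit test independently of the metadata image types.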

// SharePartitionManager is receiving the latest changes.
verify(sharePartitionManager).onShareVersionToggle(any(), any())
}

Contributor Author

Moved tests to core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala


// TODO: Add more fetch test combinations; edge cases are not covered yet.

@Test

Contributor Author

Moved tests to core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala

@giuseppelillo giuseppelillo force-pushed the giuseppelillo/refactor-sealing-and-register branch 2 times, most recently from 993a474 to 920a867 Compare March 24, 2026 14:54
@giuseppelillo giuseppelillo force-pushed the giuseppelillo/refactor-sealing-and-register branch from 920a867 to 9c43f8d Compare March 24, 2026 15:35
@giuseppelillo giuseppelillo marked this pull request as ready for review March 24, 2026 15:36
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala Outdated
@jeqo jeqo merged commit 05f6a26 into main Mar 25, 2026
6 checks passed
@jeqo jeqo deleted the giuseppelillo/refactor-sealing-and-register branch March 25, 2026 12:14

3 participants