Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,12 @@ private void publishDelta(MetadataDelta delta, MetadataImage newImage) {
changes.handleDeletedTopic(prevTopic, isDisklessTopic(prevImage.configs(), prevTopic.name()));
}
for (Entry<Uuid, TopicDelta> entry : delta.topicsDelta().changedTopics().entrySet()) {
// Use prevImage configs intentionally: diskless.enable transitions
// are handled by the configsDelta loop below, not here.
changes.handleTopicChange(
prevImage.topics().getTopic(entry.getKey()),
entry.getValue(),
isDisklessTopic(newImage.configs(), entry.getValue().name()));
isDisklessTopic(prevImage.configs(), entry.getValue().name()));
}
Comment thread
jeqo marked this conversation as resolved.
}
// Handle diskless.enable config changes on existing topics (no TopicDelta required).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
Expand Down Expand Up @@ -211,4 +214,127 @@ public void testLoadSnapshotWithDisklessTopics() {
assertEquals(3, env.metrics.disklessOfflinePartitionCount());
}
}

/**
 * Regression test: creating a diskless topic yields both a TopicDelta and a ConfigDelta
 * in one metadata batch; the diskless counters must be bumped exactly once.
 */
@Test
public void testCreateDisklessTopicViaDeltaDoesNotDoubleCount() {
    try (TestEnv env = new TestEnv()) {
        String name = "diskless-topic";
        Uuid id = Uuid.fromString("JKNp6fQaT-icHxh654ok-w");

        // The three records that make up the creation batch: the topic itself,
        // its only partition, and the config entry that turns diskless on.
        TopicRecord topicRecord = new TopicRecord()
            .setTopicId(id)
            .setName(name);
        PartitionRecord partitionRecord = new PartitionRecord()
            .setTopicId(id)
            .setPartitionId(0)
            .setReplicas(java.util.List.of(0, 1, 2))
            .setIsr(java.util.List.of(0, 1, 2))
            .setLeader(0)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0);
        ConfigRecord disklessEnabled = new ConfigRecord()
            .setResourceType(ConfigResource.Type.TOPIC.id())
            .setResourceName(name)
            .setName(TopicConfig.DISKLESS_ENABLE_CONFIG)
            .setValue("true");

        // Replay everything onto an empty image so a single delta carries all changes.
        MetadataDelta creation = new MetadataDelta(MetadataImage.EMPTY);
        creation.replay(topicRecord);
        creation.replay(partitionRecord);
        creation.replay(disklessEnabled);
        MetadataImage imageAfterCreate = creation.apply(MetadataProvenance.EMPTY);

        // Publish as a regular (non-snapshot) update.
        env.publisher.onMetadataUpdate(creation, imageAfterCreate, fakeManifest(false));

        // Topic-level counters.
        assertEquals(1, env.metrics.globalTopicCount(),
            "Global topic count should be 1");
        assertEquals(1, env.metrics.disklessTopicCount(),
            "Diskless topic count should equal global topic count, not be double-counted");

        // Partition-level counters.
        assertEquals(1, env.metrics.globalPartitionCount());
        assertEquals(1, env.metrics.disklessPartitionCount(),
            "Diskless partition count should equal global partition count");

        // Nothing is offline.
        assertEquals(0, env.metrics.offlinePartitionCount());
        assertEquals(0, env.metrics.disklessOfflinePartitionCount());
    }
}

/**
 * Loads a snapshot holding one diskless topic, then removes that topic through a
 * delta and checks that every topic and partition counter returns to zero.
 */
@Test
public void testDeleteDisklessTopicViaDelta() {
    try (TestEnv env = new TestEnv()) {
        String name = "diskless-topic";
        Uuid id = Uuid.fromString("JKNp6fQaT-icHxh654ok-w");

        // Baseline image: one topic with one NORMAL partition, configured as diskless.
        TopicsImage topics = fakeTopicsImage(
            fakeTopicImage(name, id, fakePartitionRegistration(NORMAL)));
        ConfigurationsImage configs = fakeDisklessConfigsImage(topics);
        MetadataImage snapshotImage = fakeImageFromTopicsImage(topics, configs);

        // Rewrite the baseline image into a delta and publish it as a snapshot load.
        MetadataDelta snapshotDelta = new MetadataDelta(MetadataImage.EMPTY);
        ImageReWriter reWriter = new ImageReWriter(snapshotDelta);
        ImageWriterOptions writerOptions =
            new ImageWriterOptions.Builder(MetadataVersion.MINIMUM_VERSION).build();
        snapshotImage.write(reWriter, writerOptions);
        env.publisher.onMetadataUpdate(snapshotDelta, snapshotImage, fakeManifest(true));
        assertEquals(1, env.metrics.globalTopicCount());
        assertEquals(1, env.metrics.disklessTopicCount());

        // Remove the topic with a delta built on top of the baseline image.
        org.apache.kafka.common.metadata.RemoveTopicRecord removal =
            new org.apache.kafka.common.metadata.RemoveTopicRecord().setTopicId(id);
        MetadataDelta removalDelta = new MetadataDelta(snapshotImage);
        removalDelta.replay(removal);
        MetadataImage imageAfterRemoval = removalDelta.apply(MetadataProvenance.EMPTY);

        env.publisher.onMetadataUpdate(removalDelta, imageAfterRemoval, fakeManifest(false));

        // All counters must be back at zero.
        assertEquals(0, env.metrics.globalTopicCount());
        assertEquals(0, env.metrics.disklessTopicCount());
        assertEquals(0, env.metrics.globalPartitionCount());
        assertEquals(0, env.metrics.disklessPartitionCount());
    }
}

/**
 * A classic topic gains a partition and has diskless.enable switched on within the
 * same batch; the added partition must not be counted twice in the diskless metrics.
 */
@Test
public void testDisklessConfigChangeOnExistingTopicWithNewPartitions() {
    try (TestEnv env = new TestEnv()) {
        String name = "migrating-topic";
        Uuid id = Uuid.fromString("JKNp6fQaT-icHxh654ok-w");

        // Baseline: a classic topic with a single partition and no diskless config.
        TopicsImage topics = fakeTopicsImage(
            fakeTopicImage(name, id, fakePartitionRegistration(NORMAL)));
        MetadataImage classicImage = fakeImageFromTopicsImage(topics);

        // Publish the baseline as a snapshot load.
        MetadataDelta snapshotDelta = new MetadataDelta(MetadataImage.EMPTY);
        ImageReWriter reWriter = new ImageReWriter(snapshotDelta);
        ImageWriterOptions writerOptions =
            new ImageWriterOptions.Builder(MetadataVersion.MINIMUM_VERSION).build();
        classicImage.write(reWriter, writerOptions);
        env.publisher.onMetadataUpdate(snapshotDelta, classicImage, fakeManifest(true));
        assertEquals(1, env.metrics.globalTopicCount());
        assertEquals(1, env.metrics.globalPartitionCount());
        assertEquals(0, env.metrics.disklessTopicCount());
        assertEquals(0, env.metrics.disklessPartitionCount());

        // One batch carrying both changes: partition 1 is added and diskless is enabled.
        PartitionRecord addedPartition = new PartitionRecord()
            .setTopicId(id)
            .setPartitionId(1)
            .setReplicas(java.util.List.of(0, 1, 2))
            .setIsr(java.util.List.of(0, 1, 2))
            .setLeader(0)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0);
        ConfigRecord disklessEnabled = new ConfigRecord()
            .setResourceType(ConfigResource.Type.TOPIC.id())
            .setResourceName(name)
            .setName(TopicConfig.DISKLESS_ENABLE_CONFIG)
            .setValue("true");

        MetadataDelta migration = new MetadataDelta(classicImage);
        migration.replay(addedPartition);
        migration.replay(disklessEnabled);
        MetadataImage migratedImage = migration.apply(MetadataProvenance.EMPTY);

        env.publisher.onMetadataUpdate(migration, migratedImage, fakeManifest(false));

        // Global counters reflect one topic with two partitions.
        assertEquals(1, env.metrics.globalTopicCount());
        assertEquals(2, env.metrics.globalPartitionCount());

        // Diskless counters must match the global ones, with no double counting.
        assertEquals(1, env.metrics.disklessTopicCount(),
            "Diskless topic count should be 1");
        assertEquals(2, env.metrics.disklessPartitionCount(),
            "Both partitions (old + new) should be counted as diskless exactly once");
        assertEquals(0, env.metrics.offlinePartitionCount());
    }
}
}
Loading