Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1894,9 +1894,10 @@ class ReplicaManager(val config: KafkaConfig,
classicFetchInfos += fetchInfo
} else {
val classicToDisklessStartOffset = _inklessMetadataView.getClassicToDisklessStartOffset(tp.topicPartition())
val shouldReadFromUnifiedLog =
classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
partitionData.fetchOffset < classicToDisklessStartOffset
val migrationPending =
classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING
val shouldReadFromUnifiedLog = migrationPending || (
classicToDisklessStartOffset >= 0 && partitionData.fetchOffset < classicToDisklessStartOffset)

(shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match {
case (false, _) =>
Expand Down Expand Up @@ -2850,7 +2851,7 @@ class ReplicaManager(val config: KafkaConfig,
Option(topicImage.partitions().get(partitionId))
}
val shouldInitOnControlPlane = previousPartition.exists { previous =>
previous.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
previous.classicToDisklessStartOffset < 0 &&
partitionRegistration.classicToDisklessStartOffset >= 0
}

Expand Down
142 changes: 142 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6427,6 +6427,148 @@ class ReplicaManagerTest {
}
}

/**
 * With managed replicas enabled and the partition reporting
 * CLASSIC_TO_DISKLESS_MIGRATION_PENDING, fetchMessages must answer from readFromLog and must
 * not touch the diskless fetch handler or the control plane.
 */
@Test
def testFetchDisklessMigrationPendingReadsFromClassicLogWhenManagedReplicasEnabled(): Unit = {
  val handlerConstruction = mockFetchHandler(Map.empty)
  try {
    val controlPlaneMock = mock(classOf[ControlPlane])
    val manager = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlaneMock),
      disklessManagedReplicasEnabled = true
    ))

    // Given: the metadata view reports the migration-pending sentinel for the partition
    when(manager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)

    // Given: the spied readFromLog hands back a canned result carrying RECORDS
    val cannedFetchData = new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS)
    val cannedReadResult = new LogReadResult(
      cannedFetchData,
      Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE
    )
    doReturn(Seq(disklessTopicPartition -> cannedReadResult))
      .when(manager).readFromLog(any(), any(), any(), any())

    val params = new FetchParams(-1, -1L, 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val requestedPartitions = Seq(
      disklessTopicPartition ->
        new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
    )

    // When: the fetch is issued, the callback stores whatever the manager responds with
    @volatile var captured: Map[TopicIdPartition, FetchPartitionData] = null
    val onResponse = (partitions: Seq[(TopicIdPartition, FetchPartitionData)]) => {
      captured = partitions.toMap
    }
    manager.fetchMessages(params, requestedPartitions, QuotaFactory.UNBOUNDED_QUOTA, onResponse)

    // Then: the stubbed log read produced the single response entry
    assertNotNull(captured)
    assertEquals(1, captured.size)
    assertEquals(RECORDS, captured(disklessTopicPartition).records)
    verify(manager, times(1)).readFromLog(any(), any(), any(), any())

    // And: neither diskless collaborator was invoked
    verify(handlerConstruction.constructed().get(0), never()).handle(any(), any())
    verify(controlPlaneMock, never()).findBatches(any(), any(), any())
  } finally {
    handlerConstruction.close()
  }
}

/**
 * With managed replicas disabled and the partition reporting
 * CLASSIC_TO_DISKLESS_MIGRATION_PENDING, fetchMessages must respond with INVALID_REQUEST
 * without calling readFromLog, the diskless fetch handler or the control plane.
 */
@Test
def testFetchDisklessMigrationPendingFailsWhenManagedReplicasDisabled(): Unit = {
  val handlerConstruction = mockFetchHandler(Map.empty)
  try {
    val controlPlaneMock = mock(classOf[ControlPlane])
    val manager = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlaneMock),
      disklessManagedReplicasEnabled = false
    ))

    // Given: the metadata view reports the migration-pending sentinel for the partition
    when(manager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)

    val params = new FetchParams(-1, -1L, 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val requestedPartitions = Seq(
      disklessTopicPartition ->
        new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
    )

    // When: the fetch is issued, the callback stores whatever the manager responds with
    @volatile var captured: Map[TopicIdPartition, FetchPartitionData] = null
    val onResponse = (partitions: Seq[(TopicIdPartition, FetchPartitionData)]) => {
      captured = partitions.toMap
    }
    manager.fetchMessages(params, requestedPartitions, QuotaFactory.UNBOUNDED_QUOTA, onResponse)

    // Then: the single response entry carries INVALID_REQUEST
    assertNotNull(captured)
    assertEquals(1, captured.size)
    assertEquals(Errors.INVALID_REQUEST, captured(disklessTopicPartition).error)

    // And: no read path of any kind was exercised
    verify(manager, never()).readFromLog(any(), any(), any(), any())
    verify(handlerConstruction.constructed().get(0), never()).handle(any(), any())
    verify(controlPlaneMock, never()).findBatches(any(), any(), any())
  } finally {
    handlerConstruction.close()
  }
}

/**
 * When the partition reports NO_CLASSIC_TO_DISKLESS_START_OFFSET, fetchMessages must go through
 * the diskless fetch handler exactly once and must not call readFromLog.
 */
@Test
def testFetchFullDisklessTopicRoutesDiskless(): Unit = {
  // Response the mocked fetch handler is configured with
  val handlerResponse = Map(
    disklessTopicPartition -> new FetchPartitionData(
      Errors.NONE,
      110L, 100L,
      RECORDS,
      Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false
    )
  )
  val handlerConstruction = mockFetchHandler(handlerResponse)
  try {
    // Control plane stub: a single batch belonging to the diskless partition
    val batchMetadataMock = mock(classOf[BatchMetadata])
    when(batchMetadataMock.topicIdPartition()).thenReturn(disklessTopicPartition)
    val batchMock = mock(classOf[BatchInfo])
    when(batchMock.metadata()).thenReturn(batchMetadataMock)

    val findBatchResponseMock = mock(classOf[FindBatchResponse])
    when(findBatchResponseMock.batches()).thenReturn(util.List.of(batchMock))
    when(findBatchResponseMock.highWatermark()).thenReturn(110L)
    when(findBatchResponseMock.estimatedByteSize(50L)).thenReturn(RECORDS.sizeInBytes())
    when(findBatchResponseMock.errors()).thenReturn(Errors.NONE)

    val controlPlaneMock = mock(classOf[ControlPlane])
    when(controlPlaneMock.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponseMock))

    val manager = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlaneMock),
      disklessManagedReplicasEnabled = true
    ))

    // Given: the metadata view reports the "no start offset" sentinel for the partition
    when(manager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)

    val params = new FetchParams(-1, -1L, 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val requestedPartitions = Seq(
      disklessTopicPartition ->
        new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
    )

    // When: the fetch is issued, the callback stores whatever the manager responds with
    @volatile var captured: Map[TopicIdPartition, FetchPartitionData] = null
    val onResponse = (partitions: Seq[(TopicIdPartition, FetchPartitionData)]) => {
      captured = partitions.toMap
    }
    manager.fetchMessages(params, requestedPartitions, QuotaFactory.UNBOUNDED_QUOTA, onResponse)

    // Then: one error-free entry came back, via the fetch handler rather than readFromLog
    assertNotNull(captured)
    assertEquals(1, captured.size)
    assertEquals(Errors.NONE, captured(disklessTopicPartition).error)
    verify(manager, never()).readFromLog(any(), any(), any(), any())
    verify(handlerConstruction.constructed().get(0), times(1)).handle(any(), any())
  } finally {
    handlerConstruction.close()
  }
}

@Test
def testFetchFailDisklessWhenFromReplicaAndUnmanagedReplicas(): Unit = {
val fetchHandlerCtor = mockFetchHandler(Map.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
Expand Down Expand Up @@ -1929,6 +1930,14 @@ public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
if (validateOnly) {
return result.withoutRecords();
} else {
List<ApiMessageAndVersion> migrationRecords =
replicationControl.markClassicToDisklessMigrationStarted(configChanges, result.response());
if (!migrationRecords.isEmpty()) {
List<ApiMessageAndVersion> allRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
allRecords.addAll(result.records());
allRecords.addAll(migrationRecords);
return ControllerResult.atomicOf(allRecords, result.response());
}
return result;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
continue;
}

if (partition.classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
if (partition.classicToDisklessStartOffset >= 0) {
log.info("Rejecting InitDisklessLog request from node {} for {}-{} because " +
"the partition is already initialized with classicToDisklessStartOffset={}.",
request.brokerId(), topic.name, partitionId, partition.classicToDisklessStartOffset);
Expand Down Expand Up @@ -2875,6 +2875,47 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
}
}

List<ApiMessageAndVersion> markClassicToDisklessMigrationStarted(
Comment thread
giuseppelillo marked this conversation as resolved.
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
Map<ConfigResource, ApiError> configResults
) {
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> entry : configChanges.entrySet()) {
ConfigResource resource = entry.getKey();
if (resource.type() != TOPIC) continue;
ApiError error = configResults.get(resource);
if (error != null && error != ApiError.NONE) continue;
Map<String, Entry<OpType, String>> changes = entry.getValue();
Entry<OpType, String> disklessChange = changes.get(DISKLESS_ENABLE_CONFIG);
if (disklessChange == null) continue;
if (disklessChange.getKey() != SET || !Boolean.parseBoolean(disklessChange.getValue())) continue;
if (isDisklessTopic(resource.name())) continue;

String topicName = resource.name();
Uuid topicId = topicsByName.get(topicName);
if (topicId == null) continue;
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) continue;

int sizeBefore = records.size();
for (Entry<Integer, PartitionRegistration> partEntry : topicInfo.parts.entrySet()) {
PartitionRegistration partition = partEntry.getValue();
if (partition.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partEntry.getKey());
record.unknownTaggedFields().add(
InitDisklessLogFields.encodeClassicToDisklessStartOffset(
PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING));
records.add(new ApiMessageAndVersion(record, (short) 0));
}
}
log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending",
records.size() - sizeBefore, topicName);
}
Comment thread
giuseppelillo marked this conversation as resolved.
return records;
}

private boolean isDisklessTopic(String topicName) {
return Boolean.parseBoolean(
configurationControl.currentTopicConfig(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public PartitionRegistration build() {
}

public static final long NO_CLASSIC_TO_DISKLESS_START_OFFSET = -1L;
public static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L;

public final int[] replicas;
public final Uuid[] directories;
Expand Down
Loading
Loading