Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition ID." },
{ "name": "DisklessStartOffset", "type": "int64", "versions": "0+",
"about": "The first offset retained for diskless reads." },
"about": "The first offset that will be used for storing messages in diskless format." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch seen by the broker." },
{ "name": "ProducerStates", "type": "[]ProducerState", "versions": "0+",
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,16 @@ class InitDisklessLogManager(
partition: Partition,
topicId: Uuid,
topicName: String,
disklessStartOffset: Long,
classicToDisklessStartOffset: Long,
producerStates: java.util.List[CpProducerState]
): Unit = {
if (disklessStartOffset < 0) {
warn(s"Received negative disklessStartOffset ($disklessStartOffset) for $topicName:${partition.topicPartition}, skipping control-plane init")
if (classicToDisklessStartOffset < 0) {
warn(s"Received negative classicToDisklessStartOffset ($classicToDisklessStartOffset) for $topicName:${partition.topicPartition}, skipping control-plane init")
return
}

val tp = partition.topicPartition
val payload = DisklessInitMetadata(topicName, disklessStartOffset, producerStates)
val payload = DisklessInitMetadata(topicName, classicToDisklessStartOffset, producerStates)
val newState = AwaitingMetadata(partition, topicId, Some(payload))
if (tracked.putIfAbsent(tp, newState) != null) {
tracked.computeIfPresent(tp, (_, _) => newState)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/InitDisklessLogState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sealed trait InitDisklessLogState extends Logging {

final case class DisklessInitMetadata(
topicName: String,
disklessStartOffset: Long,
classicToDisklessStartOffset: Long,
producerStates: util.List[CpProducerState]
)

Expand Down Expand Up @@ -201,7 +201,7 @@ object SendingToController {

}

/** Controller accepted the request; waiting for the PartitionChangeRecord with disklessStartOffset to propagate. */
/** Controller accepted the request; waiting for the PartitionChangeRecord with classicToDisklessStartOffset to propagate. */
final case class AwaitingMetadata(
partition: Partition,
topicId: Uuid,
Expand Down Expand Up @@ -259,7 +259,7 @@ object AwaitingMetadata {
metadata.topicName,
state.tp.partition(),
log.logStartOffset,
metadata.disklessStartOffset,
metadata.classicToDisklessStartOffset,
metadata.producerStates
))
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1879,10 +1879,10 @@ class ReplicaManager(val config: KafkaConfig,
if (!_inklessMetadataView.isDisklessTopic(tp.topic())) {
classicFetchInfos += fetchInfo
} else {
val disklessStartOffset = _inklessMetadataView.getDisklessStartOffset(tp.topicPartition())
val classicToDisklessStartOffset = _inklessMetadataView.getClassicToDisklessStartOffset(tp.topicPartition())
val shouldReadFromUnifiedLog =
disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET &&
partitionData.fetchOffset < disklessStartOffset
classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
partitionData.fetchOffset < classicToDisklessStartOffset

(shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match {
case (false, _) =>
Expand Down Expand Up @@ -2833,8 +2833,8 @@ class ReplicaManager(val config: KafkaConfig,
Option(topicImage.partitions().get(partitionId))
}
val shouldInitOnControlPlane = previousPartition.exists { previous =>
previous.disklessStartOffset == PartitionRegistration.NO_DISKLESS_START_OFFSET &&
partitionRegistration.disklessStartOffset >= 0
previous.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
partitionRegistration.classicToDisklessStartOffset >= 0
}

val tp = new TopicPartition(topicName, partitionId)
Expand All @@ -2855,7 +2855,7 @@ class ReplicaManager(val config: KafkaConfig,
partition = partition,
topicId = topicId,
topicName = topicName,
disklessStartOffset = partitionRegistration.disklessStartOffset,
classicToDisklessStartOffset = partitionRegistration.classicToDisklessStartOffset,
producerStates = producerStates
)
case Some(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
.collect(Collectors.toSet[TopicIdPartition]())
}

def getDisklessStartOffset(topicPartition: TopicPartition): Long = {
def getClassicToDisklessStartOffset(topicPartition: TopicPartition): Long = {
Option(metadataCache.currentImage().topics().getTopic(topicPartition.topic()))
.flatMap(topicImage => Option(topicImage.partitions().get(topicPartition.partition())))
.map(_.disklessStartOffset)
.getOrElse(PartitionRegistration.NO_DISKLESS_START_OFFSET)
.map(_.classicToDisklessStartOffset)
.getOrElse(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)
}

override def getTopicConfig(topicName: String): LogConfig = topicConfigs.computeIfAbsent(topicName, t => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ class InitDisklessLogManagerTest {
partition = partition,
topicId = topicId,
topicName = tp0.topic(),
disklessStartOffset = 100L,
classicToDisklessStartOffset = 100L,
producerStates = util.List.of(new CpProducerState(1L, 0.toShort, 0, 1, 100L, 1000L))
)

Expand All @@ -1036,7 +1036,7 @@ class InitDisklessLogManagerTest {
partition = partition,
topicId = topicId,
topicName = tp0.topic(),
disklessStartOffset = 100L,
classicToDisklessStartOffset = 100L,
producerStates = util.List.of()
)

Expand All @@ -1057,7 +1057,7 @@ class InitDisklessLogManagerTest {
partition = partition,
topicId = topicId,
topicName = tp0.topic(),
disklessStartOffset = 100L,
classicToDisklessStartOffset = 100L,
producerStates = util.List.of()
)

Expand All @@ -1079,14 +1079,14 @@ class InitDisklessLogManagerTest {
partition = partition,
topicId = topicId,
topicName = tp0.topic(),
disklessStartOffset = 100L,
classicToDisklessStartOffset = 100L,
producerStates = util.List.of()
)
manager.initOnControlPlane(
partition = partition,
topicId = topicId,
topicName = tp0.topic(),
disklessStartOffset = 100L,
classicToDisklessStartOffset = 100L,
producerStates = util.List.of()
)

Expand Down
14 changes: 7 additions & 7 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6345,8 +6345,8 @@ class ReplicaManagerTest {
// Given managed replicas enabled
val replicaManager = spy(createReplicaManager(List(disklessTopicPartition.topic()), controlPlane = Some(cp), disklessManagedReplicasEnabled = true))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
// Given a diskless topic with classicToDisklessStartOffset = 100
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// Given a classic log read result for offsets below diskless start offset
doReturn(Seq(disklessTopicPartition ->
Expand Down Expand Up @@ -6395,8 +6395,8 @@ class ReplicaManagerTest {
disklessManagedReplicasEnabled = false,
))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
// Given a diskless topic with classicToDisklessStartOffset = 100
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// When fetching messages below the diskless start offset
val fetchParams = new FetchParams(
Expand Down Expand Up @@ -6459,7 +6459,7 @@ class ReplicaManagerTest {

@ParameterizedTest(name = "testFetchDisklessAtOrAboveStartOffsetUsesDiskless with managedReplicasEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testFetchDisklessAtOrAboveDisklessStartOffset(managedReplicasEnabled: Boolean): Unit = {
def testFetchDisklessAtOrAboveClassicToDisklessStartOffset(managedReplicasEnabled: Boolean): Unit = {
val disklessResponse = Map(disklessTopicPartition ->
new FetchPartitionData(
Errors.NONE,
Expand All @@ -6486,8 +6486,8 @@ class ReplicaManagerTest {
disklessManagedReplicasEnabled = managedReplicasEnabled,
))

// Given a diskless topic with disklessStartOffset = 100
when(replicaManager.inklessMetadataView().getDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)
// Given a diskless topic with classicToDisklessStartOffset = 100
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())).thenReturn(100L)

// When fetching messages at the diskless start offset
val fetchParams = new FetchParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class InitDisklessLogFlowTest {
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(util.Arrays.asList(0, 1))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeClassicToDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of()))
pcrDelta.replay(pcr)
val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY))
Expand Down Expand Up @@ -600,7 +600,7 @@ class InitDisklessLogFlowTest {
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(util.Arrays.asList(0, 1))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeClassicToDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of()))
pcrDelta.replay(pcr)
val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY))
Expand Down Expand Up @@ -669,7 +669,7 @@ class InitDisklessLogFlowTest {
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(util.Arrays.asList(0, 1))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeClassicToDisklessStartOffset(100L))
pcr.unknownTaggedFields().add(InitDisklessLogFields.encodeProducerStates(util.List.of()))
pcrDelta.replay(pcr)
val pcrImage = withClusterBrokers(pcrDelta.apply(MetadataProvenance.EMPTY))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ CompletableFuture<AssignReplicasToDirsResponseData> assignReplicasToDirs(

/**
* Initialize diskless logs for classic-to-diskless migration. Validates the leader
* and persists disklessStartOffset and producer states in a PartitionChangeRecord.
* and persists classicToDisklessStartOffset and producer states in a PartitionChangeRecord.
*
* @param context The controller request context.
* @param request The InitDisklessLog request data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,10 +1515,10 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
continue;
}

if (partition.disklessStartOffset != PartitionRegistration.NO_DISKLESS_START_OFFSET) {
if (partition.classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
log.info("Rejecting InitDisklessLog request from node {} for {}-{} because " +
"the partition is already initialized with disklessStartOffset={}.",
request.brokerId(), topic.name, partitionId, partition.disklessStartOffset);
"the partition is already initialized with classicToDisklessStartOffset={}.",
request.brokerId(), topic.name, partitionId, partition.classicToDisklessStartOffset);
partitionResponses.add(new InitDisklessLogResponseData.PartitionResponse()
.setPartitionId(partitionId)
.setErrorCode(INVALID_REQUEST.code()));
Expand All @@ -1540,15 +1540,15 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
.setTopicId(topicId)
.setPartitionId(partitionId);
record.unknownTaggedFields().add(
InitDisklessLogFields.encodeDisklessStartOffset(partitionData.disklessStartOffset()));
InitDisklessLogFields.encodeClassicToDisklessStartOffset(partitionData.disklessStartOffset()));
if (!producerStates.isEmpty()) {
record.unknownTaggedFields().add(
InitDisklessLogFields.encodeProducerStates(producerStates));
}

records.add(new ApiMessageAndVersion(record, (short) 0));

log.info("InitDisklessLog for {}-{}: disklessStartOffset={}, producerStates.size={}",
log.info("InitDisklessLog for {}-{}: classicToDisklessStartOffset={}, producerStates.size={}",
topic.name, partitionId, partitionData.disklessStartOffset(),
producerStates.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@
*/
public final class InitDisklessLogFields {

public static final int DISKLESS_START_OFFSET_TAG = 100;
public static final int CLASSIC_TO_DISKLESS_START_OFFSET_TAG = 100;
public static final int PRODUCER_STATES_TAG = 101;

private InitDisklessLogFields() {}

// --- disklessStartOffset (tag 100): a single int64 ---
// --- classicToDisklessStartOffset (tag 100): a single int64 ---

public static RawTaggedField encodeDisklessStartOffset(long disklessStartOffset) {
public static RawTaggedField encodeClassicToDisklessStartOffset(long classicToDisklessStartOffset) {
byte[] data = new byte[8];
ByteBuffer.wrap(data).putLong(disklessStartOffset);
return new RawTaggedField(DISKLESS_START_OFFSET_TAG, data);
ByteBuffer.wrap(data).putLong(classicToDisklessStartOffset);
return new RawTaggedField(CLASSIC_TO_DISKLESS_START_OFFSET_TAG, data);
}

public static long decodeDisklessStartOffset(List<RawTaggedField> taggedFields) {
public static long decodeClassicToDisklessStartOffset(List<RawTaggedField> taggedFields) {
for (RawTaggedField field : taggedFields) {
if (field.tag() == DISKLESS_START_OFFSET_TAG) {
if (field.tag() == CLASSIC_TO_DISKLESS_START_OFFSET_TAG) {
return ByteBuffer.wrap(field.data()).getLong();
}
}
return PartitionRegistration.NO_DISKLESS_START_OFFSET;
return PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET;
}

// --- producerStates (tag 101): count + fixed-size entries ---
Expand Down
Loading
Loading