KAFKA-16017: Checkpoint restored offsets instead of written offsets #15044

Merged
merged 8 commits into from
Dec 21, 2023

Contributor

@cadonna cadonna commented Dec 18, 2023

Kafka Streams checkpoints the wrong offset when a task is closed during restoration. If a TaskCorruptedException happens under exactly-once processing guarantees, the affected task is closed dirty, its state content is wiped out, and the task is re-initialized. If the task is then closed cleanly during the subsequent restoration, it writes the offsets stored in its record collector to the checkpoint file. Those offsets are the offsets that the task wrote to the changelog topics. In other words, the task writes the end offsets of its changelog topics to the checkpoint file. Consequently, when the task is initialized again on the same Streams client, the checkpoint file is read and the task assumes it is fully restored, although the records between the last offsets the task restored before closing clean and the end offsets of the changelog topics are missing locally.

The fix is to clear the offsets in the record collector on close.
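
To make the failure mode concrete, here is a minimal, self-contained sketch of the scenario described above. It is a simplified model and not the Kafka Streams code: `SketchRecordCollector`, `checkpoint`, `checkpointAfterInterruptedRestoration`, the partition name `changelog-0`, and the offsets 100 and 40 are all hypothetical. The sketch assumes that written offsets take precedence over restored offsets when the checkpoint is assembled.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for the record collector: remembers the last offset written per changelog partition. */
    static final class SketchRecordCollector {
        private final Map<String, Long> offsets = new HashMap<>();

        void recordWrite(final String changelogPartition, final long offset) {
            offsets.put(changelogPartition, offset);
        }

        Map<String, Long> offsets() {
            return new HashMap<>(offsets);
        }

        // With clearOffsets == false the stale written offsets survive the close (behavior before the fix).
        void close(final boolean clearOffsets) {
            if (clearOffsets) {
                offsets.clear();
            }
        }
    }

    /** Assumption of this sketch: offsets written to the changelog override the restored offsets. */
    static Map<String, Long> checkpoint(final Map<String, Long> restoredOffsets,
                                        final Map<String, Long> writtenOffsets) {
        final Map<String, Long> result = new HashMap<>(restoredOffsets);
        result.putAll(writtenOffsets);
        return result;
    }

    static long checkpointAfterInterruptedRestoration(final boolean clearOffsetsOnClose) {
        final SketchRecordCollector collector = new SketchRecordCollector();

        // The task processed records and wrote to its changelog up to offset 100.
        collector.recordWrite("changelog-0", 100L);

        // TaskCorruptedException: the task is closed dirty and its local state is wiped.
        collector.close(clearOffsetsOnClose);

        // The re-initialized task restores up to offset 40 and is then closed cleanly.
        final Map<String, Long> restoredOffsets = new HashMap<>();
        restoredOffsets.put("changelog-0", 40L);

        return checkpoint(restoredOffsets, collector.offsets()).get("changelog-0");
    }

    public static void main(final String[] args) {
        // Without clearing, the checkpoint claims offset 100 although only 40 was restored.
        System.out.println("without fix: " + checkpointAfterInterruptedRestoration(false));
        // With clearing, the checkpoint reflects the restored offset 40.
        System.out.println("with fix: " + checkpointAfterInterruptedRestoration(true));
    }
}
```

In this model, the checkpoint only reflects what was actually restored once the collector forgets its written offsets on close.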

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -1228,6 +1290,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin

try (final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
logCaptureAppender.setThreshold(Level.INFO);
Contributor Author

I added this because otherwise the result of the test depends on the configured log level.

@@ -408,6 +408,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />

}

private void close() {
offsets.clear();
Contributor Author

This is the actual fix.

Member

I like having common logic for close clean / close dirty. Should we also move removeAllProducedSensors here?

Contributor Author

I am afraid I do not understand. Method removeAllProducedSensors() is only called in closeClean() but not in closeDirty().

Member

@lucasbru lucasbru left a comment

Thanks a lot for the important fix, @cadonna. I left some comments, but overall it looks good to me.

final int endKey = 30001;
final int valueSize = 1000;
final StringBuilder value1 = new StringBuilder(valueSize);
for (int i = 0; i < valueSize; ++i) {
Member

Is the size of the value essential to the test? In other words, "why?"

Contributor Author

No, it is not essential. Actually it is just to put some load on restoration.

Contributor Author

OK, I removed the string value and put an integer value in its place.

Member

@lucasbru lucasbru left a comment

LGTM, thanks!

@@ -1313,7 +1313,6 @@ private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
final int maxMessages) {
final List<ConsumerRecord<K, V>> consumerRecords;
consumer.subscribe(singletonList(topic));
System.out.println("Got assignment:" + consumer.assignment());
Contributor Author

I discovered this and removed it, because I guess it is a left-over from some other PR.

topic,
numberOfRecords
);
}

private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
Contributor Author

I added this method to specifically verify if the partition to verify contains committed records because I saw flaky test failures where the changelog topic of the partition to verify was empty. If the changelog topic is empty, the latch never counts down, the Streams client never closes and the test runs into the test timeout.
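
As an illustration of why an empty changelog topic makes the test hang, here is a small stand-alone sketch. It does not use the Kafka test utilities; `EmptyChangelogLatchSketch`, `restoredAtLeastOneRecord`, and the record lists are hypothetical. A latch that is only counted down when a record is restored never opens if there are no records, so an unbounded wait would block until the test timeout.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EmptyChangelogLatchSketch {

    /**
     * Hypothetical model: a restorer thread counts the latch down once per restored record.
     * Returns true if the latch opened within the timeout, false otherwise.
     */
    static boolean restoredAtLeastOneRecord(final List<Integer> changelogRecords, final long timeoutMs) {
        final CountDownLatch latch = new CountDownLatch(1);
        final Thread restorer = new Thread(() -> {
            for (final Integer ignored : changelogRecords) {
                latch.countDown();
            }
        });
        restorer.start();
        try {
            // A bounded wait is used here so the sketch terminates even for an empty changelog.
            final boolean countedDown = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            restorer.join();
            return countedDown;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        // Empty changelog: the latch never counts down, so the wait times out.
        System.out.println("empty changelog: " + restoredAtLeastOneRecord(List.of(), 100L));
        // Non-empty changelog: the latch opens as soon as the first record is restored.
        System.out.println("non-empty changelog: " + restoredAtLeastOneRecord(List.of(1, 2, 3), 5000L));
    }
}
```

Verifying up front that the partition contains committed records, as the added method does, avoids relying on the timeout at all.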

@lucasbru
Member

LGTM, thanks!

@@ -230,7 +230,7 @@

<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
-files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest).java"/>
+files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/>
Contributor Author

I could have probably solved this checkstyle issue by moving the test to a separate file but I think it makes sense to keep it in EosIntegrationTest to avoid starting an additional embedded Kafka.

@cadonna cadonna merged commit 19727f8 into apache:trunk Dec 21, 2023
1 check failed
cadonna added a commit that referenced this pull request Dec 21, 2023
…15044)

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
cadonna added a commit to cadonna/kafka that referenced this pull request Jan 11, 2024
…pache#15044)

gaurav-narula pushed a commit to gaurav-narula/kafka that referenced this pull request Jan 24, 2024
…pache#15044)

yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…pache#15044)

clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…pache#15044)

2 participants