KAFKA-16017: Checkpoint restored offsets instead of written offsets #15044
Conversation
Kafka Streams checkpoints the wrong offset when a task is closed during restoration. If under exactly-once processing guarantees a TaskCorruptedException happens, the affected task is closed dirty, its state content is wiped out and the task is re-initialized. If during the following restoration the task is closed cleanly, the task writes the offsets that it stores in its record collector to the checkpoint file. Those offsets are the offsets that the task wrote to the changelog topics. In other words, the task writes the end offsets of its changelog topics to the checkpoint file. Consequently, when the task is initialized again on the same Streams client, the checkpoint file is read and the task assumes it is fully restored although the records between the last offsets the task restored before closing clean and the end offset of the changelog topics are missing locally. The fix is to clear the offsets in the record collector on close.
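The idea of the fix can be illustrated with a minimal, self-contained sketch. This is not the actual Kafka Streams code (class and method names here are hypothetical, and the real collector tracks `TopicPartition` keys); it only shows the pattern: both clean and dirty close funnel into a common `close()` that clears the written offsets, so a task closed during restoration cannot later checkpoint the changelog end offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a record collector; names are illustrative only.
public class MiniRecordCollector {
    // partition name -> last offset written to the changelog topic
    private final Map<String, Long> offsets = new HashMap<>();

    public void onSend(final String partition, final long offset) {
        offsets.put(partition, offset);
    }

    public Map<String, Long> offsets() {
        return new HashMap<>(offsets);
    }

    public void closeClean() {
        close();
    }

    public void closeDirty() {
        close();
    }

    // Common close logic: drop the written offsets so they can never be
    // mistaken for restored offsets and written to the checkpoint file.
    private void close() {
        offsets.clear();
    }

    public static void main(final String[] args) {
        final MiniRecordCollector collector = new MiniRecordCollector();
        collector.onSend("changelog-0", 42L);
        collector.closeClean();
        System.out.println(collector.offsets().isEmpty()); // prints true
    }
}
```

After either form of close, the checkpointing code sees no offsets from the collector and falls back to the offsets actually restored, which is the behavior the fix restores.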
@@ -1228,6 +1290,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin
         try (final LogCaptureAppender logCaptureAppender =
                  LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+            logCaptureAppender.setThreshold(Level.INFO);
I added this because otherwise the result of the test depends on the configured log level.
@@ -408,6 +408,7 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="kafka.utils" />
     <allow pkg="org.apache.zookeeper" />
+    <allow pkg="org.apache.log4j" />
I needed to add this to enable https://github.com/apache/kafka/pull/15044/files#r1430656200
    }

+    private void close() {
+        offsets.clear();
+    }
This is the actual fix.
I like having common logic for close clean / close dirty. Should we also move removeAllProducedSensors() here?
I am afraid I do not understand. Method removeAllProducedSensors() is only called in closeClean() but not in closeDirty().
Thanks a lot for the important fix, @cadonna. I left some comments, but overall looking good to me.
    final int endKey = 30001;
    final int valueSize = 1000;
    final StringBuilder value1 = new StringBuilder(valueSize);
    for (int i = 0; i < valueSize; ++i) {
Is the size of the value essential to the test? In other words, "why?"
No, it is not essential. Actually it is just to put some load on restoration.
OK, I removed the string value and put an integer value in place instead.
Resolved review threads on streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
LGTM, thanks!
@@ -1313,7 +1313,6 @@ private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
                                                              final int maxMessages) {
     final List<ConsumerRecord<K, V>> consumerRecords;
     consumer.subscribe(singletonList(topic));
-    System.out.println("Got assignment:" + consumer.assignment());
I discovered this and removed it, because I guess it is a leftover from some other PR.
        topic,
        numberOfRecords
    );
}

+private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
I added this method to specifically verify that the partition under test contains committed records. I saw flaky test failures where the changelog topic partition to verify was empty; if it is empty, the latch never counts down, the Streams client never closes, and the test runs into the test timeout.
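The waiting pattern behind such a check can be sketched in a self-contained way. This is not the actual test code: `awaitRecords` and its supplier-based polling are hypothetical stand-ins (the real method polls a Kafka consumer for committed records in a specific topic partition); the sketch only shows "poll repeatedly, count records, fail with a clear error instead of hanging until the test timeout."

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class EnsureRecords {
    // Poll until at least minRecords records have been seen, or give up
    // after maxPolls polls and fail fast with a descriptive error.
    static int awaitRecords(final Supplier<List<Integer>> poll,
                            final int minRecords,
                            final int maxPolls) {
        int seen = 0;
        for (int i = 0; i < maxPolls; i++) {
            seen += poll.get().size();
            if (seen >= minRecords) {
                return seen;
            }
        }
        throw new IllegalStateException(
            "timed out: saw " + seen + " of " + minRecords + " records");
    }

    public static void main(final String[] args) {
        // Simulate three polls: an empty batch, then two batches with data.
        final Iterator<List<Integer>> batches = Arrays.asList(
            Collections.<Integer>emptyList(),
            Arrays.asList(1, 2),
            Arrays.asList(3)).iterator();
        System.out.println(awaitRecords(batches::next, 3, 10)); // prints 3
    }
}
```

Failing fast with an exception, rather than blocking on a latch that may never count down, turns a hang-until-timeout into an immediate, diagnosable test failure.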
LGTM, thanks!
@@ -230,7 +230,7 @@
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest).java"/>
+              files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/>
I could probably have solved this checkstyle issue by moving the test to a separate file, but I think it makes sense to keep it in EosIntegrationTest to avoid starting an additional embedded Kafka cluster.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>