From 6a284116cdd11f8a45e7c489c9026989d7ca5b36 Mon Sep 17 00:00:00 2001
From: Bruno Cadonna
Date: Mon, 18 Dec 2023 22:01:16 +0100
Subject: [PATCH 1/8] KAFKA-16017: Checkpoint restored offsets instead of written offsets

Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If a TaskCorruptedException happens under exactly-once
processing guarantees, the affected task is closed dirty, its state content
is wiped out, and the task is re-initialized. If during the following
restoration the task is closed cleanly, the task writes the offsets that it
stores in its record collector to the checkpoint file. Those offsets are the
offsets that the task wrote to the changelog topics. In other words, the
task writes the end offsets of its changelog topics to the checkpoint file.
Consequently, when the task is initialized again on the same Streams client,
the checkpoint file is read and the task assumes it is fully restored,
although the records between the last offsets the task restored before
closing cleanly and the end offsets of the changelog topics are missing
locally.

The fix is to clear the offsets in the record collector on close.
---
 checkstyle/import-control.xml                 |   1 +
 .../internals/RecordCollectorImpl.java        |   7 +-
 .../integration/EosIntegrationTest.java       | 185 +++++++++++++++++-
 .../internals/RecordCollectorTest.java        |  63 ++++++
 4 files changed, 252 insertions(+), 4 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1569494b455d..8842ec70c290 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -408,6 +408,7 @@
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index e2af471275c5..8abf5e897623 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -361,7 +361,7 @@ public void closeClean() {
         // transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
         // tasks had any data in the current transaction and therefore there is no need to commit or abort it.
- checkForException(); + close(); } /** @@ -377,6 +377,11 @@ public void closeDirty() { streamsProducer.abortTransaction(); } + close(); + } + + private void close() { + offsets.clear(); checkForException(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index e8e37fca433b..8b52b7e547f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -26,21 +26,31 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.RangeQuery; @@ -60,6 +70,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Assertions; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -70,6 +81,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -82,13 +94,19 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; import static org.apache.kafka.test.TestUtils.consumerConfig; @@ -789,6 +807,159 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception { verifyOffsetsAreInCheckpoint(1); } + @Test + public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception { + shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true); + } + + @Test + public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception { + if (!processingThreadsEnabled) { + shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false); + } + } + + @SuppressWarnings("deprecation") + private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception { + if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) { + return; + } + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); + streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); + streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); + streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); + final String stateStoreName = "stateStore"; + + purgeLocalStreamsState(streamsConfiguration); + + final int startKey = 1; + final int endKey = 30001; + final int valueSize = 1000; + final StringBuilder value1 = new StringBuilder(valueSize); + for (int i = 0; i < valueSize; ++i) { + value1.append("A"); + } + final String valueStr1 = value1.toString(); + final List> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList()); + IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, + recordBatch1, + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + CLUSTER.time); + + final StoreBuilder> stateStore = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(stateStoreName), + Serdes.Integer(), + Serdes.String()).withCachingEnabled(); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean throwException = new AtomicBoolean(false); + final TaskId task00 = new TaskId(0, 0); + final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0); + final Topology topology = new Topology(); + topology + .addSource("source", MULTI_PARTITION_INPUT_TOPIC) + .addProcessor("processor", () -> 
new Processor() { + KeyValueStore stateStore; + org.apache.kafka.streams.processor.api.ProcessorContext context; + + @Override + public void init(final org.apache.kafka.streams.processor.api.ProcessorContext context) { + Processor.super.init(context); + this.context = context; + stateStore = context.getStateStore(stateStoreName); + } + + @Override + public void process(final Record record) { + context.recordMetadata().ifPresent(recordMetadata -> { + if (recordMetadata.partition() == 0) { + if (throwException.compareAndSet(true, false)) { + throw new TaskCorruptedException(Collections.singleton(task00)); + } + stateStore.put(record.key(), record.value()); + } else { + stateStore.put(record.key(), record.value()); + if (restoredOffsetsForPartition0.get() > 0) { + latch.countDown(); + } + } + }); + } + + @Override + public void close() { + Processor.super.close(); + } + }, "source") + .addStateStore(stateStore, "processor") + .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor"); + + final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); + kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() { + @Override + public void onRestoreStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset, + final long endingOffset) { + if (topicPartition.partition() == 0) { + System.out.println("Restore listener - Starting offset: " + startingOffset); + } + } + @Override + public void onBatchRestored(final TopicPartition topicPartition, + final String storeName, + final long batchEndOffset, + final long numRestored) { + if (topicPartition.partition() == 0) { + System.out.println("Restore listener - Batch end offset: " + batchEndOffset); + restoredOffsetsForPartition0.set(batchEndOffset); + } + } + @Override + public void onRestoreEnd(final TopicPartition topicPartition, + final String storeName, + final long totalRestored) { + if (topicPartition.partition() == 0) { + System.out.println("Restore listener - Total restored: " + totalRestored); + } + } + }); + startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60)); + readResult( + applicationId + "-" + stateStoreName + "-changelog", + 2000, + StringDeserializer.class, + StringDeserializer.class, + CONSUMER_GROUP_ID + ); + throwException.set(true); + latch.await(); + kafkaStreams.close(); + + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60)); + final File checkpointFile = Paths.get( + streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG), + streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), + task00.toString(), + ".checkpoint" + ).toFile(); + Assertions.assertTrue(checkpointFile.exists()); + final Map checkpoints = new OffsetCheckpoint(checkpointFile).read(); + Assertions.assertEquals( + restoredOffsetsForPartition0.get(), + new ArrayList<>(checkpoints.values()).get(0) + ); + } + private void verifyOffsetsAreInCheckpoint(final int partition) throws IOException { final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator; @@ -989,13 +1160,21 @@ private void writeInputData(final List> records) { private List> readResult(final String topic, final int numberOfRecords, final String groupId) throws Exception { + return readResult(topic, numberOfRecords, LongDeserializer.class, LongDeserializer.class, groupId); + } + + private List> 
readResult(final String topic, + final int numberOfRecords, + final Class> keyDeserializer, + final Class> valueDeserializer, + final String groupId) throws Exception { if (groupId != null) { return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( TestUtils.consumerConfig( CLUSTER.bootstrapServers(), groupId, - LongDeserializer.class, - LongDeserializer.class, + keyDeserializer, + valueDeserializer, Utils.mkProperties(Collections.singletonMap( ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))), @@ -1006,7 +1185,7 @@ private List> readResult(final String topic, // read uncommitted return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), + TestUtils.consumerConfig(CLUSTER.bootstrapServers(), keyDeserializer, valueDeserializer), topic, numberOfRecords ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 2496dc73552a..bf943d5b5e6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockClientSupplier; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -90,6 +91,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -779,6 +781,66 @@ public void shouldForwardFlushToStreamsProducerEosEnabled() { collector.flush(); } + @Test + public void shouldClearOffsetsOnCloseClean() { + shouldClearOffsetsOnClose(true); + } + + @Test + public void shouldClearOffsetsOnCloseDirty() { + shouldClearOffsetsOnClose(false); + } + + private void shouldClearOffsetsOnClose(final boolean clean) { + final StreamsProducer streamsProducer = mock(StreamsProducer.class); + when(streamsProducer.eosEnabled()).thenReturn(true); + final long offset = 1234L; + final RecordMetadata metadata = new RecordMetadata( + new TopicPartition(topic, 0), + offset, + 0, + 0, + 1, + 1 + ); + when(streamsProducer.send(any(), any())).thenAnswer(invocation -> { + ((Callback) invocation.getArgument(1)).onCompletion(metadata, null); + return null; + }); + final ProcessorTopology topology = mock(ProcessorTopology.class); + + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + streamsProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + collector.send( + topic + "-changelog", + "key", + "value", + new RecordHeaders(), + 0, + 0L, + new StringSerializer(), + new StringSerializer(), + null, + null + ); + + assertFalse(collector.offsets().isEmpty()); + + if (clean) { + collector.closeClean(); + } else { + collector.closeDirty(); + } + + assertTrue(collector.offsets().isEmpty()); + } + @Test public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); @@ -1228,6 +1290,7 @@ 
public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin try (final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { + logCaptureAppender.setThreshold(Level.INFO); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); collector.flush(); From a3fb43630ba8a29eb00f8a5b1580ae3a4e701569 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 18 Dec 2023 22:31:04 +0100 Subject: [PATCH 2/8] Use more meaningful name --- .../integration/EosIntegrationTest.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 8b52b7e547f9..833b57ccdd67 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -70,7 +70,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.jupiter.api.Assertions; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -934,28 +933,27 @@ public void onRestoreEnd(final TopicPartition topicPartition, } }); startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60)); - readResult( + ensureCommittedRecordsInTopic( applicationId + "-" + stateStoreName + "-changelog", 2000, StringDeserializer.class, - StringDeserializer.class, - CONSUMER_GROUP_ID + StringDeserializer.class ); throwException.set(true); latch.await(); kafkaStreams.close(); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60)); + final File checkpointFile = Paths.get( streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG), streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), task00.toString(), ".checkpoint" ).toFile(); - Assertions.assertTrue(checkpointFile.exists()); + assertTrue(checkpointFile.exists()); final Map checkpoints = new OffsetCheckpoint(checkpointFile).read(); - Assertions.assertEquals( - restoredOffsetsForPartition0.get(), + assertEquals( + Long.valueOf(restoredOffsetsForPartition0.get()), new ArrayList<>(checkpoints.values()).get(0) ); } @@ -1191,6 +1189,13 @@ private List> readResult(final String topic, ); } + private List> ensureCommittedRecordsInTopic(final String topic, + final int numberOfRecords, + final Class> keyDeserializer, + final Class> valueDeserializer) throws Exception { + return readResult(topic, numberOfRecords, keyDeserializer, valueDeserializer, CONSUMER_GROUP_ID); + } + private List> computeExpectedResult(final List> input) { final List> expectedResult = new ArrayList<>(input.size()); From 6be04052a04cfbd268293ff22237cc861456c65c Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 19 Dec 2023 11:26:36 +0100 Subject: [PATCH 3/8] Delete System.outs and try improve robustness of integration test --- .../integration/EosIntegrationTest.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 833b57ccdd67..6b5b45bd8771 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -834,6 +835,7 @@ private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(fina streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); + streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100); final String stateStoreName = "stateStore"; purgeLocalStreamsState(streamsConfiguration); @@ -887,9 +889,6 @@ public void process(final Record record) { stateStore.put(record.key(), record.value()); } else { stateStore.put(record.key(), record.value()); - if (restoredOffsetsForPartition0.get() > 0) { - latch.countDown(); - } } }); } @@ -899,8 +898,7 @@ public void close() { Processor.super.close(); } }, "source") - .addStateStore(stateStore, "processor") - .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor"); + .addStateStore(stateStore, "processor"); final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() { @@ -908,35 +906,29 @@ public void close() { public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, - final long endingOffset) { - if (topicPartition.partition() == 0) { - System.out.println("Restore listener - Starting offset: " + startingOffset); - } - } + final long endingOffset) {} @Override public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) { if (topicPartition.partition() == 0) { - System.out.println("Restore listener - Batch end offset: " + batchEndOffset); restoredOffsetsForPartition0.set(batchEndOffset); + if (batchEndOffset > 100) { + latch.countDown(); + } } } @Override public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, - final long totalRestored) { - if (topicPartition.partition() == 0) { - System.out.println("Restore listener - Total restored: " + totalRestored); - } - } + final long totalRestored) {} }); startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60)); ensureCommittedRecordsInTopic( applicationId + "-" + stateStoreName + "-changelog", - 2000, - StringDeserializer.class, + 10000, + IntegerDeserializer.class, StringDeserializer.class ); throwException.set(true); From 97e42270f7f16a54fc358ddadc588440cfd3ba3e Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 19 Dec 2023 12:00:40 +0100 Subject: [PATCH 4/8] Simplify record value and ensure exception is thrown --- .../integration/EosIntegrationTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 6b5b45bd8771..a162fae514fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -842,18 +842,12 @@ private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(fina final int startKey = 1; final int endKey = 30001; - final int valueSize = 1000; - final StringBuilder value1 = new StringBuilder(valueSize); - for (int i = 0; i < valueSize; ++i) { - value1.append("A"); - } - final String valueStr1 = value1.toString(); - final List> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList()); + final List> recordBatch1 = IntStream.range(startKey, endKey - 1000).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList()); IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, recordBatch1, TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, - StringSerializer.class), + IntegerSerializer.class), CLUSTER.time); final StoreBuilder> stateStore = Stores.keyValueStoreBuilder( @@ -929,9 +923,16 @@ public void onRestoreEnd(final TopicPartition topicPartition, applicationId + "-" + stateStoreName + "-changelog", 10000, IntegerDeserializer.class, - StringDeserializer.class + IntegerDeserializer.class ); throwException.set(true); + final List> recordBatch2 = IntStream.range(endKey - 1000, endKey).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList()); + IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, + recordBatch2, + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + CLUSTER.time); latch.await(); kafkaStreams.close(); waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60)); From e0826555265ffa5a3328ef98b1babd3ee5a9ad5c Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 19 Dec 2023 13:15:09 +0100 Subject: [PATCH 5/8] Fix checkstyle error --- .../apache/kafka/streams/integration/EosIntegrationTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index a162fae514fe..3977597a1bea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -32,8 +32,6 @@ import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; From e69abfba5075232683c7cfe68da5edd3096a1328 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 20 Dec 2023 12:06:45 +0100 Subject: [PATCH 6/8] Ensure partition to verify contains committed records --- .../integration/EosIntegrationTest.java | 42 +++++++++++++++---- .../utils/IntegrationTestUtils.java | 1 - 2 files changed, 33 
insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 3977597a1bea..74759fe7287d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -106,6 +106,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; import static org.apache.kafka.test.TestUtils.consumerConfig; import static org.apache.kafka.test.TestUtils.waitForCondition; @@ -853,9 +854,10 @@ private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(fina Serdes.Integer(), Serdes.String()).withCachingEnabled(); + final int partitionToVerify = 0; final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean throwException = new AtomicBoolean(false); - final TaskId task00 = new TaskId(0, 0); + final TaskId task00 = new TaskId(0, partitionToVerify); final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0); final Topology topology = new Topology(); topology @@ -874,7 +876,7 @@ public void init(final org.apache.kafka.streams.processor.api.ProcessorContext record) { context.recordMetadata().ifPresent(recordMetadata -> { - if (recordMetadata.partition() == 0) { + if (recordMetadata.partition() == partitionToVerify) { if (throwException.compareAndSet(true, false)) { throw new TaskCorruptedException(Collections.singleton(task00)); } @@ -917,9 +919,10 @@ public void onRestoreEnd(final TopicPartition topicPartition, final long totalRestored) {} }); startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60)); - ensureCommittedRecordsInTopic( + ensureCommittedRecordsInTopicPartition( applicationId + "-" + stateStoreName + "-changelog", - 10000, + partitionToVerify, + 2000, IntegerDeserializer.class, IntegerDeserializer.class ); @@ -1180,11 +1183,32 @@ private List> readResult(final String topic, ); } - private List> ensureCommittedRecordsInTopic(final String topic, - final int numberOfRecords, - final Class> keyDeserializer, - final Class> valueDeserializer) throws Exception { - return readResult(topic, numberOfRecords, keyDeserializer, valueDeserializer, CONSUMER_GROUP_ID); + private void ensureCommittedRecordsInTopicPartition(final String topic, + final int partition, + final int numberOfRecords, + final Class> keyDeserializer, + final Class> valueDeserializer) throws Exception { + boolean containsRecordsFromPartition; + final int maxTries = 3; + int tries = 0; + do { + final List> consumerRecords = waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + CONSUMER_GROUP_ID, + keyDeserializer, + valueDeserializer, + Utils.mkProperties(Collections.singletonMap( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)) + ) + ), + topic, + numberOfRecords + ); + ++tries; + containsRecordsFromPartition = consumerRecords.stream().anyMatch(record -> 
record.partition() == partition); + } while (!containsRecordsFromPartition && tries < maxTries); } private List> computeExpectedResult(final List> input) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index b6c0e281651c..4f1d8d3d4266 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -1313,7 +1313,6 @@ private static List> readRecords(final String topic, final int maxMessages) { final List> consumerRecords; consumer.subscribe(singletonList(topic)); - System.out.println("Got assignment:" + consumer.assignment()); final int pollIntervalMs = 100; consumerRecords = new ArrayList<>(); int totalPollTimeMs = 0; From db151a55b2b44315617436610605f8a42be857bf Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 20 Dec 2023 13:56:20 +0100 Subject: [PATCH 7/8] Abort the verification for committed records regarding some conditions --- .../integration/EosIntegrationTest.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 74759fe7287d..e58840e11cee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -102,6 +102,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; @@ -1188,10 +1189,11 @@ private void ensureCommittedRecordsInTopicPartition(final String topic, final int numberOfRecords, final Class> keyDeserializer, final Class> valueDeserializer) throws Exception { - boolean containsRecordsFromPartition; - final int maxTries = 3; + final long timeoutMs = 2*DEFAULT_TIMEOUT; + final int maxTries = 10; + final long deadline = System.currentTimeMillis() + timeoutMs; int tries = 0; - do { + while (true) { final List> consumerRecords = waitUntilMinRecordsReceived( TestUtils.consumerConfig( CLUSTER.bootstrapServers(), @@ -1204,11 +1206,23 @@ private void ensureCommittedRecordsInTopicPartition(final String topic, ) ), topic, - numberOfRecords + numberOfRecords, + timeoutMs ); ++tries; - containsRecordsFromPartition = consumerRecords.stream().anyMatch(record -> record.partition() == partition); - } while (!containsRecordsFromPartition && tries < maxTries); + if(consumerRecords.stream().anyMatch(record -> record.partition() == partition)) { + return; + } + if (tries >= maxTries) { + throw new AssertionError("No committed records in topic " + topic + + ", partition " + partition + " after " + maxTries + " retries."); + } + final long now = System.currentTimeMillis(); + if (now > deadline) { + throw new AssertionError("No committed records in topic " + topic + + ", partition " + partition + " after 
" + timeoutMs + " ms."); + } + } } private List> computeExpectedResult(final List> input) { From 75ada37afe9533be3989d51f023b7ac5708c65cf Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 20 Dec 2023 14:28:21 +0100 Subject: [PATCH 8/8] Fix checkstyle issues --- checkstyle/suppressions.xml | 2 +- .../apache/kafka/streams/integration/EosIntegrationTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 44fdf634ab0e..7b9fcb6409cf 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -230,7 +230,7 @@ + files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/> diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index e58840e11cee..688693dbcbf0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -1189,7 +1189,7 @@ private void ensureCommittedRecordsInTopicPartition(final String topic, final int numberOfRecords, final Class> keyDeserializer, final Class> valueDeserializer) throws Exception { - final long timeoutMs = 2*DEFAULT_TIMEOUT; + final long timeoutMs = 2 * DEFAULT_TIMEOUT; final int maxTries = 10; final long deadline = System.currentTimeMillis() + timeoutMs; int tries = 0; @@ -1210,7 +1210,7 @@ private void ensureCommittedRecordsInTopicPartition(final String topic, timeoutMs ); ++tries; - if(consumerRecords.stream().anyMatch(record -> record.partition() == partition)) { + if (consumerRecords.stream().anyMatch(record -> record.partition() == partition)) { return; } if (tries >= maxTries) {