@mhratson mhratson commented Mar 22, 2023

Problem

  • calling _baseConsumer.close() from outside the thread the consumer is running in is invalid
  • this is documented in the Kafka consumer docs [1]

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

The exception thrown

2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service
        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]

Solution

The recommended solution [1] is

  • to use the consumer.wakeup() method
  • but that method is not yet exposed by the KMBaseConsumer interface
  • so for now _baseConsumer.close() is moved into the consumer thread
  • calling stop now only calls _running.compareAndSet(true, false), so the run loop exits on its next iteration
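The shape of this change can be sketched roughly as follows. This is a minimal illustration, not the actual ConsumeService code: `ConsumeLoopSketch`, the `BaseConsumer` interface (a stand-in for `KMBaseConsumer`), its `receive()` method, and `awaitShutdown` are all hypothetical names. Only `_running`, `_baseConsumer`, and the `compareAndSet(true, false)` call come from the description above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ConsumeLoopSketch {

  /** Hypothetical stand-in for KMBaseConsumer; only the calls this sketch needs. */
  interface BaseConsumer {
    void receive() throws Exception; // assumed: polls once and returns or times out
    void close();
  }

  private final BaseConsumer _baseConsumer;
  private final AtomicBoolean _running = new AtomicBoolean(false);
  private final Thread _consumeThread;

  ConsumeLoopSketch(BaseConsumer baseConsumer) {
    _baseConsumer = baseConsumer;
    _consumeThread = new Thread(this::consume, "consume-service");
  }

  private void consume() {
    try {
      // The loop re-checks the flag after every receive(), so stop() takes
      // effect only once the current receive() call returns.
      while (_running.get()) {
        _baseConsumer.receive();
      }
    } catch (Exception e) {
      // A real service would log the failure; the sketch just falls through.
    } finally {
      // close() runs on the thread that owns the consumer, so no other
      // thread touches the consumer concurrently.
      _baseConsumer.close();
    }
  }

  void start() {
    if (_running.compareAndSet(false, true)) {
      _consumeThread.start();
    }
  }

  /** Only flips the flag; never touches the consumer from the caller's thread. */
  void stop() {
    _running.compareAndSet(true, false);
  }

  /** Waits for the consume thread to finish closing the consumer. */
  void awaitShutdown(long timeoutMs) throws InterruptedException {
    _consumeThread.join(timeoutMs);
  }
}
```

Because `stop()` does not interrupt the thread, shutdown latency is bounded by how long a single `receive()` call can block, which is why a caller waiting on the thread needs a generous join timeout.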

Testing Done

Increased the thread.join(5000) timeout, as this implementation is slower to stop because it does not interrupt the consumer thread.

- ./gradlew test

Footnotes

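  1. KafkaConsumer.java: https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502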

@mhratson mhratson force-pushed the LIKAFKA-34677-fix-close-error branch from fddc465 to c9ffcb1 Compare March 22, 2023 02:18
@mhratson mhratson changed the title from "ConsumeService: fix client closing causing ConcurrentModificationExeption" to "ConsumeService: fix client close() causing ConcurrentModificationExeption" Mar 22, 2023
@mhratson mhratson force-pushed the LIKAFKA-34677-fix-close-error branch from c9ffcb1 to 0da972f Compare March 22, 2023 02:24
@mhratson mhratson merged commit 043db64 into master Mar 22, 2023
@mhratson mhratson deleted the LIKAFKA-34677-fix-close-error branch March 22, 2023 21:19