A sample application that demonstrates how to use KafkaTemplate to send messages to a Kafka topic within a transaction, and how those messages are received by a Kafka consumer.
- It is assumed that the Kafka topic `custom` and the dead-letter topic `custom.DLT` already exist; the application does not create them itself.
- For comparison only, a separate producer and consumer are configured and expect a topic named `simple` to exist. This topic expects the message payload to be of type `String`.
- An HTTP POST request is sent to the application with a payload in JSON format.
- Upon receiving such a request, a message is sent to the topic with the payload as the message body.
- The consumer is notified when a message arrives and begins processing. If an error occurs, for example because the message cannot be converted, processing is retried up to `maxFailure` times. After that, the message is sent to the `DLT` for further inspection.
- Instead of processing the incoming HTTP request on the same thread, the application sends a response back to the caller as soon as the request has been sent to the Kafka topic. This decoupling allows the application to scale up without processing becoming the bottleneck.
- A separate thread (the consumer) listens to the topic and does the processing. Processing can be scaled up by adding more consumers.
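The decoupling described in the last two points can be pictured with a small in-memory model. This is only a sketch: a `BlockingQueue` stands in for the Kafka topic, and the class `DecoupledProcessing`, its methods, and the upper-casing "work" are hypothetical names invented for illustration, not code from this sample.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DecoupledProcessing {

    // In-memory stand-in for the Kafka topic (assumption for illustration).
    final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    final List<String> processed = new CopyOnWriteArrayList<>();

    // What the HTTP handler does: hand the payload over and return at once.
    String accept(String payload) {
        topic.add(payload);
        return "accepted";
    }

    // What one consumer does: wait for a message, then process it.
    // Returns false if nothing arrived within the timeout.
    boolean consumeOne(long timeoutMillis) {
        try {
            String message = topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                return false;
            }
            processed.add(message.toUpperCase()); // placeholder for real work
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DecoupledProcessing app = new DecoupledProcessing();

        // The caller gets its answer before any processing has happened.
        System.out.println(app.accept("hello"));

        // A separate consumer thread drains the queue afterwards.
        Thread consumer = new Thread(() -> {
            while (app.consumeOne(200)) {
                // keep draining until the queue stays empty
            }
        });
        consumer.start();
        consumer.join();
        System.out.println(app.processed);
    }
}
```

In this model, adding more consumers means starting more threads that call `consumeOne`; with Kafka the equivalent is adding consumers to the consumer group.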
- HTTP POST to `/message` with any string as payload.
  - This is for working with the `simple` topic, which expects the message to be a string.
- HTTP POST to `/custom-message` with a JSON payload.
  - This is for working with the `custom` topic, which expects the message to have the following format: `{ "message": "any-message" }`
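
One way to exercise the `/custom-message` endpoint from Java is sketched below with the JDK's `java.net.http` API (Java 11 or later). The base URL `http://localhost:8080` and the class and method names are assumptions for illustration, not part of the sample. The request is only built here; the commented line shows how it could be sent once the application is running.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CustomMessageRequest {

    // Builds (but does not send) a POST to /custom-message.
    // baseUrl is an assumption, e.g. "http://localhost:8080".
    static HttpRequest build(String baseUrl, String message) {
        // Escape backslashes and double quotes so the JSON body stays valid.
        String escaped = message.replace("\\", "\\\\").replace("\"", "\\\"");
        String json = "{ \"message\": \"" + escaped + "\" }";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/custom-message"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "any-message");
        // To actually send it (requires the application to be running):
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
        System.out.println(request.method() + " " + request.uri());
    }
}
```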
Inside the postMessage method of the CustomMessageProducerService class, kafkaTemplate.executeInTransaction wraps the code that sends the message.
Wrapping the code in a transaction is not strictly necessary in order to send the message. It is used here for illustration purposes.
On the producer side, we set the transaction id prefix to enable transactions.
factory.setTransactionIdPrefix("tx-");
On the consumer side, we set the consumer configuration property isolation.level to read_committed.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
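
For context, a fuller consumer property map might look like the sketch below. It uses the plain string keys behind the `ConsumerConfig` constants so that it compiles without the Kafka client on the classpath. The broker address, the group id, and the class and method names are placeholders, not values taken from this sample.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties using the raw Kafka configuration keys.
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.ISOLATION_LEVEL_CONFIG.
        // Kafka's default is "read_uncommitted"; with "read_committed" the
        // consumer does not see messages from aborted or open transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Map<String, Object> props = consumerProps("localhost:9092", "demo-group");
        System.out.println(props.get("isolation.level"));
    }
}
```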
We can configure an error handler in the listener container to perform other actions when an error occurs.
The SeekToCurrentErrorHandler discards the remaining records from the poll() and performs seek operations on the consumer to reset the offsets, so that the discarded records are fetched again on the next poll.
Once retries are exhausted, the failed record can be sent to a dead-letter topic for further inspection.
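
The retry-then-dead-letter flow can be modelled in a few lines of plain Java. This is a simplified sketch of the behaviour described above, not Spring Kafka's implementation: the class `RetryThenDeadLetter` is hypothetical, a list stands in for the dead-letter topic, and redelivery is modelled as a loop rather than a seek followed by another poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenDeadLetter {

    // Stand-in for the dead-letter topic (custom.DLT in this sample).
    final List<String> deadLetters = new ArrayList<>();
    final int maxFailures;

    RetryThenDeadLetter(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    // Delivers one record to the listener, redelivering after each failure.
    // Returns the number of delivery attempts that were made.
    int deliver(String record, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxFailures; attempt++) {
            try {
                listener.accept(record);
                return attempt; // processed successfully
            } catch (RuntimeException e) {
                // In the real container the error handler seeks back so the
                // record is fetched again on the next poll.
            }
        }
        deadLetters.add(record); // retries exhausted
        return maxFailures;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter handler = new RetryThenDeadLetter(3);
        // Hypothetical listener that rejects anything not shaped like JSON.
        Consumer<String> listener = payload -> {
            if (!payload.startsWith("{")) {
                throw new IllegalArgumentException("not JSON: " + payload);
            }
        };
        handler.deliver("{ \"message\": \"ok\" }", listener);
        handler.deliver("not-json", listener);
        System.out.println(handler.deadLetters); // prints [not-json]
    }
}
```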
ErrorHandlingDeserializer2 wraps a delegate deserializer such as StringDeserializer and catches any exception thrown during deserialization. The exception is forwarded to the listener container, which passes it to the error handler.
A StringJsonMessageConverter is configured as the MessageConverter to convert from String to a custom Java POJO. Trusted packages and an id-to-class mapping are set up so that the message can be converted into the POJO.
- `application.yaml` has a hard-coded value that most likely won't work for you and has to be changed: `bootstrapServer: 192.168.1.196:9092`