An application spike that uses Kafka and orchestration-style Saga processing to break complex transactions into simple steps using a simple state machine.
You can spin up the entire application using:

    make start

This will bring up the following:
- Zookeeper service
- Kafka service
- Postgres DB service
- 4 KafkaSaga instances

Once the services are up, a message can be sent to the first instance:

    POST http://localhost:8181/v1/message
    Content-Type: application/json
    Accept: application/json

    {
      "source": "source",
      "destination": "kafkasaga1",
      "message": "Simple message sent to the first kafka saga microservice"
    }

The other three servers are running on ports 8182, 8183 and 8184.
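
The same request can be sent from Java. The following is a minimal sketch using the JDK's built-in `java.net.http` client; the class name `SendMessageExample` and the helper `buildRequest` are hypothetical and not part of the project. Running `main` assumes the services started by `make start` are up.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class, not part of the KafkaSaga code base.
class SendMessageExample {

    // Builds the POST request for the KafkaSaga instance listening on the given port.
    static HttpRequest buildRequest(int port, String json) {
        return HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/v1/message"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{"
                + "\"source\": \"source\","
                + "\"destination\": \"kafkasaga1\","
                + "\"message\": \"Simple message sent to the first kafka saga microservice\""
                + "}";
        // 8181 is the first instance; 8182, 8183 and 8184 are the other three.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(8181, json), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Changing the port argument sends the message through one of the other instances.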
The KafkaSaga instances use Kafka to transport Messages and manage MessageEvents; any Messages received are persisted to the Postgres database.
The message flow is:
- A KafkaSaga instance receives an HTTP request containing a new Message
- The message is written to the dev.message kafka topic
- A (potentially new) KafkaSaga instance receives the message from the dev.message kafka topic
- The message is written to the DB
- A MessageEvent is published to the dev.message.event.internal kafka topic with a status of PERSISTED
- The PERSISTED event is sent to all KafkaSaga instances and is ignored
- The PERSISTED event is sent to a (potentially new) KafkaSaga MessageEventHandler which delegates the work to the StandardiseMessageAction class
- (n) The message is standardised and written to the database
- The StandardiseMessageAction class returns a new MessageEvent with a status of STANDARDISED
- (n+1) The STANDARDISED event is sent to all KafkaSaga instances and the one that handled the HTTP request loads the standardised message from the database
- (n+2) The KafkaSaga instance returns the HTTP response to the caller
- The STANDARDISED event is sent to the KafkaSaga MessageEventHandler which delegates the work to the PropagateMessageAction class
- The message is propagated to other external systems
- The PropagateMessageAction class returns a new MessageEvent with a status of PROPAGATED
- The PROPAGATED event is sent to all KafkaSaga instances and is ignored
- The PROPAGATED event is sent to the KafkaSaga MessageEventHandler which delegates the work to the ProcessingCompleteAction class
- Any cleanup work is performed after processing finishes
- The ProcessingCompleteAction class returns a new MessageEvent with a status of PROCESSED
- MessageEventHandler writes a tombstone record into dev.message.event.internal
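
The final step of the flow writes a tombstone, which in Kafka is a record with a key and a null value. On a log-compacted topic a tombstone marks every earlier record with that key for removal. The sketch below simulates that effect in memory; it assumes the event topic is compacted, which the text above does not state, and the class `TombstoneSketch` and the keys `msg-1` and `msg-2` are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory illustration; it does not talk to Kafka.
class TombstoneSketch {

    // Simulates log compaction: keep the latest value per key and
    // drop any key whose latest record is a tombstone (null value).
    static Map<String, String> compact(String[][] records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                latest.remove(key);      // tombstone: forget this key
            } else {
                latest.put(key, value);  // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] log = {
            {"msg-1", "PERSISTED"},
            {"msg-2", "PERSISTED"},
            {"msg-1", "STANDARDISED"},
            {"msg-1", "PROPAGATED"},
            {"msg-1", "PROCESSED"},
            {"msg-1", null},             // tombstone written once processing is complete
        };
        System.out.println(compact(log)); // prints {msg-2=PERSISTED}
    }
}
```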
There are three main parts to the microservice:
- MessageController REST interface - Provides the controller that receives messages and puts them on the dev.message topic
- MessageHandler - Reads the message off the dev.message topic, writes it to the DB and triggers a PERSISTED event on the dev.message.event.internal topic
- MessageEventHandler - Listens to message events and implements a simple state machine to process the messages
saga1_1 | 2021-03-13 12:57:30.556 INFO 6 --- [a-application-1] i.g.r.k.s.k.c.MessageController : Message Sent: 744259f2-0759-49b6-985f-77948e94096e
saga3_1 | 2021-03-13 12:57:30.556 INFO 6 --- [ntainer#2-1-C-1] i.g.r.k.s.k.listener.MessageHandler : Received message Message(id=null, source=source, destination=destination1, message=sample message) [744259f2-0759-49b6-985f-77948e94096e]
saga3_1 | 2021-03-13 12:57:30.563 INFO 6 --- [ntainer#2-1-C-1] i.g.r.k.s.k.listener.MessageHandler : Wrote message: [744259f2-0759-49b6-985f-77948e94096e]
saga4_1 | 2021-03-13 12:57:30.566 INFO 7 --- [ntainer#1-1-C-1] i.g.r.k.s.k.s.a.StandardiseMessageAction : PERSISTED -> Standardising message: [744259f2-0759-49b6-985f-77948e94096e]
saga4_1 | 2021-03-13 12:57:33.322 INFO 7 --- [ntainer#1-1-C-1] i.g.r.k.s.k.s.a.PropagateMessageAction : STANDARDISED -> Propagating message: [744259f2-0759-49b6-985f-77948e94096e]
saga4_1 | 2021-03-13 12:57:36.279 INFO 7 --- [ntainer#1-1-C-1] i.g.r.k.s.k.s.a.ProcessingCompleteAction : PROPAGATED -> Processing finished: [744259f2-0759-49b6-985f-77948e94096e]
From the log we can see that the REST interface received a message on INSTANCE 1.
The Message was received on INSTANCE 3, where it was written to the database and a PERSISTED event was triggered.
The Saga processor on INSTANCE 4 listened to the events on the dev.message.event.internal topic, processing each event and triggering state changes.
The instance that processes the message is determined by the way the broker has allocated partitions and the way the correlation id is mapped to partitions.
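
As a rough illustration of how a correlation id ends up on a partition, the sketch below hashes the key bytes and takes the result modulo the partition count. It is a simplification: Kafka's default partitioner uses a murmur2 hash for keyed records, while this sketch uses `Arrays.hashCode` to stay dependency-free. It also assumes the correlation id is used as the record key and that the topic has 4 partitions; neither is confirmed by the text above. `PartitionSketch` and `partitionFor` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of keyed partitioning; not Kafka's actual hash function.
class PartitionSketch {

    // Hashes the key bytes and maps the hash onto [0, numPartitions).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String correlationId, int numPartitions) {
        byte[] keyBytes = correlationId.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        String correlationId = "744259f2-0759-49b6-985f-77948e94096e";
        // The same id always maps to the same partition, so every event for one
        // message is handled by whichever instance currently owns that partition.
        System.out.println("partition = " + partitionFor(correlationId, 4));
    }
}
```

Because the mapping is deterministic, all events for one message stay in order on a single partition, which matches the log above where every state change for the message was handled on one instance.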
NOTE: The MessageController listens to a reactive stream of MessageEvent objects from the dev.message.event.internal
topic. When an event for the request reaches the STANDARDISED state, the MessageController returns the StandardisedMessage associated with that event.
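
The request/response correlation described in this note can be sketched without a reactive library. The example below swaps the reactive stream for a map of `CompletableFuture` objects keyed by correlation id, and uses a placeholder string where the real service would load the StandardisedMessage from the database. `PendingResponses`, `await` and `onEvent` are hypothetical names, not the project's API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: CompletableFuture stands in for the reactive stream.
class PendingResponses {

    enum Status { PERSISTED, STANDARDISED, PROPAGATED, PROCESSED }

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the HTTP handler after it has published the message.
    CompletableFuture<String> await(String correlationId) {
        return pending.computeIfAbsent(correlationId, id -> new CompletableFuture<>());
    }

    // Called for every event this instance sees. Only a STANDARDISED event for an id
    // this instance is waiting on completes a response; everything else is ignored.
    void onEvent(String correlationId, Status status) {
        if (status != Status.STANDARDISED) {
            return;
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            // Placeholder payload; the real service would read the standardised message from the DB.
            future.complete("standardised message for " + correlationId);
        }
    }

    public static void main(String[] args) {
        PendingResponses responses = new PendingResponses();
        CompletableFuture<String> response = responses.await("744259f2");
        responses.onEvent("744259f2", Status.PERSISTED);     // ignored
        System.out.println(response.isDone());               // prints false
        responses.onEvent("744259f2", Status.STANDARDISED);  // completes the response
        System.out.println(response.join());
    }
}
```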
The saga is represented as a simple state machine where message events are sent to Actions that process them and return a new message event with a potentially different state.
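
A minimal sketch of such a state machine is shown below. The status names come from the message flow above; the class `SagaStateMachineSketch`, the `MessageEvent` shape and the `handle` method are assumptions made for illustration, and each action here only advances the status rather than doing real work.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the event -> action -> new event loop.
class SagaStateMachineSketch {

    enum Status { PERSISTED, STANDARDISED, PROPAGATED, PROCESSED }

    static final class MessageEvent {
        final String correlationId;
        final Status status;

        MessageEvent(String correlationId, Status status) {
            this.correlationId = correlationId;
            this.status = status;
        }
    }

    // One action per incoming status; each returns a new event carrying the next status.
    private static final Map<Status, Function<MessageEvent, MessageEvent>> ACTIONS =
            new EnumMap<>(Status.class);

    static {
        // Stand-ins for StandardiseMessageAction, PropagateMessageAction and ProcessingCompleteAction.
        ACTIONS.put(Status.PERSISTED, e -> new MessageEvent(e.correlationId, Status.STANDARDISED));
        ACTIONS.put(Status.STANDARDISED, e -> new MessageEvent(e.correlationId, Status.PROPAGATED));
        ACTIONS.put(Status.PROPAGATED, e -> new MessageEvent(e.correlationId, Status.PROCESSED));
    }

    // Returns the event produced by the matching action, or empty when no action
    // is registered for the status (PROCESSED is terminal).
    static Optional<MessageEvent> handle(MessageEvent event) {
        Function<MessageEvent, MessageEvent> action = ACTIONS.get(event.status);
        return action == null ? Optional.empty() : Optional.of(action.apply(event));
    }

    public static void main(String[] args) {
        Optional<MessageEvent> current = Optional.of(new MessageEvent("744259f2", Status.PERSISTED));
        while (current.isPresent()) {
            MessageEvent event = current.get();
            Optional<MessageEvent> next = handle(event);
            next.ifPresent(n -> System.out.println(event.status + " -> " + n.status));
            current = next;
        }
    }
}
```

Keeping each action a pure function from one event to the next makes a step safe to retry, since the outcome depends only on the incoming event.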
This approach allows failures in Kafka, the database or KafkaSaga instances to be recoverable. For example, if a KafkaSaga instance crashes halfway through processing a MessageEvent, the Kafka broker detects the failure and reallocates the topic partitions among the remaining KafkaSaga instances, and the MessageEvent is then redelivered and reprocessed.
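
The recovery behaviour relies on offsets being committed only after an event has been handled. The following in-memory sketch illustrates that idea under simplifying assumptions: a list stands in for a topic partition, an integer stands in for the committed offset, and a crash is simulated by returning early. `RedeliverySketch` and `consume` are hypothetical names and no Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of at-least-once delivery; it does not use Kafka.
class RedeliverySketch {

    // Processes events starting at the committed offset and returns the new committed offset.
    // The offset only advances after an event has been fully handled.
    static int consume(List<String> partition, int committedOffset, int crashAtOffset,
                       List<String> processed) {
        for (int offset = committedOffset; offset < partition.size(); offset++) {
            if (offset == crashAtOffset) {
                // Simulated crash before the commit: this event stays uncommitted.
                return committedOffset;
            }
            processed.add(partition.get(offset));
            committedOffset = offset + 1; // commit after successful processing
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("PERSISTED", "STANDARDISED", "PROPAGATED");
        List<String> processed = new ArrayList<>();

        int committed = consume(partition, 0, 1, processed);       // first instance crashes at offset 1
        committed = consume(partition, committed, -1, processed);  // another instance takes over

        // prints [PERSISTED, STANDARDISED, PROPAGATED] committed=3
        System.out.println(processed + " committed=" + committed);
    }
}
```

Since an event can be delivered more than once in this model, each action should be safe to repeat.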
Before the application can be run it needs a local Kafka and Postgres database running. This can be done using Docker:

    make start-dev

This will start all the services (Kafka, Zookeeper and Postgres):
- Kafka should be available on localhost:9092
- Zookeeper should be available on localhost:2181
- Postgres should be available on localhost:5432
In a separate terminal you can access the database to make sure it is working:

    make db-terminal

You can open a SQL terminal using:

    psql --host=database --username=admin --dbname=messages

The password when prompted is password.

The following query can be run after the application has started:

    SELECT * FROM Message;

The application can be run using:

    make run

Alternatively, build the jar and start each instance in its own terminal:

    ./gradlew build

    export INSTANCE_ID=one && java -jar build/libs/KafkaSaga-0.0.1-SNAPSHOT.jar --server.port=8181
    export INSTANCE_ID=two && java -jar build/libs/KafkaSaga-0.0.1-SNAPSHOT.jar --server.port=8182
    export INSTANCE_ID=three && java -jar build/libs/KafkaSaga-0.0.1-SNAPSHOT.jar --server.port=8183
    export INSTANCE_ID=four && java -jar build/libs/KafkaSaga-0.0.1-SNAPSHOT.jar --server.port=8184