Kotlin saga pattern library with coroutines support. Orchestrates distributed transactions through a graph of steps and compensations.
| Module | Description |
|---|---|
| core | Engine — Orchestrator, Node, Transaction, Signal, Backend |
| dsl | Kotlin DSL for defining sagas |
| backend-inmemory | In-process backend for local use and testing |
Add JitPack to your repositories:
// settings.gradle.kts
dependencyResolutionManagement {
repositories {
mavenCentral()
maven { url = uri("https://jitpack.io") }
}
}

Add dependencies:
// build.gradle.kts
dependencies {
implementation("com.github.kaiqkt.kobbit:core:0.1.0")
implementation("com.github.kaiqkt.kobbit:dsl:0.1.0")
// optional — in-memory backend for local use and testing
implementation("com.github.kaiqkt.kobbit:backend-inmemory:0.1.0")
}

Shared state passed through all steps. Extend SagaContext to carry your domain data:
/**
 * Example saga context for an order workflow.
 *
 * [userId] and [amount] are supplied when the saga is started. The nullable
 * `var` properties start as `null` and are assigned by step actions as the
 * saga advances, so later steps and compensations can read them.
 */
data class OrderContext(
    val userId: String,
    val amount: BigDecimal,
    // Mutable on purpose: populated by the steps that create these resources.
    var orderId: String? = null,
    var reservationId: String? = null,
    var chargeId: String? = null
) : SagaContext()

Each step has an action (forward work) and an optional compensate (rollback). If a step fails after exhausting retries, the compensations of all previously completed steps run in reverse order.
Steps inside noReturn cannot be compensated. If any of them fail, the saga aborts immediately without running any compensation.
import com.kaiqkt.kobbit.dsl.saga
// Example saga: reserve inventory -> create order -> charge payment, followed by
// a noReturn section whose steps declare no compensation.
val createOrderSaga = saga<OrderContext> {
    step("reserve-inventory") {
        action { ctx ->
            // Store the reservation id so later steps and the compensation can use it.
            ctx.reservationId = inventoryService.reserve(ctx.userId)
        }
        compensate { ctx ->
            // reservationId was assigned by this step's action.
            inventoryService.release(ctx.reservationId!!)
        }
        maxRetries = 3
    }
    step("create-order") {
        action { ctx ->
            // Relies on the reservation made by "reserve-inventory".
            ctx.orderId = orderService.create(ctx.userId, ctx.reservationId!!)
        }
        compensate { ctx ->
            orderService.cancel(ctx.orderId!!)
        }
    }
    step("charge-payment") {
        action { ctx ->
            // Relies on the order id produced by "create-order".
            ctx.chargeId = paymentService.charge(ctx.orderId!!, ctx.amount)
        }
        compensate { ctx ->
            paymentService.refund(ctx.chargeId!!)
        }
        maxRetries = 5
    }
    // past this point — no compensation runs on failure
    noReturn {
        step("fulfill-order") {
            action { ctx ->
                fulfillmentService.dispatch(ctx.orderId!!)
            }
        }
        step("notify-customer") {
            action { ctx ->
                emailService.send(ctx.userId, ctx.orderId!!)
            }
            maxRetries = 1
        }
    }
}

import com.kaiqkt.kobbit.core.Orchestrator
import com.kaiqkt.kobbit.backend.inmemory.InMemoryBackend
// In-memory backend typed to the saga's context class.
val backend = InMemoryBackend<OrderContext>()

// onFinish receives the context; onAbort receives the context and a nullable cause.
val orchestrator = Orchestrator(
    backend = backend,
    onFinish = { ctx -> println("Order ${ctx.orderId} completed") },
    onAbort = { ctx, cause -> println("Order aborted: ${cause?.message}") }
)

// start listening in a coroutine
launch { orchestrator.listen() }

// start a saga — id is optional, defaults to a new UUID
orchestrator.start(
    saga = createOrderSaga,
    context = OrderContext(userId = "u-123", amount = BigDecimal("99.99"))
)

// correlate with an external trace ID
orchestrator.start(
    saga = createOrderSaga,
    context = OrderContext(userId = "u-456", amount = BigDecimal("49.99")),
    id = httpRequest.traceId
)

Implement Backend<C> to integrate with any transport (Kafka, JMS, SQS, etc.):
/**
 * Sketch of a [Backend] implementation that uses Kafka as the transport.
 *
 * `kafka`, `serialize` and `deserialize` are placeholders for collaborators
 * that would be provided through the elided constructor parameters.
 */
class KafkaBackend<C : SagaContext>(/* ... */) : Backend<C> {
    /**
     * Sends the signal's transaction to Kafka for [Schedule] and [Retry] signals;
     * [Abort] and [Finish] are terminal and are not sent.
     */
    override suspend fun publish(signal: Signal<C>) {
        when (signal) {
            is Schedule, is Retry -> kafka.send(serialize(signal.transaction))
            is Abort, is Finish -> { /* terminal — no re-enqueue */ }
        }
    }

    /**
     * Consumes records from Kafka and passes each deserialized transaction to [deliver].
     */
    override suspend fun subscribe(deliver: suspend (Transaction<C>) -> Signal<C>) {
        kafka.consume { record ->
            // NOTE(review): the Signal returned by deliver is discarded here — confirm
            // whether the backend must act on it or the orchestrator publishes it itself.
            deliver(deserialize(record))
        }
    }

    /** Closes the Kafka client. */
    override suspend fun unsubscribe() {
        kafka.close()
    }
}

| Scenario | Result |
|---|---|
| Step succeeds | Routes to nextOnSuccess |
| Step fails, retries remaining | Retry — backend re-delivers |
| Step fails, retries exhausted, has compensation | Runs compensation chain in reverse |
| Step fails, retries exhausted, no compensation | Abort — onAbort called |
| Incompensable step fails | Abort immediately, no compensation |