Skip to content

kaiqkt/kobbit

Repository files navigation

kobbit

Kotlin saga pattern library with coroutines support. Orchestrates distributed transactions through a graph of steps and compensations.

Modules

Module Description
core Engine — Orchestrator, Node, Transaction, Signal, Backend
dsl Kotlin DSL for defining sagas
backend-inmemory In-process backend for local use and testing

Installation

Add JitPack to your repositories:

// settings.gradle.kts
dependencyResolutionManagement {
    repositories {
        mavenCentral()
        maven { url = uri("https://jitpack.io") }
    }
}

Add dependencies:

// build.gradle.kts
dependencies {
    implementation("com.github.kaiqkt.kobbit:core:0.1.0")
    implementation("com.github.kaiqkt.kobbit:dsl:0.1.0")

    // optional — in-memory backend for local use and testing
    implementation("com.github.kaiqkt.kobbit:backend-inmemory:0.1.0")
}

Concepts

SagaContext

Shared state passed through all steps. Extend SagaContext to carry your domain data:

data class OrderContext(
    val userId: String,
    val amount: BigDecimal,
    var orderId: String? = null,
    var reservationId: String? = null,
    var chargeId: String? = null
) : SagaContext()

Steps and compensations

Each step has an action (forward work) and an optional compensate (rollback). If a step fails after exhausting retries, the compensations of all previously completed steps run in reverse order.

noReturn

Steps inside noReturn cannot be compensated. If any of them fail, the saga aborts immediately without running any compensation.

Usage

Define a saga

import com.kaiqkt.kobbit.dsl.saga

val createOrderSaga = saga<OrderContext> {

    step("reserve-inventory") {
        action { ctx ->
            ctx.reservationId = inventoryService.reserve(ctx.userId)
        }
        compensate { ctx ->
            inventoryService.release(ctx.reservationId!!)
        }
        maxRetries = 3
    }

    step("create-order") {
        action { ctx ->
            ctx.orderId = orderService.create(ctx.userId, ctx.reservationId!!)
        }
        compensate { ctx ->
            orderService.cancel(ctx.orderId!!)
        }
    }

    step("charge-payment") {
        action { ctx ->
            ctx.chargeId = paymentService.charge(ctx.orderId!!, ctx.amount)
        }
        compensate { ctx ->
            paymentService.refund(ctx.chargeId!!)
        }
        maxRetries = 5
    }

    // past this point — no compensation runs on failure
    noReturn {
        step("fulfill-order") {
            action { ctx ->
                fulfillmentService.dispatch(ctx.orderId!!)
            }
        }
        step("notify-customer") {
            action { ctx ->
                emailService.send(ctx.userId, ctx.orderId!!)
            }
            maxRetries = 1
        }
    }
}

Run with the in-memory backend

import com.kaiqkt.kobbit.core.Orchestrator
import com.kaiqkt.kobbit.backend.inmemory.InMemoryBackend

val backend = InMemoryBackend<OrderContext>()

val orchestrator = Orchestrator(
    backend = backend,
    onFinish = { ctx -> println("Order ${ctx.orderId} completed") },
    onAbort  = { ctx, cause -> println("Order aborted: ${cause?.message}") }
)

// start listening in a coroutine
launch { orchestrator.listen() }

// start a saga — id is optional, defaults to a new UUID
orchestrator.start(
    saga = createOrderSaga,
    context = OrderContext(userId = "u-123", amount = BigDecimal("99.99"))
)

// correlate with an external trace ID
orchestrator.start(
    saga = createOrderSaga,
    context = OrderContext(userId = "u-456", amount = BigDecimal("49.99")),
    id = httpRequest.traceId
)

Custom backend

Implement Backend<C> to integrate with any transport (Kafka, JMS, SQS, etc.):

class KafkaBackend<C : SagaContext>(/* ... */) : Backend<C> {

    override suspend fun publish(signal: Signal<C>) {
        when (signal) {
            is Schedule, is Retry -> kafka.send(serialize(signal.transaction))
            is Abort, is Finish   -> { /* terminal — no re-enqueue */ }
        }
    }

    override suspend fun subscribe(deliver: suspend (Transaction<C>) -> Signal<C>) {
        kafka.consume { record ->
            deliver(deserialize(record))
        }
    }

    override suspend fun unsubscribe() {
        kafka.close()
    }
}

Failure behavior

Scenario Result
Step succeeds Routes to nextOnSuccess
Step fails, retries remaining Retry — backend re-delivers
Step fails, retries exhausted, has compensation Runs compensation chain in reverse
Step fails, retries exhausted, no compensation AbortonAbort called
Incompensable step fails Abort immediately, no compensation

License

MIT

About

Kotlin saga pattern library

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages