Skip to content

ochaton/lymbo

Repository files navigation

Kharon

A Go library for delayed task processing and state reconciliation.

Table of Contents

Features

  • Pluggable Storage: In-memory and PostgreSQL backends
  • Priority Scheduling: Nice values (lower = higher priority)
  • Retry Strategies: Fixed delays or exponential backoff
  • Tubes: Route tickets to separate queues
  • Groups: Track batches of related tickets and query their collective progress
  • Finalizers: Run a ticket automatically after all members of a group have finished
  • Automatic Expiration: Cleanup of completed/expired tickets
  • Concurrent Processing: Configurable worker pools
  • Prometheus Metrics: Per-type and per-tube counters, processing duration and queue wait histograms

Installation

go get github.com/ochaton/lymbo

Quick Start

package main

import (
    "context"
    "log/slog"

    "github.com/ochaton/lymbo"
    "github.com/ochaton/lymbo/store/memory"
)

func main() {
    ctx := context.Background()

    kh := lymbo.NewKharon(memory.NewStore(), lymbo.DefaultSettings(), slog.Default())

    router := lymbo.NewRouter()
    router.HandleFunc("example", func(ctx context.Context, t *lymbo.Ticket) error {
        slog.Info("processing", "id", t.ID, "payload", t.Payload)
        return kh.Ack(ctx, t.ID)
    })

    go kh.Run(ctx, router)

    ticket, _ := lymbo.NewTicket(lymbo.MustID(), "example")
    kh.Put(ctx, *ticket)
}

Usage

Creating Tickets

ticket, err := lymbo.NewTicket(lymbo.MustID(), "task-type")

// With payload (auto-marshaled to JSON)
ticket.WithPayload(map[string]any{"key": "value"})

// With priority and delayed execution
ticket.WithNice(5).WithRunat(time.Now().Add(time.Hour))

// With tube routing
ticket, err := lymbo.NewTubeTicket("emails", lymbo.MustID(), "send-email")

// Put with options
kh.Put(ctx, *ticket,
    lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)),
    lymbo.WithNice(10),
)

Handling Tickets

router := lymbo.NewRouter()

router.HandleFunc("email", func(ctx context.Context, t *lymbo.Ticket) error {
    if err := sendEmail(t.Payload); err != nil {
        if isTransient(err) {
            return kh.Retry(ctx, t.ID, lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)))
        }
        return kh.Fail(ctx, t.ID, lymbo.WithErrorReason(err.Error()))
    }
    return kh.Ack(ctx, t.ID)
})

router.NotFoundFunc(func(ctx context.Context, t *lymbo.Ticket) error {
    return kh.Fail(ctx, t.ID, lymbo.WithErrorReason("unsupported type"))
})

Ticket Lifecycle

Method Default Behavior Notes
Ack Removes ticket Use WithKeep() to retain
Done Keeps ticket (status=done) Equivalent to Ack + WithKeep()
Fail Keeps ticket (status=failed) Use WithErrorReason() for details
Cancel Removes ticket Use WithKeep() to retain
Retry Reschedules as pending Increments attempts counter
Delete Removes ticket Permanent removal

All methods accept options:

// Retry with exponential backoff
kh.Retry(ctx, id, lymbo.WithDelay(lymbo.BackoffDelay(1.5, 15*time.Second, 0)))

// Done with TTL (auto-removed after 24h)
kh.Done(ctx, id, lymbo.WithDelay(lymbo.FixedDelay(24*time.Hour)))

// Fail with error reason
kh.Fail(ctx, id,
    lymbo.WithErrorReason("connection timeout"),
    lymbo.WithDelay(lymbo.FixedDelay(7*24*time.Hour)),
)

// Cancel but keep for audit
kh.Cancel(ctx, id, lymbo.WithKeep(), lymbo.WithErrorReason("cancelled by user"))

Options

Option Description
WithDelay(DelayStrategy) Set processing delay or TTL for auto-removal
WithNice(n) Set priority (lower = higher)
WithKeep() Keep ticket in store instead of removing
WithErrorReason(reason) Store error/cancellation reason
WithPayload(v) Set ticket payload
WithTube(tube) Transfer ticket to another tube
WithGroup(id) Assign or transfer ticket to a group
WithUpdate(fn) Custom ticket modification (executed last)
WithResetAttempts() Reset attempt counter
AfterGroup(id) Register ticket as finalizer for a group (blocked until all current members finish)

Delay Strategies

// Fixed delay
lymbo.FixedDelay(5 * time.Minute)

// Exponential backoff: base^attempts seconds, capped at maxDelay, with optional jitter
lymbo.BackoffDelay(1.5, 15*time.Second, 0)
lymbo.BackoffDelay(2.0, time.Minute, 500*time.Millisecond)

Groups

A group is a named set of tickets that can be tracked collectively. Submit tickets into a group with WithGroup, then poll the group to see how many are still pending or whether all have reached a terminal state (Done, Failed, or Cancelled).

Groups are optional and persistent — the group_id is stored alongside each ticket so progress survives process restarts.

// Create a group handle (lightweight, no DB write)
g := kh.Group("order-42-notifications")

// Submit tickets into the group
for _, userID := range recipients {
    ticket, _ := lymbo.NewTicket(lymbo.MustID(), "send-notification")
    kh.Put(ctx, *ticket,
        lymbo.WithGroup(g.ID()),
        lymbo.WithPayload(userID),
    )
}

// Poll group progress from anywhere in your code
n, err := g.PendingCount(ctx)   // tickets still pending
ok, err := g.AllTerminal(ctx)   // true when none are pending

Alternatively, set the group directly on the ticket using the builder:

ticket.WithGroup("order-42-notifications")
kh.Put(ctx, *ticket)

Transitioning between groups works the same way as transferring tubes — pass WithGroup to any lifecycle method and the ticket moves to the new group atomically:

// Move a ticket to a different group on retry
kh.Retry(ctx, t.ID,
    lymbo.WithGroup("retry-batch"),
    lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)),
)

// Clear the group by moving to an empty-string group is not supported;
// use a dedicated "archived" group name instead.

Tickets submitted without WithGroup are ungrouped and never appear in any group query.

Finalizers

A finalizer is a ticket blocked until every pending member of a group has finished. Submit group members first, then submit the finalizer with AfterGroup:

groupID := "order-42-notifications"

for _, userID := range recipients {
    ticket, _ := lymbo.NewTicket(lymbo.MustID(), "send-notification")
    kh.Put(ctx, *ticket, lymbo.WithGroup(groupID), lymbo.WithPayload(userID))
}

finalizer, _ := lymbo.NewTicket(lymbo.MustID(), "notifications-complete")
kh.Put(ctx, *finalizer, lymbo.AfterGroup(groupID))

At submission time, the store snapshots all currently pending group members as dependencies. The finalizer is invisible to workers until every captured dependency reaches a terminal state. Tickets added to the group after the finalizer is submitted are not captured.

Notes:

  • Empty group (no pending members) → finalizer is immediately eligible.
  • WithDelay on the finalizer is evaluated independently of dependencies.
  • AfterGroup("g") + WithGroup("g") on the same Put returns ErrFinalizerInGroup.
  • Re-submitting a finalizer with the same ID is a no-op.

Tubes

Tubes route tickets to separate queues. A Kharon instance only polls tickets whose tube it is subscribed to. Tube routing must be explicitly enabled before subscriptions are allowed — otherwise the instance processes only the "default" tube.

Enable tubes at construction in one of two ways:

// Declaratively, with an initial subscription set:
kh := lymbo.NewKharon(store,
    lymbo.DefaultSettings().WithOnlyTubes("emails", "notifications"),
    slog.Default(),
)

// Or empty, to drive subscriptions dynamically at runtime:
kh := lymbo.NewKharon(store,
    lymbo.DefaultSettings().EnableTubes(),
    slog.Default(),
)

Manage subscriptions at runtime:

if err := kh.Subscribe([]string{"emails", "sms"}); err != nil {
    // returns lymbo.ErrTubesNotEnabled if the instance was built without
    // WithOnlyTubes/EnableTubes
}

if err := kh.Unsubscribe([]string{"sms"}); err != nil { /* ... */ }

current := kh.Tubes()                          // snapshot of subscribed tubes
ok := kh.IsSubscribedTo(lymbo.Tube("emails"))  // membership check

Behavior notes:

  • When tubes are disabled, Tubes() returns ["default"] and IsSubscribedTo only matches lymbo.DefaultTube. Subscribe/Unsubscribe return ErrTubesNotEnabled.
  • Empty tube names passed to Subscribe are dropped; duplicates are deduped.
  • An Unsubscribe that races with an in-flight poll is safe: tickets fetched for tubes the instance no longer owns are re-scheduled back to the store instead of being dispatched.
  • Use WithTube(tube) on lifecycle methods (e.g. Retry, Ack) to atomically move a ticket between tubes.

See examples/tubes for a two-instance ping-pong setup and examples/pubsub for a dynamic subscription sketch.

Configuration

settings := lymbo.DefaultSettings().
    WithWorkers(10).
    WithProcessTime(5 * time.Minute).
    WithBackoffBase(2.0).
    WithOnlyTubes("emails", "notifications")
Method Default Description
WithWorkers(n) 4 Concurrent ticket processors
WithBatchSize(n) 4 Deprecated; sizing now follows WithWorkers
WithProcessTime(d) 30s TTR before re-poll
WithBackoffBase(f) 1.5 Exponential backoff base
WithOnlyTubes(t...) "default" Tubes to process (also enables tube routing)
EnableTubes() off Enable tube routing without an initial subscription set
WithExpiration() enabled Auto-cleanup of expired tickets
WithoutExpiration() - Disable auto-cleanup

Storage

In-Memory

import "github.com/ochaton/lymbo/store/memory"

store := memory.NewStore()

PostgreSQL

import (
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/ochaton/lymbo/store/postgres"
)

pool, _ := pgxpool.New(ctx, "postgres://user:pass@localhost/dbname")

store := postgres.NewTicketsRepository(pool)
store.Migrate(ctx) // creates schema automatically

Custom table name:

store, _ := postgres.NewTicketsRepositoryWithConfig(postgres.Config{
    TableName: "my_tickets",
    Pool:      pool,
})
store.Migrate(ctx)

Custom Store

Implement the Store interface for your own backend:

type Store interface {
    Get(context.Context, TicketId) (Ticket, error)
    Put(context.Context, Ticket) error
    Delete(context.Context, TicketId) error
    Update(context.Context, TicketId, UpdateFunc) error
    DeleteBatch(ctx context.Context, ids []TicketId) ([]TransitionInfo, error)
    UpdateBatch(ctx context.Context, updates []UpdateSet) ([]TransitionInfo, error)
    PollPending(context.Context, PollRequest) (PollResult, error)
    ExpireTickets(ctx context.Context, limit int, now time.Time) ([]TransitionInfo, error)
    CountPendingInGroup(ctx context.Context, groupID string) (int, error)
    PutAfterGroup(ctx context.Context, ticket Ticket, groupID string) error
}

Prometheus Metrics

The github.com/ochaton/lymbo/prometheus module provides a collector. It is a separate Go module so importing lymbo does not pull in the prometheus client dependency.

go get github.com/ochaton/lymbo/prometheus
import (
    "github.com/prometheus/client_golang/prometheus"
    lymbo_prom "github.com/ochaton/lymbo/prometheus"
)

// Single instance, default registry:
prometheus.MustRegister(lymbo_prom.NewCollector(kh, nil))

// Multiple instances in the same process:
reg := prometheus.NewRegistry()
reg.MustRegister(lymbo_prom.NewCollector(khPinger, prometheus.Labels{"kharon": "pinger"}))
reg.MustRegister(lymbo_prom.NewCollector(khPonger, prometheus.Labels{"kharon": "ponger"}))

Exported metrics:

Metric Type Labels Description
kharon_tickets_total counter operation, type, tube Ticket operation counts per (type, tube) pair
kharon_workers_running gauge - Current active worker goroutines
kharon_task_process_duration_seconds histogram type, tube Processing duration per (type, tube) pair
kharon_queue_wait_duration_seconds histogram type, tube Queue wait time per (type, tube) pair

Use sum by (operation) for global totals, sum by (operation, type) for per-type, sum by (operation, tube) for per-tube.

A complete example with Docker Compose (Postgres + Prometheus + Grafana with pre-built dashboards) is in examples/prometheus/.

Examples

See examples/ for complete working examples:

  • basic — HTTP API, memory or postgres storage, stats logging
  • simple — Rate-limited ticket pusher with postgres
  • tubes — Ping-pong between two tubes with two Kharon instances
  • pubsub — Dynamic tube subscription with EnableTubes()
  • prometheus — Full observability stack with Grafana dashboards

License

MIT

About

Ticket system

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages