A Go library for delayed task processing and state reconciliation.
- Pluggable Storage: In-memory and PostgreSQL backends
- Priority Scheduling: Nice values (lower = higher priority)
- Retry Strategies: Fixed delays or exponential backoff
- Tubes: Route tickets to separate queues
- Groups: Track batches of related tickets and query their collective progress
- Finalizers: Run a ticket automatically after all members of a group have finished
- Automatic Expiration: Cleanup of completed/expired tickets
- Concurrent Processing: Configurable worker pools
- Prometheus Metrics: Per-type and per-tube counters, processing duration and queue wait histograms
go get github.com/ochaton/lymbo

package main
import (
"context"
"log/slog"
"github.com/ochaton/lymbo"
"github.com/ochaton/lymbo/store/memory"
)
func main() {
ctx := context.Background()
kh := lymbo.NewKharon(memory.NewStore(), lymbo.DefaultSettings(), slog.Default())
router := lymbo.NewRouter()
router.HandleFunc("example", func(ctx context.Context, t *lymbo.Ticket) error {
slog.Info("processing", "id", t.ID, "payload", t.Payload)
return kh.Ack(ctx, t.ID)
})
go kh.Run(ctx, router)
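ticket, err := lymbo.NewTicket(lymbo.MustID(), "example")
if err != nil {
slog.Error("creating ticket", "err", err)
return
}
kh.Put(ctx, *ticket)
// main returns right after Put; a long-running program would block here
// (for example on a signal) so the worker started above can process the ticket.
}

ticket, err := lymbo.NewTicket(lymbo.MustID(), "task-type")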
// With payload (auto-marshaled to JSON)
ticket.WithPayload(map[string]any{"key": "value"})
// With priority and delayed execution
ticket.WithNice(5).WithRunat(time.Now().Add(time.Hour))
// With tube routing
ticket, err := lymbo.NewTubeTicket("emails", lymbo.MustID(), "send-email")
// Put with options
kh.Put(ctx, *ticket,
lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)),
lymbo.WithNice(10),
)

router := lymbo.NewRouter()
router.HandleFunc("email", func(ctx context.Context, t *lymbo.Ticket) error {
if err := sendEmail(t.Payload); err != nil {
if isTransient(err) {
return kh.Retry(ctx, t.ID, lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)))
}
return kh.Fail(ctx, t.ID, lymbo.WithErrorReason(err.Error()))
}
return kh.Ack(ctx, t.ID)
})
router.NotFoundFunc(func(ctx context.Context, t *lymbo.Ticket) error {
return kh.Fail(ctx, t.ID, lymbo.WithErrorReason("unsupported type"))
})

| Method | Default Behavior | Notes |
|---|---|---|
| Ack | Removes ticket | Use WithKeep() to retain |
| Done | Keeps ticket (status=done) | Equivalent to Ack + WithKeep() |
| Fail | Keeps ticket (status=failed) | Use WithErrorReason() for details |
| Cancel | Removes ticket | Use WithKeep() to retain |
| Retry | Reschedules as pending | Increments attempts counter |
| Delete | Removes ticket | Permanent removal |

All methods accept options:
// Retry with exponential backoff
kh.Retry(ctx, id, lymbo.WithDelay(lymbo.BackoffDelay(1.5, 15*time.Second, 0)))
// Done with TTL (auto-removed after 24h)
kh.Done(ctx, id, lymbo.WithDelay(lymbo.FixedDelay(24*time.Hour)))
// Fail with error reason
kh.Fail(ctx, id,
lymbo.WithErrorReason("connection timeout"),
lymbo.WithDelay(lymbo.FixedDelay(7*24*time.Hour)),
)
// Cancel but keep for audit
kh.Cancel(ctx, id, lymbo.WithKeep(), lymbo.WithErrorReason("cancelled by user"))

| Option | Description |
|---|---|
| WithDelay(DelayStrategy) | Set processing delay or TTL for auto-removal |
| WithNice(n) | Set priority (lower = higher) |
| WithKeep() | Keep ticket in store instead of removing |
| WithErrorReason(reason) | Store error/cancellation reason |
| WithPayload(v) | Set ticket payload |
| WithTube(tube) | Transfer ticket to another tube |
| WithGroup(id) | Assign or transfer ticket to a group |
| WithUpdate(fn) | Custom ticket modification (executed last) |
| WithResetAttempts() | Reset attempt counter |
| AfterGroup(id) | Register ticket as finalizer for a group (blocked until all current members finish) |

// Fixed delay
lymbo.FixedDelay(5 * time.Minute)
// Exponential backoff: base^attempts seconds, capped at maxDelay, with optional jitter
lymbo.BackoffDelay(1.5, 15*time.Second, 0)
lymbo.BackoffDelay(2.0, time.Minute, 500*time.Millisecond)

A group is a named set of tickets that can be tracked collectively. Submit tickets into a group
with WithGroup, then poll the group to see how many are still pending or whether all have
reached a terminal state (Done, Failed, or Cancelled).
Groups are optional and persistent — the group_id is stored alongside each ticket so progress
survives process restarts.
// Create a group handle (lightweight, no DB write)
g := kh.Group("order-42-notifications")
// Submit tickets into the group
for _, userID := range recipients {
ticket, _ := lymbo.NewTicket(lymbo.MustID(), "send-notification")
kh.Put(ctx, *ticket,
lymbo.WithGroup(g.ID()),
lymbo.WithPayload(userID),
)
}
// Poll group progress from anywhere in your code
n, err := g.PendingCount(ctx) // tickets still pending
ok, err := g.AllTerminal(ctx) // true when none are pending

Alternatively, set the group directly on the ticket using the builder:
ticket.WithGroup("order-42-notifications")
kh.Put(ctx, *ticket)

Moving a ticket between groups works the same way as moving it between tubes. Pass WithGroup to
any lifecycle method and the ticket moves to the new group atomically:
// Move a ticket to a different group on retry
kh.Retry(ctx, t.ID,
lymbo.WithGroup("retry-batch"),
lymbo.WithDelay(lymbo.FixedDelay(5*time.Minute)),
)
// Clearing the group by passing an empty string is not supported;
// use a dedicated "archived" group name instead.

Tickets submitted without WithGroup are ungrouped and never appear in any group query.
A finalizer is a ticket blocked until every pending member of a group has finished. Submit group
members first, then submit the finalizer with AfterGroup:
groupID := "order-42-notifications"
for _, userID := range recipients {
ticket, _ := lymbo.NewTicket(lymbo.MustID(), "send-notification")
kh.Put(ctx, *ticket, lymbo.WithGroup(groupID), lymbo.WithPayload(userID))
}
finalizer, _ := lymbo.NewTicket(lymbo.MustID(), "notifications-complete")
kh.Put(ctx, *finalizer, lymbo.AfterGroup(groupID))

At submission time, the store snapshots all currently pending group members as dependencies. The finalizer is invisible to workers until every captured dependency reaches a terminal state. Tickets added to the group after the finalizer is submitted are not captured.
Notes:
- Empty group (no pending members) → finalizer is immediately eligible.
- WithDelay on the finalizer is evaluated independently of dependencies.
- AfterGroup("g") + WithGroup("g") on the same Put returns ErrFinalizerInGroup.
- Re-submitting a finalizer with the same ID is a no-op.

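
The snapshot semantics described above can be illustrated with a toy in-memory model. This is a sketch for intuition only, not lymbo's store: the type toyStore and all of its methods are hypothetical, and real backends also have to handle delays, persistence, and concurrency.

```go
package main

import "fmt"

// toyStore is a hypothetical model, not lymbo's store. It only tracks which
// tickets are pending and which dependencies each finalizer captured.
type toyStore struct {
	pending map[string]bool     // ticket ID -> still pending
	groups  map[string][]string // group ID -> member ticket IDs
	deps    map[string][]string // finalizer ID -> captured dependencies
}

func newToyStore() *toyStore {
	return &toyStore{
		pending: map[string]bool{},
		groups:  map[string][]string{},
		deps:    map[string][]string{},
	}
}

// put adds a pending ticket to a group.
func (s *toyStore) put(id, group string) {
	s.pending[id] = true
	s.groups[group] = append(s.groups[group], id)
}

// putAfterGroup snapshots the group's currently pending members as the
// finalizer's dependencies. Re-submitting the same finalizer ID is a no-op.
func (s *toyStore) putAfterGroup(id, group string) {
	if _, exists := s.deps[id]; exists {
		return
	}
	snapshot := []string{}
	for _, member := range s.groups[group] {
		if s.pending[member] {
			snapshot = append(snapshot, member)
		}
	}
	s.deps[id] = snapshot
}

// finish marks a ticket as having reached a terminal state.
func (s *toyStore) finish(id string) { s.pending[id] = false }

// eligible reports whether every captured dependency has finished.
func (s *toyStore) eligible(finalizer string) bool {
	for _, dep := range s.deps[finalizer] {
		if s.pending[dep] {
			return false
		}
	}
	return true
}

func main() {
	s := newToyStore()
	s.put("a", "g")
	s.put("b", "g")
	s.putAfterGroup("fin", "g") // captures a and b
	s.put("c", "g")             // added later, so not captured

	s.finish("a")
	fmt.Println(s.eligible("fin")) // false: b is still pending
	s.finish("b")
	fmt.Println(s.eligible("fin")) // true, even though c is still pending
}
```

The practical consequence is ordering: submit every group member before the finalizer, otherwise late members are not waited for.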
Tubes route tickets to separate queues. A Kharon instance only polls tickets whose tube it is
subscribed to. Tube routing must be explicitly enabled before subscriptions are allowed —
otherwise the instance processes only the "default" tube.
Enable tubes at construction in one of two ways:
// Declaratively, with an initial subscription set:
kh := lymbo.NewKharon(store,
lymbo.DefaultSettings().WithOnlyTubes("emails", "notifications"),
slog.Default(),
)
// Or empty, to drive subscriptions dynamically at runtime:
kh := lymbo.NewKharon(store,
lymbo.DefaultSettings().EnableTubes(),
slog.Default(),
)

Manage subscriptions at runtime:
if err := kh.Subscribe([]string{"emails", "sms"}); err != nil {
// returns lymbo.ErrTubesNotEnabled if the instance was built without
// WithOnlyTubes/EnableTubes
}
if err := kh.Unsubscribe([]string{"sms"}); err != nil { /* ... */ }
current := kh.Tubes() // snapshot of subscribed tubes
ok := kh.IsSubscribedTo(lymbo.Tube("emails")) // membership check

Behavior notes:
- When tubes are disabled, Tubes() returns ["default"] and IsSubscribedTo only matches lymbo.DefaultTube. Subscribe/Unsubscribe return ErrTubesNotEnabled.
- Empty tube names passed to Subscribe are dropped; duplicates are removed.
- An Unsubscribe that races with an in-flight poll is safe: tickets fetched for tubes the instance no longer owns are re-scheduled back to the store instead of being dispatched.
- Use WithTube(tube) on lifecycle methods (e.g. Retry, Ack) to atomically move a ticket between tubes.

See examples/tubes for a two-instance ping-pong setup and examples/pubsub for a dynamic subscription sketch.
settings := lymbo.DefaultSettings().
WithWorkers(10).
WithProcessTime(5 * time.Minute).
WithBackoffBase(2.0).
WithOnlyTubes("emails", "notifications")

| Method | Default | Description |
|---|---|---|
| WithWorkers(n) | 4 | Concurrent ticket processors |
| WithBatchSize(n) | 4 | Deprecated; sizing now follows WithWorkers |
| WithProcessTime(d) | 30s | TTR (time to run) before re-poll |
| WithBackoffBase(f) | 1.5 | Exponential backoff base |
| WithOnlyTubes(t...) | "default" | Tubes to process (also enables tube routing) |
| EnableTubes() | off | Enable tube routing without an initial subscription set |
| WithExpiration() | enabled | Auto-cleanup of expired tickets |
| WithoutExpiration() | - | Disable auto-cleanup |

import "github.com/ochaton/lymbo/store/memory"
store := memory.NewStore()

import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ochaton/lymbo/store/postgres"
)
pool, _ := pgxpool.New(ctx, "postgres://user:pass@localhost/dbname")
store := postgres.NewTicketsRepository(pool)
store.Migrate(ctx) // creates schema automatically

Custom table name:
store, _ := postgres.NewTicketsRepositoryWithConfig(postgres.Config{
TableName: "my_tickets",
Pool: pool,
})
store.Migrate(ctx)

Implement the Store interface for your own backend:
type Store interface {
Get(context.Context, TicketId) (Ticket, error)
Put(context.Context, Ticket) error
Delete(context.Context, TicketId) error
Update(context.Context, TicketId, UpdateFunc) error
DeleteBatch(ctx context.Context, ids []TicketId) ([]TransitionInfo, error)
UpdateBatch(ctx context.Context, updates []UpdateSet) ([]TransitionInfo, error)
PollPending(context.Context, PollRequest) (PollResult, error)
ExpireTickets(ctx context.Context, limit int, now time.Time) ([]TransitionInfo, error)
CountPendingInGroup(ctx context.Context, groupID string) (int, error)
PutAfterGroup(ctx context.Context, ticket Ticket, groupID string) error
}

The github.com/ochaton/lymbo/prometheus module provides a collector. It is a separate Go module, so importing lymbo does not pull in the Prometheus client dependency.
go get github.com/ochaton/lymbo/prometheus

import (
"github.com/prometheus/client_golang/prometheus"
lymbo_prom "github.com/ochaton/lymbo/prometheus"
)
// Single instance, default registry:
prometheus.MustRegister(lymbo_prom.NewCollector(kh, nil))
// Multiple instances in the same process:
reg := prometheus.NewRegistry()
reg.MustRegister(lymbo_prom.NewCollector(khPinger, prometheus.Labels{"kharon": "pinger"}))
reg.MustRegister(lymbo_prom.NewCollector(khPonger, prometheus.Labels{"kharon": "ponger"}))

Exported metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| kharon_tickets_total | counter | operation, type, tube | Ticket operation counts per (type, tube) pair |
| kharon_workers_running | gauge | - | Current active worker goroutines |
| kharon_task_process_duration_seconds | histogram | type, tube | Processing duration per (type, tube) pair |
| kharon_queue_wait_duration_seconds | histogram | type, tube | Queue wait time per (type, tube) pair |

Use sum by (operation) for global totals, sum by (operation, type) for per-type, sum by (operation, tube) for per-tube.
A complete example with Docker Compose (Postgres + Prometheus + Grafana with pre-built dashboards) is in examples/prometheus/.
See examples/ for complete working examples:
- basic: HTTP API, memory or postgres storage, stats logging
- simple: Rate-limited ticket pusher with postgres
- tubes: Ping-pong between two tubes with two Kharon instances
- pubsub: Dynamic tube subscription with EnableTubes()
- prometheus: Full observability stack with Grafana dashboards

MIT