A lightweight, Redis-backed distributed job queue for Go.
QuantaQ is a simple yet powerful job queue built on top of Redis. It provides reliable job enqueuing, concurrent worker processing, automatic retries, and dead letter queue (DLQ) support — all with minimal setup.
- Redis-backed persistence — Jobs survive restarts and are shared across processes
- Concurrent worker pool — Process jobs in parallel with configurable concurrency
- Automatic retries — Failed jobs are re-enqueued up to a configurable max attempts
- Dead letter queue — Jobs exceeding max attempts are moved to DLQ for inspection
- Batch enqueuing — Enqueue multiple jobs in a single atomic Redis transaction
- Pluggable clock — Inject a custom clock for deterministic testing
- Pluggable metrics — Track enqueued, fetched, acked, nacked, and DLQ counts per queue
- Atomic operations — All state transitions use Redis transactions for consistency
go get github.com/zukreindev/quantaqRequirements: Go 1.25+ and a running Redis instance.
package main
import (
"context"
"fmt"
"log"
"time"
quantaqRedis "github.com/zukreindev/quantaq/internal/redis"
"github.com/zukreindev/quantaq"
)
func main() {
redisClient, err := quantaqRedis.NewClient("localhost:6379", "", 0)
if err != nil {
log.Fatal(err)
}
client := quantaq.NewClient(redisClient)
ctx := context.Background()
job, err := client.Enqueue(ctx, "email", []byte(`{"to":"user@example.com","subject":"Welcome!"}`), quantaq.EnqueueOptions{
MaxAttempts: 5,
RunAt: time.Now().Add(1 * time.Minute),
Metadata: map[string]string{
"content_type": "application/json",
},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Job enqueued: %s\n", job.ID)
}package main
import (
"context"
"fmt"
"log"
"os/signal"
"syscall"
"time"
quantaqRedis "github.com/zukreindev/quantaq/internal/redis"
"github.com/zukreindev/quantaq"
)
func main() {
redisClient, err := quantaqRedis.NewClient("localhost:6379", "", 0)
if err != nil {
log.Fatal(err)
}
client := quantaq.NewClient(redisClient)
worker := quantaq.NewWorker(client, quantaq.WorkerOptions{
Concurrency: 10,
PollInterval: 500 * time.Millisecond,
ShutdownTimeout: 30 * time.Second,
})
worker.RegisterHandler("email", func(ctx context.Context, job *quantaq.Job) error {
fmt.Printf("Processing job %s: %s\n", job.ID, string(job.Payload))
// Your processing logic here
return nil
})
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
fmt.Println("Worker started. Press Ctrl+C to stop.")
worker.Start(ctx)
}jobs := []quantaq.Job{
{Payload: []byte(`{"to":"alice@example.com"}`)},
{Payload: []byte(`{"to":"bob@example.com"}`)},
{Payload: []byte(`{"to":"charlie@example.com"}`)},
}
result, err := client.EnqueueBatch(ctx, "email", jobs, quantaq.EnqueueOptions{
MaxAttempts: 3,
})
// result contains all 3 jobs with generated IDs// Create a new client with optional configuration
client := quantaq.NewClient(redisClient, opts ...quantaq.ClientOption)| Method | Description |
|---|---|
Enqueue(ctx, queue, payload, opts) |
Enqueue a single job |
EnqueueBatch(ctx, queue, jobs, opts) |
Enqueue multiple jobs atomically |
Fetch(ctx, queue) |
Fetch and lease the next available job |
Ack(ctx, queue, jobID) |
Acknowledge a successfully processed job |
Nack(ctx, queue, jobID, errMsg) |
Reject a job (re-enqueue or move to DLQ) |
Cancel(ctx, jobID) |
Cancel a pending or leased job |
GetJob(ctx, jobID) |
Retrieve job details by ID |
QueueStats(ctx, queue) |
Get counts: ready, leased, failed |
PurgeQueue(ctx, queue) |
Delete all jobs in a queue |
// Inject a custom clock (useful for testing)
quantaq.WithClock(myClock)
// Inject a metrics collector
quantaq.WithMetrics(myCollector)worker := quantaq.NewWorker(client, quantaq.WorkerOptions{
Concurrency: 5, // goroutines per queue (default: 5)
PollInterval: time.Second, // polling frequency (default: 1s)
ShutdownTimeout: 30 * time.Second, // graceful shutdown timeout (default: 30s)
})
worker.RegisterHandler("queue_name", handlerFunc)
worker.Start(ctx)| Field | Type | Default | Description |
|---|---|---|---|
MaxAttempts |
int |
3 |
Maximum number of processing attempts |
RunAt |
time.Time |
zero | Scheduled execution time |
Metadata |
map[string]string |
nil |
Arbitrary key-value metadata |
Enqueue
│
▼
┌─────────┐ Fetch ┌─────────┐ Ack ┌─────────┐
│ ready │ ────────► │ leased │ ──────► │ acked │
└─────────┘ └─────────┘ └─────────┘
│
Nack │
▼
┌──────────────┐
│ attempts < max?│
└──────────────┘
yes │ │ no
▼ ▼
┌────────┐ ┌─────┐
│ ready │ │ dlq │
└────────┘ └─────┘
| Status | Description |
|---|---|
ready |
Waiting in queue to be processed |
leased |
Currently being processed by a worker |
acked |
Successfully processed |
failed |
Marked as failed |
canceled |
Canceled before completion |
dlq |
Moved to dead letter queue after exceeding max attempts |
| Key Pattern | Type | Description |
|---|---|---|
queue:{name}:waiting |
LIST | Jobs waiting to be processed |
queue:{name}:processing |
LIST | Jobs currently being processed |
queue:{name}:failed |
LIST | Jobs in the dead letter queue |
job:{id} |
HASH | Job data and status |
| Field | Description |
|---|---|
data |
JSON-serialized job object |
status |
Current job status string |
QuantaQ ships with a pluggable metrics system. Use the built-in InMemoryCollector or implement the Collector interface for custom integrations (Prometheus, StatsD, etc.).
collector := quantaq.NewInMemoryCollector()
client := quantaq.NewClient(redisClient, quantaq.WithMetrics(collector))
// After processing some jobs...
snapshot := collector.Snapshot()
for queue, m := range snapshot {
fmt.Printf("Queue %s: enqueued=%d fetched=%d acked=%d nacked=%d dlq=%d\n",
queue, m.Enqueued, m.Fetched, m.Acked, m.Nacked, m.DLQ)
}type Collector interface {
JobEnqueued(queue string)
JobFetched(queue string)
JobAcked(queue string)
JobNacked(queue string)
JobDLQ(queue string)
JobCanceled(queue string)
Snapshot() map[string]*QueueMetrics
}QuantaQ uses miniredis for in-memory Redis testing — no external Redis instance required.
go test ./... -vInject a mock clock for deterministic time-based tests:
type mockClock struct {
now time.Time
}
func (m *mockClock) Now() time.Time { return m.now }
mc := &mockClock{now: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)}
client := quantaq.NewClient(redisClient, quantaq.WithClock(mc))quantaq/
├── client.go # Client: Enqueue, Cancel, GetJob, QueueStats, PurgeQueue
├── clock.go # Clock interface and RealClock implementation
├── job.go # Job model and status constants
├── metrics.go # Collector interface, InMemoryCollector, NoopCollector
├── processor.go # Worker pool: NewWorker, RegisterHandler, Start
├── worker.go # Fetch, Ack, Nack operations
├── internal/
│ └── redis/ # Redis client wrapper and key helpers
├── test/ # Unit tests (miniredis-based)
├── go.mod
└── README.md
MIT