Queue
Background job processing with Memory, Redis, and SQLite backends. Define jobs as classes, dispatch them anywhere, and process them with concurrent workers.
Introduction
@tekir/queue provides a Queue class that dispatches BaseJob subclasses to a backend and a Worker that polls the backend and executes jobs concurrently.
import { service } from '@tekir/core'
import type { Queue } from '@tekir/queue'
export const queue = service<Queue>('queue')Create config/queue.ts to choose a backend. Defaults to MemoryBackend:
export default {
driver: 'memory'
}For production with Redis (requires bun add @tekir/redis):
export default {
driver: 'redis'
}
// Uses config('redis') for connection options: see the Redis pageOr with a database table:
export default {
driver: 'database'
}
// Uses the registered 'db' service: add DatabaseProvider before QueueProviderRegister QueueProvider in your kernel:
import type { TekirApp } from '@tekir/core'
import { QueueProvider } from '@tekir/queue'
export default function({ app }: TekirApp) {
app.registerAll([QueueProvider])
}Defining Jobs
Extend BaseJob and implement handle(): Promise<void>. Any data you store as public properties is automatically serialized to JSON when the job is pushed, and re-hydrated when a worker picks it up. Call queue.register() so workers know which class to instantiate during deserialization.
import { BaseJob } from '@tekir/queue'
// A job is a class with a single async handle() method.
// Any constructor arguments become the job's payload.
class SendWelcomeEmail extends BaseJob {
constructor(
public readonly userId: number,
public readonly email: string,
) {
super()
}
async handle(): Promise<void> {
// Your logic here: runs inside the worker process.
await sendEmail(this.email, 'Welcome!')
}
}
// Register the class so workers can deserialize it by name.
queue.register(SendWelcomeEmail)Dispatching Jobs
queue.dispatch(job, options?) pushes a job onto the backend and returns a JobRecord with the generated id, status, and metadata.
import { queue } from '#services'
// Returns a JobRecord with the generated id and metadata.
const record = await queue.dispatch(new SendWelcomeEmail(42, '[email protected]'))
console.log(record.id) // '1712345678901-x8f3k2'
console.log(record.status) // 'pending'Delay
Pass delay in milliseconds to make a job invisible to workers until the delay has elapsed.
// Delay the job by 5 seconds before it becomes available to a worker.
await queue.dispatch(new SendWelcomeEmail(42, '[email protected]'), {
delay: 5000 // milliseconds
})Retry
Set attempts to allow the job to be retried after a failure. Workers apply exponential back-off: 2^attempt seconds.
// Allow up to 3 attempts before the job is moved to the failed list.
// Workers apply exponential back-off between retries: 2^attempt seconds.
await queue.dispatch(new ProcessPayment(orderId), {
attempts: 3
})Named Queues
Jobs default to the 'default' queue. Pass queue to route a job to a different named queue with its own worker and concurrency.
// Dispatch to a specific queue, useful for prioritisation.
await queue.dispatch(new SendWelcomeEmail(42, '[email protected]'), {
queue: 'emails'
})
await queue.dispatch(new GenerateReport(reportId), {
queue: 'heavy'
})
// Start a worker for each named queue
queue.worker('emails').concurrency(5).start()
queue.worker('heavy').concurrency(1).start()Bulk Dispatch
queue.bulk(jobs, options?) dispatches an array of jobs in parallel.
import { queue, db } from '#services'
// Dispatch an array of jobs in parallel.
const users = await db.query('SELECT id FROM users')
await queue.bulk(
users.map((u: any) => new SendNewsletter(u.id)),
{ queue: 'newsletters', attempts: 2 }
)Worker
queue.worker(queueName) returns the Worker instance for that queue. Call .start() to begin polling. queue.stop() waits for in-flight jobs before resolving.
import { queue } from '#services'
// queue.worker(queueName) returns (or creates) a Worker for that queue.
// Call .start() to begin polling.
queue.worker('default').start()
// Gracefully wait for in-flight jobs before the process exits.
process.on('SIGTERM', async () => {
await queue.stop()
process.exit(0)
})Concurrency
.concurrency(n) controls how many jobs run simultaneously. .pollInterval(ms) sets the time between polls when the queue is empty.
// Process up to 4 jobs simultaneously on the 'emails' queue.
queue.worker('emails').concurrency(4).start()
// Adjust the polling interval (default 500 ms) for less-busy queues.
queue.worker('heavy').concurrency(1).pollInterval(2000).start().concurrency(n) and .pollInterval(ms) reject non-positive and NaN values, so a misconfigured worker fails loudly instead of silently never pulling work.
Delivery Guarantees
Jobs are claimed atomically on the Redis and database backends, so two workers (or one worker with concurrency > 1) can never pick up the same job and process it twice. Retries reuse the existing record rather than re-inserting it, so a retried job never collides with its own id.
Claimed jobs are leased with a visibility timeout. If a worker crashes mid-job, the lease eventually expires and the job is automatically re-queued for another worker to pick up, so an in-flight job is not lost when a process dies.
Events
Queue extends EventEmitter. Listen to 'completed' and 'failed' for observability.
import { queue } from '#services'
// Queue extends EventEmitter: listen to lifecycle events.
queue.on('completed', ({ id, queue: q, payload }) => {
console.log(`Job ${id} on queue "${q}" completed`)
})
queue.on('failed', ({ id, queue: q, reason }) => {
console.error(`Job ${id} failed: ${reason}`)
})Failed Jobs
Jobs that exhaust their retry budget are recorded in a failed list. Inspect, retry, or purge them:
import { queue } from '#services'
// List all jobs that exhausted their retry budget.
const failures = await queue.failed()
for (const job of failures) {
console.log(job.id, job.failedReason)
}
// Re-queue a specific failed job by id.
await queue.retry(failures[0].id)
// Look up any job (pending, completed, or failed) by id.
const record = await queue.find('1712345678901-x8f3k2')
// Remove all pending jobs from a queue.
await queue.purge('emails')
// Count pending jobs.
const pending = await queue.size('default')Backends
Memory
MemoryBackend stores jobs in a Map. Data does not survive restarts. Default backend, ideal for development and tests.
import { Queue, MemoryBackend } from '@tekir/queue'
// The default QueueProvider uses MemoryBackend.
// Create an isolated instance for tests:
const q = new Queue(new MemoryBackend())
q.register(SendWelcomeEmail)
await q.dispatch(new SendWelcomeEmail(1, '[email protected]'))
console.log(await q.size()) // 1Redis
createRedisQueue(options) uses @tekir/redis (Bun's native Redis client). Delayed jobs use a sorted set and are promoted when their timestamp is reached. Job claiming is atomic and leased with a visibility timeout, so jobs are processed once and recovered if a worker crashes. Requires bun add @tekir/redis.
import { createRedisQueue } from '@tekir/queue'
// Uses @tekir/redis (Bun's native Redis client)
const q = await createRedisQueue({ url: 'redis://localhost:6379' })
q.register(SendWelcomeEmail)
await q.dispatch(new SendWelcomeEmail(1, '[email protected]'))
q.worker('default').concurrency(3).start()Database
createDatabaseQueue(db) takes a @tekir/db Database instance and creates the jobs table automatically. Jobs are claimed with an atomic conditional update so concurrent workers never process the same row twice, and a visibility timeout re-queues jobs left behind by a crashed worker. Suitable for single-server deployments without Redis.
import { createDatabaseQueue } from '@tekir/queue'
import { db } from '#services'
// Uses @tekir/db: the jobs table is created automatically on first use.
const q = createDatabaseQueue(db)
q.register(SendWelcomeEmail)
await q.dispatch(new SendWelcomeEmail(1, '[email protected]'))
q.worker('default').start()