Queue

Background job processing with memory, Redis, and database (SQLite) backends. Define jobs as classes, dispatch them anywhere, and process them with concurrent workers.

Introduction

@tekir/queue provides a Queue class that dispatches BaseJob subclasses to a backend and a Worker that polls the backend and executes jobs concurrently.

services.ts
import { service } from '@tekir/core'
import type { Queue } from '@tekir/queue'

export const queue = service<Queue>('queue')

Create config/queue.ts to choose a backend. The default is MemoryBackend:

config/queue.ts
export default {
  driver: 'memory'
}

For production, use Redis (install the client first with bun add @tekir/redis):

config/queue.ts
export default {
  driver: 'redis'
}

// Uses config('redis') for connection options: see the Redis page

Or with a database table:

config/queue.ts
export default {
  driver: 'database'
}

// Uses the registered 'db' service: add DatabaseProvider before QueueProvider

Register QueueProvider in your kernel:

start/kernel.ts
import type { TekirApp } from '@tekir/core'
import { QueueProvider } from '@tekir/queue'

export default function({ app }: TekirApp) {
  app.registerAll([QueueProvider])
}

Defining Jobs

Extend BaseJob and implement handle(): Promise<void>. Any data you store as public properties is automatically serialized to JSON when the job is pushed, and re-hydrated when a worker picks it up. Call queue.register() so workers know which class to instantiate during deserialization.

import { BaseJob } from '@tekir/queue'
import { queue } from '#services'

// A job is a class with a single async handle() method.
// Its public properties (here, the constructor parameters) become the job's payload.
class SendWelcomeEmail extends BaseJob {
  constructor(
    public readonly userId: number,
    public readonly email: string,
  ) {
    super()
  }

  async handle(): Promise<void> {
    // Your logic here: runs inside the worker process.
    // sendEmail stands in for your application's own mailer helper.
    await sendEmail(this.email, 'Welcome!')
  }
}

// Register the class so workers can deserialize it by name.
queue.register(SendWelcomeEmail)
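To make the register-then-rehydrate flow concrete, here is a minimal, self-contained sketch of how a registry keyed by class name could serialize a job and rebuild it later. It does not use @tekir/queue: SketchJob, JobRegistry, serialize, and hydrate are hypothetical names, and the real library may store and restore payloads differently.

```typescript
// Hypothetical stand-in for BaseJob; not the @tekir/queue implementation.
abstract class SketchJob {
  abstract handle(): Promise<void>
}

type JobClass = { name: string; prototype: SketchJob }

interface SerializedJob {
  name: string    // class name, used to find the constructor again
  payload: string // JSON of the job's own enumerable properties
}

class JobRegistry {
  private classes = new Map<string, JobClass>()

  register(cls: JobClass): void {
    this.classes.set(cls.name, cls)
  }

  serialize(job: SketchJob): SerializedJob {
    return { name: job.constructor.name, payload: JSON.stringify(job) }
  }

  hydrate(data: SerializedJob): SketchJob {
    const cls = this.classes.get(data.name)
    if (!cls) throw new Error(`Job class "${data.name}" is not registered`)
    // Rebuild the instance without calling the constructor, then copy the payload back.
    const job = Object.create(cls.prototype) as SketchJob
    return Object.assign(job, JSON.parse(data.payload))
  }
}

class Greet extends SketchJob {
  constructor(public readonly userId: number, public readonly email: string) {
    super()
  }
  async handle(): Promise<void> {}
}

const registry = new JobRegistry()
registry.register(Greet)
const wire = registry.serialize(new Greet(42, 'user@example.com'))
const restored = registry.hydrate(wire)
```

In this model, hydrating a job whose class was never registered throws, which is why registration has to happen in every process that runs a worker.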

Dispatching Jobs

queue.dispatch(job, options?) pushes a job onto the backend and returns a JobRecord with the generated id, status, and metadata.

import { queue } from '#services'

// Returns a JobRecord with the generated id and metadata.
const record = await queue.dispatch(new SendWelcomeEmail(42, 'user@example.com'))

console.log(record.id)     // '1712345678901-x8f3k2'
console.log(record.status) // 'pending'

Delay

Pass delay in milliseconds to make a job invisible to workers until the delay has elapsed.

// Delay the job by 5 seconds before it becomes available to a worker.
await queue.dispatch(new SendWelcomeEmail(42, 'user@example.com'), {
  delay: 5000 // milliseconds
})

Retry

Set attempts to the maximum number of times a job may run before it is moved to the failed list. Between attempts, workers apply exponential back-off of 2^attempt seconds.

// Allow up to 3 attempts before the job is moved to the failed list.
// Workers apply exponential back-off between retries: 2^attempt seconds.
await queue.dispatch(new ProcessPayment(orderId), {
  attempts: 3
})
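The back-off rule can be worked through numerically. This is a minimal sketch, assuming the attempt counter equals the number of attempts already made (1 after the first failure). The page only states the 2^attempt seconds formula, so the indexing and the helper name backoffMs are assumptions.

```typescript
// Delay before the next retry, following the documented 2^attempt seconds rule.
// Assumption: `attempt` is the number of attempts already made (1 after the first failure).
function backoffMs(attempt: number): number {
  return 2 ** attempt * 1000
}

// With attempts: 3, at most two retries follow the first run:
// 2000 ms after the first failure, 4000 ms after the second.
const waits = [1, 2].map(backoffMs)
```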

Named Queues

Jobs default to the 'default' queue. Pass queue to route a job to a different named queue with its own worker and concurrency.

// Dispatch to a specific queue, useful for prioritisation.
await queue.dispatch(new SendWelcomeEmail(42, 'user@example.com'), {
  queue: 'emails'
})

await queue.dispatch(new GenerateReport(reportId), {
  queue: 'heavy'
})

// Start a worker for each named queue
queue.worker('emails').concurrency(5).start()
queue.worker('heavy').concurrency(1).start()

Bulk Dispatch

queue.bulk(jobs, options?) dispatches an array of jobs in parallel.

import { queue, db } from '#services'

// Dispatch an array of jobs in parallel.
const users = await db.query('SELECT id FROM users')

await queue.bulk(
  users.map((u: any) => new SendNewsletter(u.id)),
  { queue: 'newsletters', attempts: 2 }
)
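Dispatching in parallel can be pictured as a Promise.all over individual dispatches. The sketch below is self-contained and uses a hypothetical dispatchOne stand-in rather than the real Queue. Whether queue.bulk is implemented this way, what it returns, and the default of one attempt are all assumptions.

```typescript
interface SketchOptions { queue?: string; attempts?: number }
interface SketchRecord { id: string; queue: string; attempts: number; status: 'pending' }

let nextId = 0

// Hypothetical stand-in for a single dispatch; resolves with a pending record.
async function dispatchOne(jobName: string, options: SketchOptions = {}): Promise<SketchRecord> {
  nextId += 1
  return {
    id: `${jobName}-${nextId}`,
    queue: options.queue ?? 'default',
    attempts: options.attempts ?? 1, // assumed default, not documented
    status: 'pending',
  }
}

// The same options apply to every job; all dispatches are started before any is awaited.
function bulkSketch(jobNames: string[], options?: SketchOptions): Promise<SketchRecord[]> {
  return Promise.all(jobNames.map((name) => dispatchOne(name, options)))
}

const bulkResult = bulkSketch(['newsletter', 'newsletter'], { queue: 'newsletters', attempts: 2 })
```

With Promise.all, one rejected dispatch rejects the whole batch, so a caller that needs per-job outcomes might prefer Promise.allSettled in its own wrapper.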

Worker

queue.worker(queueName) returns the Worker for that queue, creating it on first use. Call .start() to begin polling. await queue.stop() resolves only after in-flight jobs have finished.

import { queue } from '#services'

// queue.worker(queueName) returns (or creates) a Worker for that queue.
// Call .start() to begin polling.
queue.worker('default').start()

// Gracefully wait for in-flight jobs before the process exits.
process.on('SIGTERM', async () => {
  await queue.stop()
  process.exit(0)
})
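One way to picture "waits for in-flight jobs" is a tracker that remembers every running promise and awaits them all on shutdown. This is a sketch of the general pattern, not the Worker's actual internals. InFlightTracker and its methods are hypothetical.

```typescript
class InFlightTracker {
  private running = new Set<Promise<void>>()
  private accepting = true

  // Starts the task unless shutdown has begun. Returns whether it was accepted.
  run(task: () => Promise<void>): boolean {
    if (!this.accepting) return false
    const p: Promise<void> = task()
      .catch(() => {}) // a failing job must not break shutdown
      .then(() => {
        this.running.delete(p)
      })
    this.running.add(p)
    return true
  }

  // Refuses new work, then resolves once every tracked task has settled.
  async stop(): Promise<void> {
    this.accepting = false
    await Promise.all(Array.from(this.running))
  }
}

const tracker = new InFlightTracker()
let finished = 0
tracker.run(async () => {
  await Promise.resolve()
  finished += 1
})
const stopped = tracker.stop()
```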

Concurrency

.concurrency(n) controls how many jobs run simultaneously. .pollInterval(ms) sets how long the worker waits between polls when the queue is empty (default 500 ms).

// Process up to 4 jobs simultaneously on the 'emails' queue.
queue.worker('emails').concurrency(4).start()

// Adjust the polling interval (default 500 ms) for less-busy queues.
queue.worker('heavy').concurrency(1).pollInterval(2000).start()

.concurrency(n) and .pollInterval(ms) reject non-positive and NaN values, so a misconfigured worker fails loudly instead of silently never pulling work.
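A guard of the kind described here might look like the following. It is an illustrative sketch with a hypothetical helper name (assertPositive). The library's real error type and message are not documented on this page, and rejecting Infinity is an extra assumption.

```typescript
// Throws for 0, negative numbers, NaN, and (as an extra assumption) Infinity.
function assertPositive(name: string, value: number): number {
  if (!Number.isFinite(value) || value <= 0) {
    throw new RangeError(`${name} must be a positive number, received ${value}`)
  }
  return value
}

// Helper for the demonstration: true when the value is rejected.
function rejects(value: number): boolean {
  try {
    assertPositive('concurrency', value)
    return false
  } catch {
    return true
  }
}

// 4 is accepted; 0, -1, and NaN are rejected.
const verdicts = [4, 0, -1, NaN].map(rejects)
```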

Delivery Guarantees

Jobs are claimed atomically on the Redis and database backends, so two workers (or one worker with concurrency > 1) can never pick up the same job and process it twice. Retries reuse the existing record rather than re-inserting it, so a retried job never collides with its own id.

Claimed jobs are leased with a visibility timeout. If a worker crashes mid-job, the lease eventually expires and the job is automatically re-queued for another worker to pick up, so an in-flight job is not lost when a process dies.
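The claim-and-lease behaviour can be illustrated with an in-memory model. This sketch assumes a single-threaded store and uses hypothetical names (LeaseStore, claim, reclaimExpired). The Redis and database backends achieve atomicity with their own primitives, which this page does not show.

```typescript
type LeaseStatus = 'pending' | 'active'

interface LeasedJob {
  id: string
  status: LeaseStatus
  leaseExpiresAt: number | null // epoch ms; null while the job is pending
}

class LeaseStore {
  private jobs: LeasedJob[] = []

  constructor(private readonly visibilityTimeoutMs: number) {}

  push(id: string): void {
    this.jobs.push({ id, status: 'pending', leaseExpiresAt: null })
  }

  // The check and the update happen in one synchronous step, so a job is handed out once.
  claim(now: number): LeasedJob | undefined {
    const job = this.jobs.find((j) => j.status === 'pending')
    if (!job) return undefined
    job.status = 'active'
    job.leaseExpiresAt = now + this.visibilityTimeoutMs
    return job
  }

  // Return jobs whose lease has run out (for example after a worker crash) to pending.
  reclaimExpired(now: number): number {
    let reclaimed = 0
    for (const job of this.jobs) {
      if (job.status === 'active' && job.leaseExpiresAt !== null && job.leaseExpiresAt <= now) {
        job.status = 'pending'
        job.leaseExpiresAt = null
        reclaimed += 1
      }
    }
    return reclaimed
  }
}

const store = new LeaseStore(30000)
store.push('job-1')
const firstClaim = store.claim(0)             // worker A takes job-1
const secondClaim = store.claim(1)            // nothing is left for worker B
const recovered = store.reclaimExpired(30000) // the lease expired, job-1 is pending again
const thirdClaim = store.claim(30001)         // another worker can now take it
```

A consequence of this design is that a job whose worker is merely slow, not dead, can run a second time once its lease expires, so handlers benefit from being idempotent.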

Events

Queue extends EventEmitter. Listen to 'completed' and 'failed' for observability.

import { queue } from '#services'

// Queue extends EventEmitter: listen to lifecycle events.
queue.on('completed', ({ id, queue: q, payload }) => {
  console.log(`Job ${id} on queue "${q}" completed`)
})

queue.on('failed', ({ id, queue: q, reason }) => {
  console.error(`Job ${id} failed: ${reason}`)
})

Failed Jobs

Jobs that exhaust their retry budget are recorded in a failed list. Inspect, retry, or purge them:

import { queue } from '#services'

// List all jobs that exhausted their retry budget.
const failures = await queue.failed()

for (const job of failures) {
  console.log(job.id, job.failedReason)
}

// Re-queue a specific failed job by id (only if there is one).
if (failures.length > 0) {
  await queue.retry(failures[0].id)
}

// Look up any job (pending, completed, or failed) by id.
const record = await queue.find('1712345678901-x8f3k2')

// Remove all pending jobs from a queue.
await queue.purge('emails')

// Count pending jobs.
const pending = await queue.size('default')

Backends

Memory

MemoryBackend stores jobs in an in-process Map, so data does not survive restarts. It is the default backend and is best suited to development and tests.

import { Queue, MemoryBackend } from '@tekir/queue'

// QueueProvider uses MemoryBackend by default.
// Create an isolated instance for tests:
const q = new Queue(new MemoryBackend())
q.register(SendWelcomeEmail)

await q.dispatch(new SendWelcomeEmail(1, 'user@example.com'))
console.log(await q.size()) // 1

Redis

createRedisQueue(options) uses @tekir/redis (Bun's native Redis client). Delayed jobs are kept in a sorted set and promoted when their timestamp is reached. Job claiming is atomic and leased with a visibility timeout, so concurrent workers never claim the same job, and a job held by a crashed worker is recovered once its lease expires. Install the client with bun add @tekir/redis.

import { createRedisQueue } from '@tekir/queue'

// Uses @tekir/redis (Bun's native Redis client)
const q = await createRedisQueue({ url: 'redis://localhost:6379' })

q.register(SendWelcomeEmail)
await q.dispatch(new SendWelcomeEmail(1, 'user@example.com'))
q.worker('default').concurrency(3).start()
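The delayed-job mechanism can be modelled without Redis. The sketch below keeps an array ordered by score as a stand-in for a sorted set and moves due entries to a ready list. The names (DelayedSet, promoteDue) are hypothetical, and the actual Redis commands the backend issues are not documented on this page.

```typescript
interface ScoredEntry {
  jobId: string
  score: number // epoch ms at which the job becomes due
}

class DelayedSet {
  private entries: ScoredEntry[] = []
  readonly ready: string[] = []

  // Insert and keep the entries ordered by due time, like a sorted set ordered by score.
  add(jobId: string, dueAt: number): void {
    this.entries.push({ jobId, score: dueAt })
    this.entries.sort((a, b) => a.score - b.score)
  }

  // Move every entry whose score is <= now onto the ready list, earliest first.
  promoteDue(now: number): number {
    const due = this.entries.filter((e) => e.score <= now)
    this.entries = this.entries.filter((e) => e.score > now)
    for (const entry of due) this.ready.push(entry.jobId)
    return due.length
  }
}

const delayedSet = new DelayedSet()
delayedSet.add('later', 5000)
delayedSet.add('sooner', 1000)
const movedEarly = delayedSet.promoteDue(999) // nothing is due yet
const movedAt3s = delayedSet.promoteDue(3000) // 'sooner' becomes ready
const movedAt5s = delayedSet.promoteDue(5000) // 'later' becomes ready
```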

Database

createDatabaseQueue(db) takes a @tekir/db Database instance and creates the jobs table automatically. Jobs are claimed with an atomic conditional update so concurrent workers never process the same row twice, and a visibility timeout re-queues jobs left behind by a crashed worker. Suitable for single-server deployments without Redis.

import { createDatabaseQueue } from '@tekir/queue'
import { db } from '#services'

// Uses @tekir/db: the jobs table is created automatically on first use.
const q = createDatabaseQueue(db)

q.register(SendWelcomeEmail)
await q.dispatch(new SendWelcomeEmail(1, 'user@example.com'))
q.worker('default').start()