RabbitMQ is a powerful message broker commonly used to decouple and scale microservices. While NestJS has several RabbitMQ libraries, some advanced use-cases (like partial-batch message consumption, dynamic data providers, or per-worker injection) can be tricky to implement with off-the-shelf solutions.
In this post, we’ll explore a custom NestJS RabbitMQ module and worker setup that handle advanced needs:
- Batch-based or individual message processing
- Dynamic module registration of workers and their data providers
- Automatic DLQ (Dead Letter Queue) handling for failed messages
- Partial-batch timer-based flush for leftover items in the queue
We’ll walk through the architecture, installation, usage, and how it compares to existing libraries.
Typical RabbitMQ libraries for NestJS (e.g., `@golevelup/nestjs-rabbitmq`) can handle most pub/sub needs, but we had extra requirements:

- **Worker Scalability**: We wanted multiple "workers" with batch message processing, partial flush intervals, retries, and automatic DLQ for failed messages.
- **Per-Worker Injection**: Each worker might need different data providers (e.g., a Prisma-based provider or an API-based provider). We needed dynamic modules to handle tokens like `PRISMA_SERVICE` or `API_URL`.
- **Advanced Queue Configuration**: We wanted to keep control over queue assertions (max length, TTL, etc.) while still letting each worker implement its own advanced logic (like partial-batch flush).

Rather than hack around an existing library, we decided to build from scratch using NestJS dynamic modules and the `amqp-connection-manager` library for connection stability.
Here’s an overview of how everything fits together:
- **AppModule**: Imports the custom `RabbitMQModule` (which sets up RabbitMQ connections and queues). Optionally passes a "worker module" for advanced worker definitions.
- **RabbitMQModule**: Creates a globally available AMQP connection via `amqp-connection-manager`. Declares queue definitions (e.g., `BENEFICIARY_QUEUE`).
- **WorkerModule**: Dynamically registers workers (e.g., `BeneficiaryWorker1`, `BeneficiaryWorker2`) plus any needed data providers (`PRISMA_SERVICE`, `API_URL`). Each worker can have a separate or shared data provider.
- **BaseWorker**: An abstract class each worker extends. Handles batch consumption, requeues, timed partial flush, DLQ publishing, and other advanced logic.
- **DataProviderModule**: A separate dynamic module that can provide tokens like `PRISMA_SERVICE` or `DATA_PROVIDER`. Each worker can inject these tokens.
- **BeneficiaryWorker** (an example worker): Extends `BaseWorker`. Overrides `processItem(...)` to handle logic for each message or batch from `BENEFICIARY_QUEUE`.
Below is a simple flow:
```
AppModule
 └─> RabbitMQModule.register(...)   // sets up AMQP_CONNECTION & queues
 └─> WorkerModule.register(...)
      └─> DataProviderModule (per worker or global)
      └─> Workers (BeneficiaryWorker, etc.) each consume messages & process them
```
Using NestJS dynamic modules, we can create submodules for each worker. For instance:
```ts
WorkerModule.register({
  globalDataProvider: {
    prismaService: PrismaService, // provide PRISMA_SERVICE globally
  },
  workers: [
    {
      provide: 'BeneficiaryWorker1',
      useClass: BeneficiaryWorker,
      // optionally override dataProvider, apiUrl, etc.
    },
  ],
});
```

`BaseWorker` collects messages in an array (a batch) until either:

- the batch hits a certain size (`defaultBatchSize`), or
- we're in individual mode (each message is acked immediately), or
- a timer flushes partial batches every X seconds if `acknowledgeMode === 'batch'`.

After up to 3 retries, if a message fails processing, it's published to `dead_letter_queue`.

We can inject `PRISMA_SERVICE` for DB-based logic or `API_URL` for an HTTP-based provider. This per-worker approach is especially helpful for multi-tenant or multi-service setups.

We can define multiple workers (e.g., `'BeneficiaryWorker1'`, `'BeneficiaryWorker2'`), each pointing to the same or different queue definitions. The logic in `BaseWorker` can be extended in each worker class, keeping the code DRY and consistent.
Assume we publish this code as an npm package (e.g., @your-org/nest-rabbitmq). Then:
```bash
npm install @your-org/nest-rabbitmq
```

In your `.env` or environment, set:

```bash
RABBIT_MQ_URL=amqp://guest:guest@localhost
```
```ts
// app.module.ts
@Module({
  imports: [
    RabbitMQModule.register({
      urls: [process.env.RABBIT_MQ_URL],
      ampqProviderName: 'AMQP_CONNECTION',
      queues: [{ name: 'BENEFICIARY_QUEUE', durable: true }],
      workerModuleProvider: WorkerModule.register({
        globalDataProvider: {
          prismaService: PrismaService, // share Prisma across workers
        },
        workers: [
          {
            provide: 'BeneficiaryWorker1',
            useClass: BeneficiaryWorker,
          },
        ],
      }),
    }),
  ],
})
export class AppModule {}
```

```ts
// beneficiary.rabbitmq.worker.ts
@Injectable()
export class BeneficiaryWorker extends BaseWorker<any> {
  constructor(
    queueUtilsService: QueueUtilsService,
    @Inject('AMQP_CONNECTION') amqpConnection: any,
  ) {
    // queue 'BENEFICIARY_QUEUE', defaultBatchSize = 10, mode = 'batch'
    super(queueUtilsService, 'BENEFICIARY_QUEUE', 10, 'batch', amqpConnection);
  }

  protected async processItem(items: any[]) {
    // Custom logic per item/batch
    for (const item of items) {
      console.log('Processing beneficiary item:', item);
      // e.g., call Prisma or an API-based data provider
      // ...
    }
  }
}
```

Use the `RabbitMQService` to push messages:
```ts
// in some service or controller
constructor(private readonly rabbitMQService: RabbitMQService) {}

async createBeneficiary(beneficiary: any) {
  await this.rabbitMQService.publishToQueue('BENEFICIARY_QUEUE', beneficiary);
  console.log('Beneficiary data pushed to queue');
}
```

Because you passed the queue definitions to `RabbitMQModule`:
```ts
{
  name: 'BENEFICIARY_QUEUE',
  durable: true,
  options: {
    // optional queue arguments
  },
}
```

the `RabbitMQService` automatically calls `channel.assertQueue(...)` for each queue on startup.
If you want an API-based provider, e.g.:

```ts
@Injectable()
export class BeneficiaryApiProvider implements IDataProvider {
  constructor(@Inject('API_URL') private readonly apiUrl: string) {
    // create an axios instance, etc.
  }
  // ...
}
```

just pass it via `DataProviderModule.register()` or in the `WorkerModule`. Then in your worker, you can `@Inject(DATA_PROVIDER)` to get the instance.
- **Timer-Based Partial Flush**: If you're in `'batch'` mode, `BaseWorker` sets a flush timer. If the queue holds fewer than `defaultBatchSize` items for a while, they'll still be processed after the timer triggers.
- **Retries and DLQ**: Each message can be retried up to 3 times. On final failure, it's published to `'dead_letter_queue'` by default. You can override `publishToDLQ(...)` in your worker if you want to route it differently.
- **Multiple Workers with Different Data Providers**: Worker1 might use `BeneficiaryPrismaProvider` while Worker2 uses `BeneficiaryApiProvider`. `WorkerModule.register(...)` can pass a different `workerDataProvider` or `prismaService` to each.
When This Custom Approach Is Better:

- **Complex Per-Worker Logic**: If you need partial-batch flush, retries, and DLQ logic baked into the worker, the built-in `BaseWorker` pattern is flexible.
- **Multiple Data Providers**: You can dynamically inject `API_URL` or `PRISMA_SERVICE` in different combinations.
- **Advanced Multi-Module Setup**: The "data provider module" approach ensures each worker can have a unique config.

When Existing Libraries Are Simpler:

- If you only need straightforward consumer/producer logic and don't require partial batching or advanced injection, existing libraries like `@golevelup/nestjs-rabbitmq` may be faster to set up.
- If you want a decorator-based approach (like `@RabbitSubscribe()`), you might prefer a library that hides more of the details.
In this post, we demonstrated a custom NestJS + RabbitMQ architecture for advanced worker-related use cases. If you want to build something similar, here is a to-do list of the pieces involved:
- **Partial-Batch Logic**
  - Implement a timer-based flush for leftover messages.
  - Expose a config option `acknowledgeMode` (`'individual' | 'batch'`) in your worker constructor.
  - Add logic to accumulate messages (`batch[]`) until either the batch is full (`>= defaultBatchSize`) or the timer triggers (for a partial flush).
  - Provide a method (`flushBatch()`) to force a batch flush programmatically, if needed.
- **Dynamic Worker Injection**
  - In your `WorkerModule.register()`, allow each worker to specify a `dataProvider` or tokens like `PRISMA_SERVICE` and `API_URL`.
  - For each worker: if `workerDataProvider` is given, build a new `DataProviderModule` instance; otherwise, fall back to a global or default data provider.
  - Ensure the worker can `@Inject(DATA_PROVIDER)` or `@Inject(PRISMA_SERVICE)` in its constructor if requested.
- **Multiple Retries & DLQ**
  - Decide on a max retry count (e.g., 3).
  - In your `processBatch()`, handle per-message retry logic: if a message fails, increment `retryCount`; if `retryCount >= maxRetries`, route the message to `dead_letter_queue`.
  - Provide a method `publishToDLQ()` that your worker can override for custom DLQ routing.
  - Log warnings when messages reach final failure.
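The per-message retry bookkeeping can be sketched as a small pure function. The name `decideOnFailure` and the return shape are hypothetical, not the module's API:

```ts
// Hypothetical sketch: increment the retry count on each failure; once it
// reaches maxRetries, route the message to the DLQ instead of requeueing.
type RetryDecision = "requeue" | "dlq";

function decideOnFailure(
  retryCount: number,     // failures already recorded for this message
  maxRetries: number = 3, // the default of 3 used in this post
): { decision: RetryDecision; retryCount: number } {
  const next = retryCount + 1;
  return {
    decision: next >= maxRetries ? "dlq" : "requeue",
    retryCount: next,
  };
}
```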
- **Concurrency & Prefetch**
  - In the base worker (`BaseWorker`), expose a config for `prefetchCount`.
  - In `initializeWorker()`, call `channel.prefetch(prefetchCount)`.
  - Consider letting each worker set its own concurrency or share a global concurrency policy.
- **Timed Flush & Exponential Backoff**
  - For the partial-batch flush, define a `batchFlushInterval` in your worker (default 5 seconds).
  - If an item fails, implement exponential backoff (e.g., wait `2^attempt * 1000` ms) before retrying or acknowledging failure.
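The backoff formula is a one-liner; here's a sketch where the cap is our own addition (not from the post) to keep delays bounded:

```ts
// Exponential backoff as described above: wait 2^attempt * 1000 ms before
// the next retry, capped so delays don't grow without bound.
function backoffDelayMs(attempt: number, capMs: number = 30_000): number {
  return Math.min(2 ** attempt * 1000, capMs);
}
```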
- **Configurable Requeue vs. DLQ**
  - Let the worker decide whether to requeue immediately or do a slow requeue (with a delay).
  - Provide a helper function for scheduling a delayed requeue, if desired.
- **Per-Worker Logging & Monitoring**
  - Add logs that include the worker name/ID, batch sizes, flush intervals, etc.
  - Optionally expose a metrics interface (e.g., to track messages processed, failures, and DLQ counts) for each worker.
- **Graceful Shutdown**
  - In `onModuleDestroy()`, stop the flush timer (`clearInterval`) if in batch mode.
  - Wait for in-progress batches to finish before shutting down.
  - Close channels and the AMQP connection after all items are processed or after a configurable grace period.
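As a rough, framework-free sketch of that shutdown sequence (the `GracefulWorker` class and its members are illustrative, not the module's real API):

```ts
// Illustrative shutdown sketch: stop the flush timer, drain in-flight
// batches, then (in a real worker) close the channel and connection.
class GracefulWorker {
  private flushTimer?: ReturnType<typeof setInterval>;
  private inFlight: Promise<void>[] = [];

  startFlushTimer(ms: number, flush: () => void): void {
    this.flushTimer = setInterval(flush, ms);
  }

  track(work: Promise<void>): void {
    this.inFlight.push(work); // remember in-progress batch work
  }

  // Mirrors NestJS's onModuleDestroy() lifecycle hook.
  async onModuleDestroy(): Promise<void> {
    if (this.flushTimer) clearInterval(this.flushTimer); // 1. stop the timer
    await Promise.allSettled(this.inFlight);             // 2. drain in-flight batches
    // 3. a real worker would now close its channel and the AMQP connection
  }
}
```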
- **Customizable Worker**
  - Provide an abstract `processItem(items: T | T[])` method that the user must override to handle actual domain logic.
  - Consider hooks like `beforeBatch(items: T[])` and `afterBatch(items: T[])` to allow advanced customization (e.g., stats or logs).
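A minimal sketch of that hook pattern; `HookedWorker` and `LoggingWorker` are illustrative stand-ins, not the module's real classes:

```ts
// Hooks default to no-ops so subclasses override only what they need;
// only processItem is mandatory.
abstract class HookedWorker<T> {
  protected beforeBatch(_items: T[]): void {} // e.g., log batch size
  protected afterBatch(_items: T[]): void {}  // e.g., record stats
  protected abstract processItem(items: T[]): void; // domain logic

  handleBatch(items: T[]): void {
    this.beforeBatch(items);
    this.processItem(items);
    this.afterBatch(items);
  }
}

class LoggingWorker extends HookedWorker<string> {
  calls: string[] = [];
  protected beforeBatch(items: string[]): void { this.calls.push(`before:${items.length}`); }
  protected processItem(items: string[]): void { this.calls.push(`process:${items.length}`); }
  protected afterBatch(items: string[]): void { this.calls.push(`after:${items.length}`); }
}
```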
- **Advanced Data Providers**
  - Support injection tokens for multiple providers (Prisma vs. HTTP API).
  - Provide a fallback or default data provider if none is specified.
  - Document how to create a custom provider class that implements `IDataProvider` or a similar interface.
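The fallback behavior could look like this sketch, where the `IDataProvider` shape and the `resolveDataProvider` helper are assumptions for illustration:

```ts
// Sketch of provider resolution: a per-worker provider wins; otherwise
// fall back to a global/default one.
interface IDataProvider {
  fetch(id: string): Promise<unknown>;
}

const defaultProvider: IDataProvider = {
  fetch: async (id) => ({ id, source: "default" }),
};

function resolveDataProvider(
  workerProvider?: IDataProvider,
  globalProvider: IDataProvider = defaultProvider,
): IDataProvider {
  return workerProvider ?? globalProvider;
}
```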
- **Documentation & Examples**
  - Document each advanced feature (partial-batch flush, timed flush, dynamic injection, etc.) in the README or docs.
  - Create a small example repo showing:
    - a queue definition,
    - a custom worker with partial-batch logic,
    - Prisma or API-based data providers.
- **Testing & CI**
  - Write unit tests for the partial-batch logic (simulate fewer items than `defaultBatchSize` and flush on the timer).
  - Test DLQ logic by forcing message failures.
  - Ensure concurrency (prefetch) tests handle multiple messages in parallel.
  - Optionally run integration tests with a local RabbitMQ container (e.g., Docker) in CI.
- **Versioning & Maintenance**
  - Use semantic versioning to manage changes in worker logic or data provider injection.
  - Provide a CHANGELOG with each release if you publish as an open-source library.
  - Keep the code modular so advanced worker logic is separated from the basic core module.
Completing the tasks in this to-do list will enable advanced worker-related use cases such as:
- Partial-batch or individual acknowledgment
- Retry attempts with exponential backoff and DLQ fallback
- Concurrency control with prefetch settings
- Dynamic injection of multiple data providers
- Timed flush for leftover batch items
- Try It: Check out the GitHub Repository for the full code, examples, and instructions.
- Contribute: If you find an issue or want a feature, open a PR or issue.
- Customization: Extend `BaseWorker` to add your own logic (e.g., scheduling, multiple queues in one worker, etc.).
We hope this helps devs needing advanced NestJS + RabbitMQ functionality beyond what typical libraries provide. Feel free to drop a comment or open an issue if you have questions or suggestions!
Thanks for reading, and happy queueing!