Distributed, Redis-backed task queue for modern C++17 applications with worker orchestration, scheduling, and tooling built-in.
- Overview
- Features
- Requirements
- Quickstart
- Example
- Running the Example
- Testing
- CLI
- Web UI
- Project Layout
- License
cppq is a header-only task queue that lets you enqueue work from C++ and execute it asynchronously via Redis. Workers pull jobs, run user supplied handlers, and update task state in Redis. The library focuses on predictable behaviour and ease of adoption—drop the header in your project, link a couple of dependencies, and you have a resilient queue.
- Producers enqueue
Task
objects into Redis lists and hashes. runServer
polls queues by priority, hands tasks to a thread pool, and dispatches registered handlers.- Recovery logic returns stuck tasks to the pending list after configurable timeouts.
- Auxiliary tooling (CLI, web dashboard) surfaces queue state and provides pause / resume controls.
- At-least-once delivery with retry tracking and task recovery.
- Scheduled execution (time-based today, cron support planned).
- Queue-level priorities and pause / unpause switches.
- Header-only API; only Redis, libuuid, and hiredis are required.
- Companion CLI and web dashboard for operations teams.
- Extensive Catch2 test suite covering queue flows and thread pool behaviour.
- Redis 6.0+ available on
redis://127.0.0.1:6379
(configurable).
- C++17-capable compiler (
g++
,clang++
). hiredis
client library.libuuid
for UUID generation.
Install dependencies with your package manager, e.g.
# Debian / Ubuntu
sudo apt install g++ libhiredis-dev uuid-dev
# Arch Linux
sudo pacman -S hiredis util-linux-libs
# macOS (Homebrew)
brew install hiredis ossp-uuid
- Copy
cppq.hpp
into your include path. - Include the header and link against hiredis, uuid, and pthread on POSIX systems:
g++ -std=c++17 your_app.cpp -I/path/to/cppq -lhiredis -luuid -lpthread
- Register task handlers before calling
runServer
. - Start Redis and execute your application.
#include "cppq.hpp"
#include <nlohmann/json.hpp>
// Specify task type name
const std::string TypeEmailDelivery = "email:deliver";
// Define a payload type for your task
typedef struct {
int UserID;
std::string TemplateID;
} EmailDeliveryPayload;
// Provide conversion to JSON (optional, you can use any kind of payload)
void to_json(nlohmann::json& j, const EmailDeliveryPayload& p) {
j = nlohmann::json{{"UserID", p.UserID}, {"TemplateID", p.TemplateID}};
}
// Helper function to create a new task with the given payload
cppq::Task NewEmailDeliveryTask(EmailDeliveryPayload payload) {
nlohmann::json j = payload;
// "10" is maxRetry -- the number of times the task will be retried on exception
return cppq::Task{TypeEmailDelivery, j.dump(), 10};
}
// The actual task code
void HandleEmailDeliveryTask(cppq::Task& task) {
// Fetch the parameters
nlohmann::json parsedPayload = nlohmann::json::parse(task.payload);
int userID = parsedPayload["UserID"];
std::string templateID = parsedPayload["TemplateID"];
// Send the email...
// Return a result
nlohmann::json r;
r["Sent"] = true;
task.result = r.dump();
}
int main(int argc, char* argv[]) {
// Register task types and handlers
cppq::registerHandler(TypeEmailDelivery, &HandleEmailDeliveryTask);
// Create a Redis connection for enqueuing, you can reuse this for subsequent enqueues
redisOptions redisOpts = {0};
REDIS_OPTIONS_SET_TCP(&redisOpts, "127.0.0.1", 6379);
redisContext* c = redisConnectWithOptions(&redisOpts);
if (c == nullptr || c->err) {
std::cerr << "Failed to connect to Redis" << std::endl;
return 1;
}
// Create tasks
cppq::Task task = NewEmailDeliveryTask(EmailDeliveryPayload{.UserID = 666, .TemplateID = "AH"});
cppq::Task task2 = NewEmailDeliveryTask(EmailDeliveryPayload{.UserID = 606, .TemplateID = "BH"});
cppq::Task task3 = NewEmailDeliveryTask(EmailDeliveryPayload{.UserID = 666, .TemplateID = "CH"});
// Enqueue a task on default queue
cppq::enqueue(c, task, "default");
// Enqueue a task on high priority queue
cppq::enqueue(c, task2, "high");
// Enqueue a task on default queue to be run at exactly 1 minute from now
cppq::enqueue(
c,
task3,
"default",
cppq::scheduleOptions(std::chrono::system_clock::now() + std::chrono::minutes(1)));
// Pause queue to stop processing tasks from it
cppq::pause(c, "default");
// Unpause queue to continue processing tasks from it
cppq::unpause(c, "default");
// This call will loop forever checking the pending queue
// before being pushed back to pending queue (i.e. when worker dies in middle of execution).
cppq::runServer(redisOpts, {{"low", 5}, {"default", 10}, {"high", 20}}, 1000);
}
g++ -std=c++17 example.cpp -I. -lhiredis -luuid -lpthread -o example
./example
Ensure Redis is running locally before executing the binary.
The repository ships with Catch2 tests that exercise queue operations, scheduling, recovery, pause logic, and the internal thread pool.
- Install dependencies (besides hiredis/uuid you also need Catch2 headers and nlohmann-json).
- Start a local Redis instance:
redis-server --port 6379
. - Build the tests:
./build_tests.sh
- Run them:
./tests
The script emits useful tags such as [queue]
, [recovery]
, or [threadpool]
that you can pass to Catch2 to focus on specific areas.
The cli/
directory contains a Python-based management tool built with Click, Rich, and Redis-py.
cd cli
pip install -r requirements.txt
python3 main.py --help
python3 main.py queues --format json
Key capabilities:
- Inspect queues, task states, and statistics.
- Pause or resume queues.
- Dump task metadata for debugging.
- Load configuration from environment variables, CLI flags, or
~/.config/cppq/config.json
.
For deeper usage details see cli/README.md
.
A Next.js dashboard lives in web/
. It offers real-time monitoring, task inspection, and queue controls via a modern interface.
cd web
npm install
npm run dev
Visit http://localhost:3000/ and connect to your Redis instance (default redis://localhost:6379
). Read the dedicated web/README.md for screenshots, API routes, and deployment hints.
.
├── cppq.hpp # Header-only queue library
├── example.cpp # Minimal producer/worker demonstration
├── tests.cpp # Catch2 regression suite
├── build_tests.sh # Helper script for building tests
├── cli/ # Python CLI for operations
└── web/ # Next.js dashboard
cppq is released under the MIT License. The bundled thread pool implementation is adapted from https://github.com/bshoshany/thread-pool.