Flowdacity Queue (FQ) is an asyncio-friendly, rate-limited job queue built on Redis. It stores jobs per queue type and queue id, enforces per-queue dequeue intervals, automatically requeues expired jobs, and exposes metrics to understand throughput and queue depth.
- Per-queue rate limiting using millisecond intervals.
- Async Redis client with Lua scripts for predictable behavior.
- Automatic retries with configurable limits (including infinite retries).
- Metrics for enqueue/dequeue counts and queue lengths.
- Works with TCP or Unix socket Redis deployments and supports Redis Cluster.
- Python 3.12+
- Redis 7+ (run your own instance or start the bundled dev container)
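
To make the per-queue rate limiting listed above concrete, here is a toy in-memory model of a dequeue interval. It is only a sketch: FQ itself enforces intervals in Redis via Lua scripts, and the `IntervalQueue` class and its injectable `clock` parameter are hypothetical names used for illustration.

```python
from collections import deque


class IntervalQueue:
    """Toy model: at most one successful dequeue per `interval_ms`."""

    def __init__(self, interval_ms, clock):
        self.interval_ms = interval_ms
        self.clock = clock          # callable returning the current time in ms
        self.next_ready_ms = 0      # earliest time the next dequeue may succeed
        self.jobs = deque()

    def enqueue(self, job):
        self.jobs.append(job)

    def dequeue(self):
        now = self.clock()
        # Nothing queued, or the interval has not elapsed yet.
        if not self.jobs or now < self.next_ready_ms:
            return None
        self.next_ready_ms = now + self.interval_ms
        return self.jobs.popleft()


# Usage with a fake clock so the behavior is deterministic.
now = [0]
queue = IntervalQueue(interval_ms=1000, clock=lambda: now[0])
queue.enqueue("a")
queue.enqueue("b")
first = queue.dequeue()     # succeeds immediately
blocked = queue.dequeue()   # too soon, so nothing is returned
now[0] = 1000
second = queue.dequeue()    # interval elapsed, next job is released
```

The second job stays queued until the interval has passed, which is the behavior the `interval` argument to `enqueue` is described as controlling.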
From PyPI:
pip install flowdacity-queue
From source (editable):
pip install -e .
FQ reads a simple INI config file. Intervals are in milliseconds.
[fq]
job_expire_interval : 5000
job_requeue_interval : 5000
default_job_requeue_limit : -1 ; -1 retries forever, 0 means no retries
[redis]
db : 0
key_prefix : queue_server
conn_type : tcp_sock ; or unix_sock
host : 127.0.0.1
port : 6379
password :
clustered : false
unix_socket_path : /tmp/redis.sock
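
As a way to sanity-check a config file before handing it to FQ, the sketch below reads the same keys with Python's standard `configparser`. This is not FQ's own loader; it assumes the file is `configparser`-compatible, and the `load_settings` helper and the keys of the returned dict are hypothetical.

```python
import configparser

SAMPLE = """\
[fq]
job_expire_interval : 5000
job_requeue_interval : 5000
default_job_requeue_limit : -1 ; -1 retries forever, 0 means no retries
[redis]
db : 0
key_prefix : queue_server
conn_type : tcp_sock ; or unix_sock
host : 127.0.0.1
port : 6379
password :
clustered : false
unix_socket_path : /tmp/redis.sock
"""


def load_settings(text):
    """Parse the INI text and return typed values (hypothetical helper)."""
    # ';' inline comments are only stripped when this prefix is set explicitly.
    parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
    parser.read_string(text)
    fq, redis = parser["fq"], parser["redis"]
    return {
        "job_expire_interval_ms": fq.getint("job_expire_interval"),
        "job_requeue_interval_ms": fq.getint("job_requeue_interval"),
        "requeue_limit": fq.getint("default_job_requeue_limit"),
        "conn_type": redis.get("conn_type"),
        "host": redis.get("host"),
        "port": redis.getint("port"),
        "password": redis.get("password"),
        "clustered": redis.getboolean("clustered"),
    }


settings = load_settings(SAMPLE)
# -1 means "retry forever" and 0 means "no retries", per the comment above.
retries_forever = settings["requeue_limit"] == -1
```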
If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`:
unixsocket /var/run/redis/redis.sock
unixsocketperm 755
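
Before switching `conn_type` to `unix_sock`, it can help to confirm that the configured path really is a socket. The check below is a small sketch using only the standard library; `is_unix_socket` is a hypothetical helper and not part of FQ.

```python
import os
import stat


def is_unix_socket(path):
    """Return True if `path` exists and is a Unix domain socket."""
    try:
        mode = os.stat(path).st_mode
    except OSError:
        # Missing path or insufficient permissions.
        return False
    return stat.S_ISSOCK(mode)
```

For example, `is_unix_socket("/var/run/redis/redis.sock")` should return `True` once Redis is running with the socket enabled, provided the calling user may access that path.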
import asyncio
import uuid

from fd import FQ


async def main():
    fq = FQ("config.conf")
    await fq._initialize()  # load config, connect to Redis, register Lua scripts

    job_id = str(uuid.uuid4())
    await fq.enqueue(
        payload={"message": "hello, world"},
        interval=1000,  # ms between successful dequeues
        job_id=job_id,
        queue_id="user001",
        queue_type="sms",
    )

    job = await fq.dequeue(queue_type="sms")
    if job["status"] == "success":
        # ...process job["payload"]...
        await fq.finish(
            queue_type="sms",
            queue_id=job["queue_id"],
            job_id=job["job_id"],
        )

    await fq.close()


asyncio.run(main())

Common operations:
- `await fq.requeue()`: move expired jobs back onto their queues.
- `await fq.interval(interval=5000, queue_id="user001", queue_type="sms")`: change a queue's rate limit on the fly.
- `await fq.metrics()`: global metrics; pass `queue_type` and/or `queue_id` for scoped stats and queue length.
- `await fq.clear_queue(queue_type="sms", queue_id="user001", purge_all=True)`: drop queued jobs and their payload/interval metadata.
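
The calls above can be combined into a long-running worker plus a periodic requeue task. The sketch below shows one possible shape. `FakeFQ` is a hypothetical in-memory stand-in that only mimics the method names used in this README so the example runs without Redis; the `"failure"` status it returns for an empty queue is an assumption, which is why the worker only tests for `"success"`.

```python
import asyncio


class FakeFQ:
    """Hypothetical stand-in for FQ; NOT the real class."""

    def __init__(self, jobs):
        self._jobs = list(jobs)
        self.finished = []
        self.requeue_calls = 0

    async def dequeue(self, queue_type):
        if not self._jobs:
            return {"status": "failure"}  # assumed shape for "nothing ready"
        return {"status": "success", **self._jobs.pop(0)}

    async def finish(self, queue_type, queue_id, job_id):
        self.finished.append(job_id)

    async def requeue(self):
        self.requeue_calls += 1


async def requeue_loop(fq, every_ms, stop):
    """Call fq.requeue() periodically until `stop` is set."""
    while not stop.is_set():
        await fq.requeue()
        try:
            await asyncio.wait_for(stop.wait(), timeout=every_ms / 1000)
        except asyncio.TimeoutError:
            pass  # interval elapsed; loop and requeue again


async def worker(fq, queue_type, handle, idle_sleep_ms=50, max_idle_polls=3):
    """Dequeue, handle, and finish jobs; stop after several empty polls."""
    idle = 0
    while idle < max_idle_polls:
        job = await fq.dequeue(queue_type=queue_type)
        if job["status"] != "success":
            idle += 1
            await asyncio.sleep(idle_sleep_ms / 1000)
            continue
        idle = 0
        await handle(job["payload"])
        await fq.finish(
            queue_type=queue_type,
            queue_id=job["queue_id"],
            job_id=job["job_id"],
        )


async def demo():
    fq = FakeFQ([
        {"job_id": "j1", "queue_id": "user001", "payload": {"message": "a"}},
        {"job_id": "j2", "queue_id": "user001", "payload": {"message": "b"}},
    ])
    seen = []

    async def handle(payload):
        seen.append(payload["message"])

    stop = asyncio.Event()
    requeuer = asyncio.create_task(requeue_loop(fq, every_ms=10, stop=stop))
    await worker(fq, "sms", handle, idle_sleep_ms=1)
    stop.set()
    await requeuer
    return fq, seen


if __name__ == "__main__":
    asyncio.run(demo())
```

With a real FQ instance, a natural choice for `every_ms` is the `job_requeue_interval` value from the config, and a production worker would typically run until cancelled rather than stop after a fixed number of idle polls.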
- Start Redis for local development: `make redis` (binds to `localhost:6380`; matches `tests/test.conf`).
- Run the suite: `make test` (requires the Redis instance above or another matching your config).
- Build a wheel: `make build`
- Install/uninstall from the build: `make install` / `make uninstall`
- Stop the dev Redis container: `make redis-down`
MIT — see LICENSE.txt.