A high-performance Go worker service that fetches events from Redis queues and bulk writes them to PostgreSQL with intelligent batching and error handling.
- π High Performance: Concurrent processing of multiple event queues
- π Adaptive Tuning: Dynamically adjusts batch sizes and intervals based on queue length
- π Dead Letter Queue: Automatic retry mechanism for failed events
- π‘οΈ Graceful Shutdown: Ensures all pending events are flushed before shutdown
- π JSON Logging: Structured logging with zerolog
- π³ Docker Ready: Includes optimized multi-stage Dockerfile
The worker processes three types of Discord events:
- Event Logs (
logs:events) - General Discord events - Guardian Logs (
logs:guardian) - Moderation/guardian actions - Join Logs (
logs:join) - User join/leave events
Events are:
- Fetched from Redis lists in batches
- Base64-decoded and parsed from JSON
- Bulk inserted into PostgreSQL tables
- Failed events are automatically moved to a DLQ for retry
- Go 1.23+
- PostgreSQL 13+
- Redis 6+
# Clone the repository
git clone https://github.com/bueapp/worker.git
cd worker
# Install dependencies
go mod download
# Build
go build -o worker ./cmd/workerConfigure via environment variables (or .env file):
# Required
DATABASE_URL=postgresql://user:pass@localhost:5432/dbname
REDIS_URL=redis://default:pass@localhost:6379
# Optional
LOG_LEVEL=info # debug, info, warn, error (default: info)
BATCH_SIZE=1000 # Base batch size (default: 1000)
FLUSH_INTERVAL=5 # Flush interval in seconds (default: 5)# With environment variables
export DATABASE_URL="postgresql://..."
export REDIS_URL="redis://..."
./worker
# Or with .env file
./worker# Build image
docker build -t bue-worker .
# Run container
docker run --env-file .env bue-workerversion: "3.8"
services:
worker:
build: .
environment:
DATABASE_URL: postgresql://postgres:password@postgres:5432/discord
REDIS_URL: redis://redis:6379
LOG_LEVEL: info
depends_on:
- postgres
- redis
restart: unless-stopped
postgres:
image: postgres:16-alpine
environment:
POSTGRES_PASSWORD: password
POSTGRES_DB: discord
volumes:
- postgres-data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis-data:/data
volumes:
postgres-data:
redis-data:The worker automatically creates the following tables:
CREATE TABLE events (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
channel_id TEXT,
user_id TEXT,
event_type TEXT NOT NULL,
logged_at TIMESTAMPTZ NOT NULL,
data JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);CREATE TABLE guardian_logs (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
user_id TEXT,
action TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB,
inserted_at TIMESTAMPTZ DEFAULT NOW()
);CREATE TABLE join_logs (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
user_id TEXT NOT NULL,
event_type TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB,
inserted_at TIMESTAMPTZ DEFAULT NOW()
);The worker automatically adjusts performance based on queue depth:
| Queue Length | Batch Size Multiplier | Interval Multiplier |
|---|---|---|
| > 100,000 | 4x | 0.5x (faster) |
| > 10,000 | 2x | 0.5x (faster) |
| > 1,000 | 1x | 1x (normal) |
| < 1,000 | 0.5x | 2x (slower) |
This ensures efficient processing during high load and resource conservation during low load.
Failed events are automatically:
- Base64-encoded and pushed to
logs:failedRedis list - Periodically retried by the DLQ reprocessor (every 5 seconds)
- Logged with error details
The worker provides structured JSON logs suitable for log aggregation systems:
{
"level": "info",
"time": 1700000000,
"count": 1000,
"batch_size": 1000,
"interval": 5000000000,
"message": "β
Flushed event logs"
}When receiving SIGTERM or SIGINT:
- Stops accepting new work
- Flushes all remaining events from Redis
- Waits up to 30 seconds for completion
- Closes all connections cleanly
.
βββ cmd/
β βββ worker/ # Main application
β βββ main.go
βββ internal/
β βββ config/ # Configuration
β βββ database/ # PostgreSQL operations
β βββ dlq/ # Dead letter queue handler
β βββ flush/ # Core flush logic
β βββ models/ # Data models
β βββ redis/ # Redis client
β βββ tuning/ # Adaptive tuning
βββ Dockerfile
βββ go.mod
βββ go.sum
βββ README.md
go test ./...golangci-lint runMIT License - see LICENSE file for details
Contributions are welcome! Please feel free to submit a Pull Request.