This repository was archived by the owner on Dec 7, 2025. It is now read-only.

Log Worker that fills database in bulk


Bue Worker

A high-performance Go worker service that fetches events from Redis queues and bulk writes them to PostgreSQL with intelligent batching and error handling.

Features

  • 🚀 High Performance: Concurrent processing of multiple event queues
  • 📊 Adaptive Tuning: Dynamically adjusts batch sizes and intervals based on queue length
  • 🔄 Dead Letter Queue: Automatic retry mechanism for failed events
  • 🛑 Graceful Shutdown: Ensures all pending events are flushed before shutdown
  • 📝 JSON Logging: Structured logging with zerolog
  • 🐳 Docker Ready: Includes optimized multi-stage Dockerfile

Architecture

The worker processes three types of Discord events:

  • Event Logs (logs:events) - General Discord events
  • Guardian Logs (logs:guardian) - Moderation/guardian actions
  • Join Logs (logs:join) - User join/leave events

Events are:

  1. Fetched from Redis lists in batches
  2. Base64-decoded and parsed from JSON
  3. Bulk inserted into PostgreSQL tables
  4. Moved to a dead letter queue (DLQ) for automatic retry if insertion fails

Prerequisites

  • Go 1.23+
  • PostgreSQL 13+
  • Redis 6+

Installation

# Clone the repository
git clone https://github.com/bueapp/worker.git
cd worker

# Install dependencies
go mod download

# Build
go build -o worker ./cmd/worker

Configuration

Configure via environment variables (or .env file):

# Required
DATABASE_URL=postgresql://user:pass@localhost:5432/dbname
REDIS_URL=redis://default:pass@localhost:6379

# Optional
LOG_LEVEL=info           # debug, info, warn, error (default: info)
BATCH_SIZE=1000          # Base batch size (default: 1000)
FLUSH_INTERVAL=5         # Flush interval in seconds (default: 5)

Usage

Run Locally

# With environment variables
export DATABASE_URL="postgresql://..."
export REDIS_URL="redis://..."
./worker

# Or with .env file
./worker

Run with Docker

# Build image
docker build -t bue-worker .

# Run container
docker run --env-file .env bue-worker

Docker Compose

version: "3.8"

services:
  worker:
    build: .
    environment:
      DATABASE_URL: postgresql://postgres:password@postgres:5432/discord
      REDIS_URL: redis://redis:6379
      LOG_LEVEL: info
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: discord
    volumes:
      - postgres-data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data

volumes:
  postgres-data:
  redis-data:

Database Schema

The worker automatically creates the following tables:

Events Table

CREATE TABLE events (
    id TEXT PRIMARY KEY,
    guild_id TEXT NOT NULL,
    channel_id TEXT,
    user_id TEXT,
    event_type TEXT NOT NULL,
    logged_at TIMESTAMPTZ NOT NULL,
    data JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

Guardian Logs Table

CREATE TABLE guardian_logs (
    id TEXT PRIMARY KEY,
    guild_id TEXT NOT NULL,
    user_id TEXT,
    action TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    data JSONB,
    inserted_at TIMESTAMPTZ DEFAULT NOW()
);

Join Logs Table

CREATE TABLE join_logs (
    id TEXT PRIMARY KEY,
    guild_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    data JSONB,
    inserted_at TIMESTAMPTZ DEFAULT NOW()
);
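
One way to bulk insert decoded rows into these tables is a single multi-row INSERT with numbered placeholders; whether the project uses this or a COPY-based path is not stated, so the helper below is only a sketch (the `ON CONFLICT (id) DO NOTHING` clause is an assumption that makes retried batches idempotent on the primary key):

```go
package main

import (
	"fmt"
	"strings"
)

// buildBulkInsert produces a multi-row INSERT statement with numbered
// placeholders ($1..$n), one group per row, plus ON CONFLICT (id) DO NOTHING
// so a retried batch does not duplicate rows already written.
func buildBulkInsert(table string, cols []string, nRows int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s (%s) VALUES ", table, strings.Join(cols, ", "))
	p := 1
	for r := 0; r < nRows; r++ {
		if r > 0 {
			b.WriteString(", ")
		}
		ph := make([]string, len(cols))
		for c := range cols {
			ph[c] = fmt.Sprintf("$%d", p)
			p++
		}
		fmt.Fprintf(&b, "(%s)", strings.Join(ph, ", "))
	}
	b.WriteString(" ON CONFLICT (id) DO NOTHING")
	return b.String()
}

func main() {
	sql := buildBulkInsert("events",
		[]string{"id", "guild_id", "event_type", "logged_at", "data"}, 2)
	fmt.Println(sql)
}
```

The statement would then be executed once per batch with the flattened row values as arguments.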

Adaptive Tuning

The worker automatically adjusts performance based on queue depth:

| Queue Length | Batch Size Multiplier | Interval Multiplier |
| ------------ | --------------------- | ------------------- |
| > 100,000    | 4x                    | 0.5x (faster)       |
| > 10,000     | 2x                    | 0.5x (faster)       |
| > 1,000      | 1x                    | 1x (normal)         |
| < 1,000      | 0.5x                  | 2x (slower)         |

This ensures efficient processing during high load and resource conservation during low load.

Dead Letter Queue (DLQ)

Failed events are automatically:

  1. Base64-encoded and pushed to logs:failed Redis list
  2. Periodically retried by the DLQ reprocessor (every 5 seconds)
  3. Logged with error details

Monitoring

The worker provides structured JSON logs suitable for log aggregation systems:

{
  "level": "info",
  "time": 1700000000,
  "count": 1000,
  "batch_size": 1000,
  "interval": 5000000000,
  "message": "βœ… Flushed event logs"
}

Graceful Shutdown

When receiving SIGTERM or SIGINT:

  1. Stops accepting new work
  2. Flushes all remaining events from Redis
  3. Waits up to 30 seconds for completion
  4. Closes all connections cleanly

Development

Project Structure

.
├── cmd/
│   └── worker/          # Main application
│       └── main.go
├── internal/
│   ├── config/          # Configuration
│   ├── database/        # PostgreSQL operations
│   ├── dlq/             # Dead letter queue handler
│   ├── flush/           # Core flush logic
│   ├── models/          # Data models
│   ├── redis/           # Redis client
│   └── tuning/          # Adaptive tuning
├── Dockerfile
├── go.mod
├── go.sum
└── README.md

Running Tests

go test ./...

Linting

golangci-lint run

License

MIT License - see LICENSE file for details

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
