A Python library for task management, service scheduling, and alerting. Convert functions into managed tasks with logging, alerts, and retries. Create systemd services that run on flexible schedules with resource constraints.
- Features
- Installation
- Quick Start
- Tasks
- Services
- Environments
- Resource Constraints
- CLI Reference
- Web UI
- API Server
- Security
- Logging & Monitoring
- Slack Bot
- Environment Variables
## Features

**Tasks**: Convert any Python function (sync or async) into a managed task with:
- Automatic retries on failure
- Configurable timeouts
- Alerts via Slack and Email
- Structured logging with Loki integration
- Context tracking with `get_current_task_id()`
**Services**: Create systemd services with:
- Calendar-based scheduling (cron-like)
- Periodic scheduling with boot/login triggers
- Service dependencies and relationships
- Configurable restart policies
- Resource constraints (CPU, memory, I/O)
**Environments**: Run services in:
- Conda/Mamba virtual environments
- Docker containers with full configuration
- Named reusable environment configurations
**Management**: Control services via:
- CLI (`tf` command)
- Web UI with JWT authentication
- REST API
- Slack bot with interactive commands
## Installation

pip install taskflows

# Required for systemd integration
sudo apt install dbus libdbus-1-dev

# Enable user services to run without login
loginctl enable-linger

## Quick Start

Turn a function into a managed task:

from taskflows import task, Alerts
from taskflows.alerts import Slack
@task(
name="my-task",
retries=3,
timeout=60,
alerts=Alerts(
send_to=Slack(channel="alerts"),
send_on=["start", "error", "finish"]
)
)
async def process_data():
# Your code here
return "Done"
# Execute the task
if __name__ == "__main__":
process_data()

Create a scheduled service:

from taskflows import Service, Calendar
srv = Service(
name="daily-job",
start_command="python /path/to/script.py",
start_schedule=Calendar("Mon-Fri 09:00 America/New_York"),
enabled=True, # Start on boot
)
srv.create()

## Tasks

The @task decorator wraps any function with managed execution:
from taskflows import task, Alerts, get_current_task_id
from taskflows.alerts import Slack, Email
@task(
name="data-pipeline", # Task identifier (default: function name)
required=True, # Raise exception on failure
retries=3, # Retry attempts on failure
timeout=300, # Timeout in seconds
alerts=Alerts(
send_to=[
Slack(channel="alerts"),
Email(
addr="sender@example.com",
password="...",
receiver_addr=["team@example.com"]
)
],
send_on=["start", "error", "finish"]
)
)
async def run_pipeline():
# Access current task ID for correlation
task_id = get_current_task_id()
print(f"Running task: {task_id}")
# ... your code ...

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | Function name | Unique task identifier |
| `required` | `bool` | `False` | If True, exceptions are re-raised after all retries |
| `retries` | `int` | `0` | Number of retry attempts on failure |
| `timeout` | `float` | `None` | Execution timeout in seconds |
| `alerts` | `Alerts` | `None` | Alert configuration |
| `logger` | `Logger` | Default | Custom logger instance |
Run functions as tasks without the decorator:
from taskflows import run_task
async def my_function(x, y):
return x + y
result = await run_task(
my_function,
name="add-numbers",
retries=2,
timeout=30,
x=1, y=2
)

Configure when and where to send alerts:
from taskflows import Alerts
from taskflows.alerts import Slack, Email
alerts = Alerts(
send_to=[
Slack(channel="critical"),
Email(
addr="sender@gmail.com",
password="app-password",
receiver_addr=["oncall@company.com"]
)
],
send_on=["start", "error", "finish"] # Events to trigger alerts
)

Alert Events:
- `start`: Task execution begins
- `error`: An exception occurred (sent per retry)
- `finish`: Task execution completed (includes success/failure status)
Alerts include Grafana/Loki URLs for viewing task logs directly.
## Services

Services are systemd units that run commands on schedules:
from taskflows import Service, Calendar, Periodic, Venv
srv = Service(
# Identity
name="my-service",
description="Processes daily reports",
# Commands
start_command="python process.py",
stop_command="pkill -f process.py", # Optional
restart_command="python process.py reload", # Optional
# Scheduling
start_schedule=Calendar("Mon-Fri 09:00"),
stop_schedule=Calendar("Mon-Fri 17:00"), # Optional
restart_schedule=Periodic( # Optional
start_on="boot",
period=3600,
relative_to="finish"
),
# Environment
environment=Venv("myenv"), # Or DockerContainer, or named env string
working_directory="/app",
env={"DEBUG": "1"},
env_file="/path/to/.env",
# Behavior
enabled=True, # Auto-start on boot
timeout=300, # Max runtime in seconds
kill_signal="SIGTERM",
restart_policy="on-failure",
)
srv.create()

Key Parameters:
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Service identifier |
| `start_command` | `str \| Callable` | Command or function to execute |
| `stop_command` | `str` | Command to stop the service |
| `environment` | `Venv \| DockerContainer \| str` | Execution environment |
| `start_schedule` | `Calendar \| Periodic` | When to start |
| `stop_schedule` | `Schedule` | When to stop |
| `restart_schedule` | `Schedule` | When to restart |
| `enabled` | `bool` | Start on boot |
| `timeout` | `int` | Max runtime (seconds) |
| `restart_policy` | `str \| RestartPolicy` | Restart behavior |
Run at specific times using systemd calendar syntax:
from taskflows import Calendar
# Daily at 2 PM Eastern
Calendar("Mon-Sun 14:00 America/New_York")
# Weekdays at 9 AM
Calendar("Mon-Fri 09:00")
# Specific days and time
Calendar("Mon,Wed,Fri 16:30:30")
# From a datetime object
from datetime import datetime, timedelta
Calendar.from_datetime(datetime.now() + timedelta(hours=1))

Calendar Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `schedule` | `str` | Required | Calendar expression |
| `persistent` | `bool` | `True` | Run on wake if missed |
| `accuracy` | `str` | `"1ms"` | Max deviation from scheduled time |
Run at intervals after a trigger:
from taskflows import Periodic
# Every 5 minutes after boot
Periodic(
start_on="boot", # "boot", "login", or "command"
period=300, # Interval in seconds
relative_to="finish", # "start" or "finish"
accuracy="1ms"
)

Periodic Parameters:
| Parameter | Type | Description |
|---|---|---|
| `start_on` | `Literal["boot", "login", "command"]` | Initial trigger |
| `period` | `int` | Interval in seconds |
| `relative_to` | `Literal["start", "finish"]` | Measure from start or finish |
| `accuracy` | `str` | Max deviation |
Control service startup order and relationships:
srv = Service(
name="app-server",
start_command="./start.sh",
# Ordering
start_after=["database", "cache"], # Start after these
start_before=["monitoring"], # Start before these
# Dependencies
requires=["database"], # Fail if dependency fails
wants=["cache"], # Start together, don't fail if cache fails
binds_to=["database"], # Stop when database stops
part_of=["app-stack"], # Propagate stop/restart
# Failure handling
on_failure=["alert-service"], # Activate on failure
on_success=["cleanup-service"], # Activate on success
# Mutual exclusion
conflicts=["maintenance-mode"],
)

Configure automatic restart behavior:
from taskflows import Service, RestartPolicy
# Simple string policy
srv = Service(
name="worker",
start_command="python worker.py",
restart_policy="always", # "no", "always", "on-failure", "on-abnormal", etc.
)
# Detailed policy
srv = Service(
name="worker",
start_command="python worker.py",
restart_policy=RestartPolicy(
condition="on-failure", # When to restart
delay=10, # Seconds between restarts
max_attempts=5, # Max restarts in window
window=300, # Time window in seconds
),
)

Restart Conditions:
- `no`: Never restart
- `always`: Always restart
- `on-success`: Restart on clean exit
- `on-failure`: Restart on non-zero exit
- `on-abnormal`: Restart on signal/timeout
- `on-abort`: Restart on abort signal
- `on-watchdog`: Restart on watchdog timeout
Manage multiple services together:
from taskflows import Service, ServiceRegistry
registry = ServiceRegistry(
Service(name="web", start_command="./web.sh"),
Service(name="worker", start_command="./worker.sh"),
Service(name="scheduler", start_command="./scheduler.sh"),
)
# Add more services
registry.add(Service(name="monitor", start_command="./monitor.sh"))
# Bulk operations
registry.create() # Create all services
registry.start() # Start all services
registry.stop() # Stop all services
registry.restart() # Restart all services
registry.enable() # Enable all services
registry.disable() # Disable all services
registry.remove() # Remove all services
# Access individual services
registry["web"].logs()Run services in Conda/Mamba environments:
from taskflows import Service, Venv
srv = Service(
name="ml-pipeline",
start_command="python train.py",
environment=Venv("ml-env"), # Conda environment name
)

Automatically detects Mamba, Miniforge, or Miniconda installations.
Run services in Docker containers:
from taskflows import Service, DockerContainer, DockerImage, Volume, CgroupConfig
# Using existing image
srv = Service(
name="api-server",
environment=DockerContainer(
image="python:3.11",
command="python app.py",
ports={"8080/tcp": 8080},
volumes=[
Volume(
host_path="/data",
container_path="/app/data",
read_only=False
)
],
environment={"ENV": "production"},
network_mode="bridge",
restart_policy="no", # Let systemd handle restarts
persisted=True, # Keep container between restarts
cgroup_config=CgroupConfig(
memory_limit=1024 * 1024 * 1024, # 1GB
cpu_quota=50000, # 50% CPU
),
),
)
# Building from Dockerfile
srv = Service(
name="custom-app",
environment=DockerContainer(
image=DockerImage(
tag="myapp:latest",
path="/path/to/app",
dockerfile="Dockerfile",
),
command="./start.sh",
),
)

DockerContainer Parameters:
| Parameter | Type | Description |
|---|---|---|
| `image` | `str \| DockerImage` | Image name or build config |
| `command` | `str \| Callable` | Command to run |
| `name` | `str` | Container name (auto-generated if not set) |
| `persisted` | `bool` | Keep container between restarts |
| `ports` | `dict` | Port mappings |
| `volumes` | `list[Volume]` | Volume mounts |
| `environment` | `dict` | Environment variables |
| `network_mode` | `str` | Network mode |
| `cgroup_config` | `CgroupConfig` | Resource limits |
Store reusable environment configurations:
from taskflows import Service
# Reference a named environment by string
srv = Service(
name="my-service",
start_command="python app.py",
environment="production-docker", # Named environment
)

Create named environments via the Web UI or API. They store complete Venv or DockerContainer configurations that can be reused across services.
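The REST API (described under API Server below) exposes a `POST /environments` endpoint for this. Here is a rough sketch using `requests`; the payload field names are assumptions that mirror the `Venv` example above rather than a documented schema, and any HMAC authentication headers are omitted:

```python
import requests

# Hypothetical payload: "name" and "env_name" are assumed field names,
# echoing Venv(env_name=...) from the example above.
payload = {
    "name": "ml-venv",     # name later referenced by Service(environment="ml-venv")
    "env_name": "ml-env",  # Conda/Mamba environment to run in
}

# POST /environments is listed in the API endpoints table; 7777 is the default port.
resp = requests.post("http://localhost:7777/environments", json=payload)
resp.raise_for_status()
```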
Define services in YAML files for easy configuration management. Types are automatically inferred from content, so you don't need explicit type fields.
Each Python object has an equivalent YAML representation.
YAML files support human-readable memory sizes and time durations for improved readability:
Memory Sizes (for memory_limit, memory_high, amount, etc.):
cgroup_config:
memory_limit: 2GB # Instead of 2147483648
memory_high: 1.5GB # Supports decimals
memory_low: 512MB
memory_min: 256M # Short form also works
startup_requirements:
- amount: 8GB # Human-readable Memory constraint
constraint: ">="Supported units (case-insensitive): B, K/KB, M/MB, G/GB, T/TB
Time Durations (for timeout, period, delay, window, etc.):
timeout: 5m # Instead of 300
restart_schedule:
period: 1h # Instead of 3600
start_on: boot
restart_policy:
condition: on-failure
delay: 30s
window: 5min
max_attempts: 5

Supported units (case-insensitive): s/sec/seconds, m/min/minutes, h/hr/hours, d/day/days, w/week/weeks
You can mix human-readable and numeric values in the same file - numeric values are always interpreted as bytes (for memory) or seconds (for time).
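For example, the sketch below writes a small service definition that mixes a human-readable `timeout` with a numeric `period`, then loads it with `load_services_from_yaml`; the service name and command are placeholders:

```python
from pathlib import Path

from taskflows.serialization import load_services_from_yaml

# Placeholder service definition mixing both unit styles.
yaml_text = """
services:
  - name: mixed-units-demo
    start_command: python job.py
    timeout: 5m            # human-readable: five minutes
    restart_schedule:
      start_on: boot
      period: 3600         # plain number: interpreted as seconds
"""

Path("mixed-units.yaml").write_text(yaml_text)
services = load_services_from_yaml("mixed-units.yaml")
```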
Venv (inferred from env_name):
Venv(env_name="myenv")environment:
env_name: myenv

DockerContainer (inferred from image):
DockerContainer(
image="python:3.11",
name="my-container",
ports={"8000/tcp": 8000},
volumes=[Volume(host_path="/data", container_path="/app/data")],
environment={"DEBUG": "false"},
)

environment:
image: python:3.11
name: my-container
ports:
8000/tcp: 8000
volumes:
- /data:/app/data
environment:
DEBUG: "false"Calendar (inferred from schedule key, or use a string directly):
Calendar(schedule="Mon-Fri 09:00", persistent=True)# As a dict:
start_schedule:
schedule: Mon-Fri 09:00
persistent: true
# Or simply as a string:
start_schedule: "Mon-Fri 09:00"Periodic (inferred from period):
Periodic(start_on="boot", period=3600, relative_to="finish")start_schedule:
start_on: boot
period: 3600
relative_to: finish

RestartPolicy (inferred from condition, or use a string for simple policies):
RestartPolicy(condition="on-failure", delay=10, max_attempts=5)# As a dict:
restart_policy:
condition: on-failure
delay: 10
max_attempts: 5
# Or simply as a string:
restart_policy: on-failure

CgroupConfig (inferred from resource limit keys):
CgroupConfig(
memory_limit=2147483648, # 2GB
cpu_quota=50000,
pids_limit=100,
)

cgroup_config:
memory_limit: 2GB # Human-readable (or 2147483648)
cpu_quota: 50000
pids_limit: 100

Volume (string format or dict with host_path + container_path):
Volume(host_path="/data", container_path="/app/data", read_only=False)
Volume(host_path="/config", container_path="/app/config", read_only=True)# Simple string format (like Docker):
volumes:
- /data:/app/data
- /config:/app/config:ro
- /logs:/app/logs:rw
# Or as dicts:
volumes:
- host_path: /data
container_path: /app/data
read_only: false

Memory Constraint (inferred from amount + constraint):
Memory(amount=1073741824, constraint=">=")  # 1GB

startup_requirements:
- amount: 1GB # Human-readable (or 1073741824)
constraint: ">="CPUPressure Constraint (inferred from max_percent):
CPUPressure(max_percent=80, timespan="5min")

startup_requirements:
- max_percent: 80
timespan: 5min

A complete service definition in both forms:

Python:
from taskflows import Service, Venv, CgroupConfig
from taskflows.schedule import Calendar
service = Service(
name="api-server",
start_command="uvicorn app:main",
description="FastAPI backend",
environment=Venv(env_name="api-env"),
start_schedule=Calendar(schedule="Mon-Fri 09:00"),
restart_policy="on-failure",
timeout=300, # 5 minutes
cgroup_config=CgroupConfig(memory_limit=2147483648), # 2GB
env={"DEBUG": "false"},
enabled=True,
)

YAML:
services:
- name: api-server
start_command: uvicorn app:main
description: FastAPI backend
environment:
env_name: api-env
start_schedule: "Mon-Fri 09:00"
restart_policy: on-failure
timeout: 5m # Human-readable time
cgroup_config:
memory_limit: 2GB # Human-readable memory
env:
DEBUG: "false"
enabled: true

from taskflows.serialization import load_services_from_yaml
# Load services from YAML
services = load_services_from_yaml("services.yaml")
# Create all services
for service in services:
await service.create()
# Or use ServiceRegistry for batch operations
from taskflows import ServiceRegistry
registry = ServiceRegistry(*services)
await registry.create()
await registry.start()

ServiceRegistry also supports direct serialization:
from taskflows import Service, ServiceRegistry
# Create a registry
registry = ServiceRegistry(
Service(name="web", start_command="python web.py"),
Service(name="worker", start_command="python worker.py"),
)
# Save to YAML file
registry.to_file("services.yaml")
# Load from file
restored = ServiceRegistry.from_file("services.yaml")
# Or serialize to string
yaml_str = registry.to_yaml()
json_str = registry.to_json()

Export existing services to YAML:
from taskflows import Service
from taskflows.serialization import save_services_to_yaml
services = [
Service(name="web", start_command="python web.py"),
Service(name="worker", start_command="python worker.py"),
]
save_services_to_yaml(services, "my-services.yaml")

Individual services also support serialization:
from taskflows import Service
srv = Service(
name="my-service",
start_command="python app.py",
description="My application",
)
# Serialize to YAML/JSON
yaml_str = srv.to_yaml()
json_str = srv.to_json()
# Save to file (format inferred from extension)
srv.to_file("service.yaml")
srv.to_file("service.json")
# Load from file
restored = Service.from_file("service.yaml")Require minimum hardware before starting:
from taskflows import Service, Memory, CPUs
srv = Service(
name="ml-training",
start_command="python train.py",
startup_requirements=[
Memory(amount=8 * 1024**3, constraint=">="), # 8GB RAM
CPUs(amount=4, constraint=">="), # 4+ CPUs
],
)

Constraint Operators: `<`, `<=`, `=`, `!=`, `>=`, `>`
Set silent=True to skip silently instead of failing:
Memory(amount=16 * 1024**3, constraint=">=", silent=True)

Wait for system load to be acceptable:
from taskflows import Service, CPUPressure, MemoryPressure, IOPressure
srv = Service(
name="batch-job",
start_command="python process.py",
startup_requirements=[
CPUPressure(max_percent=80, timespan="5min"),
MemoryPressure(max_percent=70, timespan="1min"),
IOPressure(max_percent=90, timespan="10sec"),
],
)

Timespan Options: `"10sec"`, `"1min"`, `"5min"`
Fine-grained resource control for services and containers:
from taskflows import Service, CgroupConfig
srv = Service(
name="limited-service",
start_command="python app.py",
cgroup_config=CgroupConfig(
# CPU limits
cpu_quota=50000, # Microseconds per period (50% of 1 CPU)
cpu_period=100000, # Period in microseconds (default 100ms)
cpu_shares=512, # Relative weight
cpuset_cpus="0-3", # Pin to CPUs 0-3
# Memory limits
memory_limit=2 * 1024**3, # 2GB hard limit
memory_high=1.5 * 1024**3, # 1.5GB soft limit
memory_swap_limit=4 * 1024**3,
# I/O limits
io_weight=100, # I/O priority (1-10000)
device_read_bps={"/dev/sda": 100 * 1024**2}, # 100MB/s read
device_write_bps={"/dev/sda": 50 * 1024**2}, # 50MB/s write
# Process limits
pids_limit=100, # Max processes
# Security
oom_score_adj=500, # OOM killer priority
cap_drop=["NET_RAW"], # Drop capabilities
),
)

## CLI Reference

The `tf` command provides service management:
# Service discovery
tf list [PATTERN] # List services matching pattern
tf status [-m PATTERN] [--running] [--all] # Show service status
tf history [-l LIMIT] [-m PATTERN] # Show task history
tf logs SERVICE [-n LINES] # View service logs
tf show PATTERN # Show service file contents
# Service control (PATTERN matches service names)
tf create SEARCH_IN [-i INCLUDE] [-e EXCLUDE] # Create services from Python file/directory
tf start PATTERN [-t/--timers] [--services] # Start matching services/timers
tf stop PATTERN [-t/--timers] [--services] # Stop matching services/timers
tf restart PATTERN # Restart matching services
tf enable PATTERN [-t/--timers] [--services] # Enable auto-start
tf disable PATTERN [-t/--timers] [--services] # Disable auto-start
tf remove PATTERN # Remove matching services
# Multi-server (with -s/--server)
tf list -s server1 -s server2
tf status --server prod-host
tf start my-service -s prod-host

# Start/stop API server (runs as systemd service)
tf api start
tf api stop
tf api restart
# Setup web UI authentication (interactive, file-based)
tf api setup-ui --username admin

To enable the web UI, set the environment variable before starting:
export TASKFLOWS_ENABLE_UI=1
tf api start

Alternatively, use environment variables for Docker/automation:
export TF_JWT_SECRET=$(tf api generate-secret)
export TF_ADMIN_USER=admin
export TF_ADMIN_PASSWORD=yourpassword
export TASKFLOWS_ENABLE_UI=1
tf api start

Or run the API directly (not as a service):
_start_srv_api --enable-ui

# Setup HMAC authentication
tf api security setup [-r/--regenerate-secret]
tf api security status
tf api security disable
tf api security set-secret SECRET

## Web UI

The web UI is a modern React SPA located in frontend/.
cd frontend
# Install dependencies
npm install
# Development (with hot reload)
npm run dev
# Production build
npm run build

Development mode:
# Terminal 1: Start the API server
tf api start
# Terminal 2: Start React dev server (proxies API to localhost:7777)
cd frontend && npm run dev

Access at http://localhost:3000
Production mode:
# Build the frontend
cd frontend && npm run build
# Start API server with UI enabled (serves from frontend/dist/)
export TASKFLOWS_ENABLE_UI=1
tf api start

Access at http://localhost:7777
- React 19 + TypeScript + Vite
- React Router v7 (protected routes)
- Zustand (auth, UI state)
- React Query (server state with polling)
- TailwindCSS 4
See frontend/README.md for detailed documentation.
- Dashboard: Real-time service status with auto-refresh
- Multi-select: Select and operate on multiple services
- Search: Filter services by name
- Batch Operations: Start/stop/restart multiple services
- Log Viewer: Search and auto-scroll logs
- Named Environments: Create and manage reusable environments
## API Server

The API server provides REST endpoints for service management.
tf api start # Default port 7777
TASKFLOWS_ENABLE_UI=1 tf api start   # With web UI

| Method | Endpoint | Description |
|---|---|---|
| GET | `/services` | List all services |
| GET | `/services/{name}/status` | Get service status |
| POST | `/services/{name}/start` | Start service |
| POST | `/services/{name}/stop` | Stop service |
| POST | `/services/{name}/restart` | Restart service |
| GET | `/services/{name}/logs` | Get service logs |
| GET | `/environments` | List named environments |
| POST | `/environments` | Create environment |
The API uses HMAC-SHA256 authentication. Include these headers:
X-HMAC-Signature: <signature>
X-HMAC-Timestamp: <unix-timestamp>
## Security

Taskflows implements multiple security layers to protect against common vulnerabilities and unauthorized access.
Secure API communication with HMAC-SHA256 request signing:
# Initial setup
tf api security setup
# View settings
tf api security status
# Regenerate secret (requires client restart)
tf api security setup --regenerate-secret

Configuration is stored in ~/.services/security.json.
How it works:
- Shared secret distributed to authorized clients
- Each request signed with HMAC-SHA256(secret, timestamp + body)
- Server validates signature and timestamp (5-minute window)
- Prevents replay attacks and request tampering
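A minimal client-side sketch of this signing scheme, using the header names shown above and the `timestamp + body` message layout described here; the secret value and the hex encoding of the signature are assumptions:

```python
import hashlib
import hmac
import json
import time

import requests

SECRET = b"shared-secret-from-security.json"  # assumed: distributed to authorized clients

def signed_post(url: str, payload: dict) -> requests.Response:
    body = json.dumps(payload).encode()
    timestamp = str(int(time.time()))
    # Sign timestamp + body with HMAC-SHA256, as described above.
    signature = hmac.new(SECRET, timestamp.encode() + body, hashlib.sha256).hexdigest()
    headers = {
        "X-HMAC-Signature": signature,
        "X-HMAC-Timestamp": timestamp,
        "Content-Type": "application/json",
    }
    return requests.post(url, data=body, headers=headers)

# Example: restart a service through the REST API (default port 7777).
signed_post("http://localhost:7777/services/my-service/restart", {})
```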
Protected Operations:
- Service start/stop/restart
- Service creation/removal
- Environment management
The web UI uses JWT tokens with bcrypt password hashing. There are two methods to configure authentication:
Method 1: File-based (Interactive Setup)
tf api setup-ui --username admin
# Prompts for password interactively

Configuration is stored in ~/.taskflows/data/ui_config.json and ~/.taskflows/data/users.json.
Method 2: Environment Variables (Docker/Automation)
# Generate a JWT secret
export TF_JWT_SECRET=$(tf api generate-secret)
export TF_ADMIN_USER=admin
export TF_ADMIN_PASSWORD=yourpassword
export TASKFLOWS_ENABLE_UI=1
tf api start

Environment variables take precedence over file-based configuration.
Token Features:
- Bcrypt hashed passwords (12 rounds) for file-based auth
- 1-hour token expiration
- Automatic refresh on activity
- Secure HTTP-only cookies (when HTTPS enabled)
Taskflows validates all user input to prevent injection attacks:
All file paths (env_file, working directories) are validated:
# ✅ Safe - absolute path validated
Service(name="my-service", env_file="/home/user/app/.env")
# ❌ Blocked - directory traversal attempt
Service(name="bad", env_file="../../../etc/passwd") # Raises SecurityError
# ❌ Blocked - symlink escape
Service(name="bad", env_file="/tmp/link-to-etc-passwd") # Raises SecurityErrorProtection mechanisms:
- Resolves to absolute paths
- Checks against allowed directories
- Detects and blocks symlink escapes
- Prevents `..` path components
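A rough illustration of these mechanisms using `pathlib`; this is not taskflows' actual validator, the allowed-directory list is a placeholder, and a plain `ValueError` stands in for `SecurityError`:

```python
from pathlib import Path

ALLOWED_DIRS = [Path.home(), Path("/srv/app")]  # placeholder allow-list

def validate_path(raw: str) -> Path:
    if ".." in Path(raw).parts:
        raise ValueError(f"'..' path components are not allowed: {raw}")
    # Resolve to an absolute path, following symlinks (strict=True requires the file to exist).
    resolved = Path(raw).resolve(strict=True)
    # Reject anything that resolves outside the allowed directories;
    # this also catches symlinks that escape the allow-list.
    if not any(resolved.is_relative_to(d) for d in ALLOWED_DIRS):
        raise ValueError(f"Path escapes allowed directories: {raw}")
    return resolved
```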
Service names are sanitized to prevent injection:
# ✅ Safe - alphanumeric, dashes, dots, underscores
Service(name="my-service-v2.0_prod")
# ❌ Blocked - path characters
Service(name="../malicious") # Raises SecurityError
Service(name="/etc/passwd") # Raises SecurityError
# ❌ Blocked - special characters
Service(name="bad; rm -rf /") # Raises SecurityErrorAllowed characters: [a-zA-Z0-9._-]+ only
Docker commands are strictly validated using shell quoting:
# ✅ Safe - properly quoted
DockerContainer(command='python script.py --arg "value with spaces"')
# ❌ Rejected - malformed quotes
DockerContainer(command='python script.py --arg "unterminated') # Raises ValueErrorProtection: Uses Python's shlex.split() with no unsafe fallback
Best Practices:
- Never commit secrets to version control

  # Use .env files (add to .gitignore)
  echo "API_KEY=secret123" > .env

  # Reference in service
  Service(name="app", env_file=".env")

- Use environment variables for sensitive configuration

  import os

  Service(
      name="app",
      environment={
          "DB_PASSWORD": os.getenv("DB_PASSWORD"),
          "API_KEY": os.getenv("API_KEY"),
      },
  )

- Restrict file permissions

  chmod 600 ~/.services/security.json
  chmod 600 .env

- Rotate secrets regularly

  tf api security setup --regenerate-secret
When using docker_container, the service accesses Docker's Unix socket (/var/run/docker.sock), which grants:
- Ability to run containers as root
- Access to host filesystem via volume mounts
- Network configuration capabilities
Mitigation strategies:
- Principle of least privilege - only use Docker when necessary

  # Prefer direct execution
  Service(name="app", exec_start="python app.py")

  # Only containerize when isolation needed
  Service(name="app", docker_container=DockerContainer(...))

- Resource limits - constrain container resources

  DockerContainer(
      name="app",
      cgroup=CgroupConfig(
          memory_limit=1 * 1024**3,  # 1 GB max
          cpu_quota=100000,          # 1 CPU max
          pids_limit=100,            # Max 100 processes
          read_only_rootfs=True,     # Immutable filesystem
      ),
  )

- Drop capabilities - remove unnecessary Linux capabilities

  DockerContainer(
      name="app",
      cgroup=CgroupConfig(
          cap_drop=["ALL"],              # Drop all capabilities
          cap_add=["NET_BIND_SERVICE"],  # Only add what's needed
      ),
  )

- Network isolation - use custom networks

  DockerContainer(name="app", network_mode="isolated_net")

Security checklist:
- HMAC authentication enabled for API
- Strong passwords for web UI (12+ characters)
- Secrets in environment variables or `.env` files
- `.env` files in `.gitignore`
chmod 600on sensitive files - Regular secret rotation schedule
- Docker used only when necessary
- Resource limits on all Docker containers
- Capabilities dropped on Docker containers
- Review service permissions (user/group)
For security vulnerabilities, please do not open a public issue. Instead:
- Email security concerns to: [maintainer email]
- Include detailed reproduction steps
- Allow 90 days for patch before disclosure
- OWASP Top 10
- Docker Security Best Practices
- systemd Security Features
- Python Security Best Practices
## Logging & Monitoring

Application (structlog) → journald → Fluent Bit → Loki → Grafana
from loggers import configure_loki_logging, get_struct_logger
configure_loki_logging(
app_name="my-service",
environment="production",
log_level="INFO",
)
logger = get_struct_logger("my_module")
logger.info("user_action", user_id=123, action="login")# All logs for a service
{service_name=~".*my-service.*"}
# Errors only
{service_name=~".*my-service.*"} |= "ERROR"
# By app and environment
{app="my-service", environment="production"}
# Parse JSON and filter
{app="my-service"} | json | context_duration_ms > 1000
Task alerts include Grafana URLs with pre-configured Loki queries for viewing:
- Task execution logs
- Error traces
- Historical runs
## Slack Bot

Send task alerts and notifications to Slack channels.
- Create a Slack app at https://api.slack.com/apps
- Add OAuth scopes: `chat:write`, `chat:write.public`, `files:write`
- Install the app to your workspace and get the Bot Token
- Set the environment variable:

  export SLACK_BOT_TOKEN=xoxb-...
from taskflows import task, Alerts
from taskflows.alerts import Slack
@task(
name="my-task",
alerts=Alerts(
send_to=Slack(channel="alerts"),
send_on=["start", "error", "finish"]
)
)
async def my_task():
# Your code here
pass

You can also send a message directly:

from taskflows.alerts.slack import send_slack_message
from taskflows.alerts.components import Text, Table
await send_slack_message(
channel="alerts",
subject="Task Complete",
content=[Text("Processing finished successfully")],
)

## Environment Variables

| Variable | Description | Default |
|---|---|---|
| `TASKFLOWS_ENABLE_UI` | Enable web UI serving | 0 |
| `TASKFLOWS_DISPLAY_TIMEZONE` | Display timezone | UTC |
| `TASKFLOWS_FLUENT_BIT` | Fluent Bit endpoint | localhost:24224 |
| `TASKFLOWS_GRAFANA` | Grafana URL | localhost:3000 |
| `TASKFLOWS_GRAFANA_API_KEY` | Grafana API key | - |
| `TASKFLOWS_LOKI_URL` | Loki URL | http://localhost:3100 |
| `LOKI_HOST` | Loki host | localhost |
| `LOKI_PORT` | Loki port | 3100 |
| `ENVIRONMENT` | Environment name | production |
| `APP_NAME` | Application name | - |
Slack:

| Variable | Description | Default |
|---|---|---|
| `SLACK_BOT_TOKEN` | Slack Bot OAuth token | - |
| `SLACK_ATTACHMENT_MAX_SIZE_MB` | Max attachment size in MB | 20 |
| `SLACK_INLINE_TABLES_MAX_ROWS` | Max rows for inline tables | 200 |
Run the tests:

pytest tests/

License: MIT