A production-ready LLM observability platform built with Go and Redpanda (a Kafka-compatible streaming platform). StreamLens ingests telemetry events from LLM-powered applications, joins and aggregates them in a stream processor to compute real-time metrics, and exposes a query API for dashboards.
Portfolio Project: This project demonstrates stream processing, microservices architecture, and real-time data pipelines for LLM observability.
┌─────────────────┐
│    LLM Apps     │
└────────┬────────┘
         │ HTTP
         ▼
┌─────────────────┐      ┌──────────────┐
│  Ingestion API  │─────▶│   Redpanda   │
│   (Port 8080)   │      │ (Kafka API)  │
└─────────────────┘      └──────┬───────┘
                                │
                                ▼
                    ┌───────────────────────┐
                    │ Metrics Processor     │
                    │ - Join requests/resp  │
                    │ - Window aggregation  │
                    │ - Compute metrics     │
                    └───────┬───────────────┘
                            │
                    ┌───────┴────────┐
                    ▼                ▼
             ┌──────────────┐   ┌──────────┐
             │   Postgres   │   │ Redpanda │
             │  (Metrics)   │   │ (Metrics)│
             └──────┬───────┘   └──────────┘
                    │
                    ▼
           ┌─────────────────┐
           │   Metrics API   │
           │   (Port 8081)   │
           └─────────────────┘
- Ingestion: LLM apps send request/response events to the Ingestion API
- Streaming: Events flow through Redpanda topics (llm.requests, llm.responses)
- Processing: The Metrics Processor joins events and aggregates them into 1-minute windows
- Storage: Computed metrics are stored in Postgres and published to the llm.metrics topic
- Query: The Metrics API serves aggregated metrics for dashboards
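The join step in this flow pairs each request event with the response that shares its request_id. The following is a minimal in-memory sketch of that idea, not the project's actual processor code. The type and function names (Joiner, AddRequest, AddResponse) are hypothetical, and the events carry only a few of the documented fields.

```go
package main

import "fmt"

// RequestEvent and ResponseEvent are trimmed-down, hypothetical versions
// of the events that flow through llm.requests and llm.responses.
type RequestEvent struct {
	RequestID    string
	TenantID     string
	PromptTokens int
}

type ResponseEvent struct {
	RequestID        string
	LatencyMs        int
	CompletionTokens int
}

// Joined is one request paired with its response.
type Joined struct {
	Request  RequestEvent
	Response ResponseEvent
}

// Joiner buffers whichever side arrives first, keyed by request_id.
type Joiner struct {
	pendingReq  map[string]RequestEvent
	pendingResp map[string]ResponseEvent
}

func NewJoiner() *Joiner {
	return &Joiner{
		pendingReq:  make(map[string]RequestEvent),
		pendingResp: make(map[string]ResponseEvent),
	}
}

// AddRequest returns a joined pair if the matching response was already seen.
func (j *Joiner) AddRequest(r RequestEvent) (Joined, bool) {
	if resp, ok := j.pendingResp[r.RequestID]; ok {
		delete(j.pendingResp, r.RequestID)
		return Joined{Request: r, Response: resp}, true
	}
	j.pendingReq[r.RequestID] = r
	return Joined{}, false
}

// AddResponse returns a joined pair if the matching request was already seen.
func (j *Joiner) AddResponse(r ResponseEvent) (Joined, bool) {
	if req, ok := j.pendingReq[r.RequestID]; ok {
		delete(j.pendingReq, r.RequestID)
		return Joined{Request: req, Response: r}, true
	}
	j.pendingResp[r.RequestID] = r
	return Joined{}, false
}

func main() {
	j := NewJoiner()
	j.AddRequest(RequestEvent{RequestID: "r1", TenantID: "tenant-1", PromptTokens: 321})
	if pair, ok := j.AddResponse(ResponseEvent{RequestID: "r1", LatencyMs: 1123, CompletionTokens: 512}); ok {
		fmt.Printf("joined %s: latency=%dms\n", pair.Request.RequestID, pair.Response.LatencyMs)
	}
}
```

This sketch never evicts unmatched events, so a real processor would also need a timeout or size bound on the pending maps. It is also not safe for concurrent use without a lock.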
- Docker & Docker Compose
- Go 1.22+ (for local development)
- Make (optional, for convenience)
# Start all services (Redpanda, Postgres, and Go services)
make docker-up
# Or manually:
docker compose -f deploy/docker-compose.yml up -d
# View logs
make docker-logs
# Wait for services to be ready (~30 seconds), then:
make generate-traffic
# Or with custom parameters:
NUM_REQUESTS=50 BASE_URL=http://localhost:8080 ./scripts/generate-traffic.sh
After ~2 minutes (to allow windowing), query the metrics:
# Get metrics for tenant-1
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1&limit=10' | jq
# Filter by route and model
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1&route=chat_support_v1&model=gpt-4.1-mini' | jq
- Start infrastructure (Redpanda + Postgres):
  # In docker-compose, comment out the Go services and run:
  docker compose -f deploy/docker-compose.yml up redpanda postgres
- Install dependencies:
  make deps
- Run services locally (in separate terminals):
  # Terminal 1: Ingestion API
  make run-ingestion
  # Terminal 2: Metrics Processor
  make run-processor
  # Terminal 3: Metrics API
  make run-metrics-api
make build
# Binaries will be in ./bin/
./bin/ingestion-api
./bin/metrics-processor
./bin/metrics-api
Ingest an LLM request event.
Request Body:
{
  "request_id": "uuid",
  "tenant_id": "acme-corp",
  "route": "chat_support_v2",
  "model": "gpt-4.1-mini",
  "timestamp": "2025-11-19T10:00:00.000Z",
  "prompt_tokens": 321,
  "user_id_hash": "sha256-hash",
  "metadata": {
    "experiment": "prompt_v3",
    "country": "SE"
  }
}
Ingest an LLM response event.
Request Body:
{
  "request_id": "uuid",
  "timestamp": "2025-11-19T10:00:01.123Z",
  "latency_ms": 1123,
  "completion_tokens": 512,
  "finish_reason": "stop",
  "error": null
}
Query aggregated metrics.
Query Parameters:
- tenant_id (required): Filter by tenant
- route (optional): Filter by route
- model (optional): Filter by model
- limit (optional): Number of windows to return (default: 60)
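As an illustration of how these query parameters might be validated on the server side, here is a small sketch using only the standard library. ParseMetricsQuery and MetricsQuery are hypothetical names. Rejecting a non-positive or non-numeric limit is an assumption, since this README only states the default of 60.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
)

// defaultLimit mirrors the documented default number of windows.
const defaultLimit = 60

// MetricsQuery holds the parsed query parameters.
type MetricsQuery struct {
	TenantID string
	Route    string
	Model    string
	Limit    int
}

// ParseMetricsQuery parses a raw query string such as
// "tenant_id=tenant-1&route=chat_support_v1&limit=10".
func ParseMetricsQuery(rawQuery string) (MetricsQuery, error) {
	v, err := url.ParseQuery(rawQuery)
	if err != nil {
		return MetricsQuery{}, fmt.Errorf("invalid query string: %w", err)
	}
	q := MetricsQuery{
		TenantID: v.Get("tenant_id"),
		Route:    v.Get("route"),
		Model:    v.Get("model"),
		Limit:    defaultLimit,
	}
	if q.TenantID == "" {
		return MetricsQuery{}, errors.New("tenant_id is required")
	}
	if raw := v.Get("limit"); raw != "" {
		n, err := strconv.Atoi(raw)
		if err != nil || n <= 0 {
			// Assumption: invalid limits are rejected rather than clamped.
			return MetricsQuery{}, fmt.Errorf("limit must be a positive integer, got %q", raw)
		}
		q.Limit = n
	}
	return q, nil
}

func main() {
	q, err := ParseMetricsQuery("tenant_id=tenant-1&route=chat_support_v1&limit=10")
	if err != nil {
		fmt.Println("bad request:", err)
		return
	}
	fmt.Printf("%+v\n", q)
}
```

In an HTTP handler, the raw query string would typically come from r.URL.RawQuery.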
Response:
{
  "metrics": [
    {
      "tenant_id": "acme-corp",
      "route": "chat_support_v2",
      "model": "gpt-4.1-mini",
      "window_start": "2025-11-19T10:00:00Z",
      "window_end": "2025-11-19T10:01:00Z",
      "requests": 1234,
      "errors": 12,
      "avg_latency_ms": 732.4,
      "p95_latency_ms": 1234.0,
      "avg_prompt_tokens": 300.1,
      "avg_completion_tokens": 420.6,
      "estimated_cost_usd": 2.31
    }
  ],
  "count": 1
}
All services are configured via environment variables:
| Variable | Description | Default |
|---|---|---|
| KAFKA_BROKERS | Comma-separated Kafka broker addresses | localhost:19092 |
| POSTGRES_DSN | PostgreSQL connection string | postgres://streamlens:streamlens@localhost:5432/streamlens?sslmode=disable |
| HTTP_PORT | HTTP server port | 8080 (ingestion), 8081 (metrics) |
| CONSUMER_GROUP | Kafka consumer group name | metrics-processor-group |
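A configuration loader for the variables in this table could look roughly like the sketch below. It is an illustration only: Config, Load, and LoadFrom are hypothetical names, and the real internal/config package may be structured differently. The lookup function is injected so the defaults can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// Config mirrors the environment variables documented in the table.
type Config struct {
	KafkaBrokers  []string
	PostgresDSN   string
	HTTPPort      string
	ConsumerGroup string
}

// LoadFrom builds a Config using lookup (for example os.LookupEnv).
// defaultPort differs per service: 8080 for ingestion, 8081 for metrics.
func LoadFrom(lookup func(string) (string, bool), defaultPort string) Config {
	get := func(key, fallback string) string {
		if v, ok := lookup(key); ok && v != "" {
			return v
		}
		return fallback
	}
	return Config{
		KafkaBrokers:  strings.Split(get("KAFKA_BROKERS", "localhost:19092"), ","),
		PostgresDSN:   get("POSTGRES_DSN", "postgres://streamlens:streamlens@localhost:5432/streamlens?sslmode=disable"),
		HTTPPort:      get("HTTP_PORT", defaultPort),
		ConsumerGroup: get("CONSUMER_GROUP", "metrics-processor-group"),
	}
}

// Load reads the configuration from the process environment.
func Load(defaultPort string) Config {
	return LoadFrom(os.LookupEnv, defaultPort)
}

func main() {
	cfg := Load("8080")
	fmt.Printf("%+v\n", cfg)
}
```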
streamlens/
├── cmd/
│   ├── ingestion-api/       # HTTP ingestion service
│   ├── metrics-processor/   # Stream processor
│   └── metrics-api/         # HTTP metrics query service
├── internal/
│   ├── config/              # Configuration management
│   ├── handlers/            # HTTP handlers
│   ├── kafka/               # Kafka producer/consumer wrappers
│   ├── models/              # Event schemas
│   ├── processor/           # Stream processing logic
│   └── store/               # Postgres storage layer
├── deploy/
│   ├── docker-compose.yml   # Docker Compose config
│   ├── Dockerfile           # Multi-service Dockerfile
│   └── init.sql             # Postgres schema
├── scripts/
│   └── generate-traffic.sh  # Sample data generator
├── Makefile                 # Development tasks
└── README.md
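The event schemas under internal/models/ are not shown in this README. A minimal sketch of Go types matching the request and response bodies documented in the API section might look like the following. The type names, the Go field types, the string shape of a non-null error, and the DecodeResponse helper are all assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// RequestEvent mirrors the documented request body.
type RequestEvent struct {
	RequestID    string            `json:"request_id"`
	TenantID     string            `json:"tenant_id"`
	Route        string            `json:"route"`
	Model        string            `json:"model"`
	Timestamp    time.Time         `json:"timestamp"`
	PromptTokens int               `json:"prompt_tokens"`
	UserIDHash   string            `json:"user_id_hash"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

// ResponseEvent mirrors the documented response body.
type ResponseEvent struct {
	RequestID        string    `json:"request_id"`
	Timestamp        time.Time `json:"timestamp"`
	LatencyMs        int64     `json:"latency_ms"`
	CompletionTokens int       `json:"completion_tokens"`
	FinishReason     string    `json:"finish_reason"`
	// Error is nil when the JSON value is null. A string is assumed
	// for the non-null case, which the README does not specify.
	Error *string `json:"error"`
}

// DecodeResponse parses a response event and checks that request_id is set,
// since the join depends on it.
func DecodeResponse(data []byte) (ResponseEvent, error) {
	var ev ResponseEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return ResponseEvent{}, err
	}
	if ev.RequestID == "" {
		return ResponseEvent{}, errors.New("request_id is required")
	}
	return ev, nil
}

func main() {
	body := []byte(`{"request_id":"uuid","timestamp":"2025-11-19T10:00:01.123Z","latency_ms":1123,"completion_tokens":512,"finish_reason":"stop","error":null}`)
	ev, err := DecodeResponse(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s finished with %q in %d ms\n", ev.RequestID, ev.FinishReason, ev.LatencyMs)
}
```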
# Run all tests
make test
# Test specific package
go test -v ./internal/processor
- Stream Processing: Real-time joining of requests/responses by request_id
- Windowed Aggregation: 1-minute tumbling windows per (tenant, route, model)
- Metrics Computation:
  - Request count
  - Error count
  - Average & P95 latency
  - Token usage (prompt/completion)
  - Estimated cost
- Dual Sink: Metrics written to both Kafka topic and Postgres
- Graceful Shutdown: All services handle SIGTERM/SIGINT correctly
- Production-Ready: Proper error handling, logging, and connection pooling
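The windowed aggregation and latency metrics listed above can be sketched as follows. This is an illustration under stated assumptions, not the project's processor: timestamps are Unix milliseconds for simplicity, P95 uses the nearest-rank definition (the README does not say which percentile method is used), and all type and function names are hypothetical. Token averages and cost are omitted to keep the example short.

```go
package main

import (
	"fmt"
	"sort"
)

const windowMs = 60000 // 1-minute tumbling window

// Sample is one joined request/response pair reduced to what this sketch needs.
type Sample struct {
	TenantID, Route, Model string
	TimestampMs            int64 // event time, Unix milliseconds (non-negative)
	LatencyMs              float64
	IsError                bool
}

// Key identifies one aggregation bucket: (tenant, route, model, window).
type Key struct {
	TenantID, Route, Model string
	WindowStartMs          int64
}

// Window holds the metrics computed for one bucket.
type Window struct {
	Requests     int
	Errors       int
	AvgLatencyMs float64
	P95LatencyMs float64
}

// windowStart truncates a timestamp to the start of its tumbling window.
func windowStart(tsMs int64) int64 { return tsMs - tsMs%windowMs }

// P95 returns the nearest-rank 95th percentile, or 0 for empty input.
func P95(values []float64) float64 {
	if len(values) == 0 {
		return 0
	}
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)
	rank := (95*len(sorted) + 99) / 100 // ceil(0.95 * n) in integer math
	return sorted[rank-1]
}

// Aggregate groups samples into windows and computes per-window metrics.
func Aggregate(samples []Sample) map[Key]Window {
	latencies := make(map[Key][]float64)
	errs := make(map[Key]int)
	for _, s := range samples {
		k := Key{TenantID: s.TenantID, Route: s.Route, Model: s.Model, WindowStartMs: windowStart(s.TimestampMs)}
		latencies[k] = append(latencies[k], s.LatencyMs)
		if s.IsError {
			errs[k]++
		}
	}
	out := make(map[Key]Window, len(latencies))
	for k, ls := range latencies {
		sum := 0.0
		for _, l := range ls {
			sum += l
		}
		out[k] = Window{Requests: len(ls), Errors: errs[k], AvgLatencyMs: sum / float64(len(ls)), P95LatencyMs: P95(ls)}
	}
	return out
}

func main() {
	samples := []Sample{
		{TenantID: "tenant-1", Route: "chat_support_v1", Model: "gpt-4.1-mini", TimestampMs: 60000, LatencyMs: 100},
		{TenantID: "tenant-1", Route: "chat_support_v1", Model: "gpt-4.1-mini", TimestampMs: 119999, LatencyMs: 300, IsError: true},
	}
	for k, w := range Aggregate(samples) {
		fmt.Printf("%+v -> %+v\n", k, w)
	}
}
```

Because windows are tumbling, each event belongs to exactly one bucket. A streaming implementation would additionally need to decide when a window is closed and how to treat late events.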
# Ingestion API
curl http://localhost:8080/health
# Metrics API
curl http://localhost:8081/health
Access Redpanda metrics and topics at:
- Admin API: http://localhost:19644
# All services
docker compose -f deploy/docker-compose.yml logs -f
# Specific service
docker compose -f deploy/docker-compose.yml logs -f metrics-processor
# Stop services
make docker-down
# Stop and remove volumes
make clean
For a quick demo, follow the Quick Start Guide:
# Start all services
make docker-up
# Generate sample traffic
make generate-traffic
# Query metrics (wait ~2 minutes for windowing)
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1' | jq
Sample Output:
{
  "metrics": [
    {
      "tenant_id": "tenant-1",
      "route": "chat_support_v1",
      "model": "gpt-4.1-mini",
      "window_start": "2025-11-20T10:00:00Z",
      "window_end": "2025-11-20T10:01:00Z",
      "requests": 1234,
      "errors": 12,
      "avg_latency_ms": 732.4,
      "p95_latency_ms": 1234.0,
      "avg_prompt_tokens": 300.1,
      "avg_completion_tokens": 420.6,
      "estimated_cost_usd": 2.31
    }
  ]
}
- Add llm.evaluations topic support
- Implement distributed tracing (OpenTelemetry)
- Add Prometheus metrics endpoint
- Dashboard frontend (Grafana/React)
- Multi-region deployment support
- Schema registry integration
- Rate limiting on ingestion API
- Backfilling for historical data
Contributions are welcome! Please check out our Contributing Guide to get started.
Please read our Code of Conduct before contributing.
This project is licensed under the MIT License - see the LICENSE file for details.
- Redpanda - Kafka-compatible streaming platform
- franz-go - High-performance Kafka client for Go
- OpenLLMetry - OpenTelemetry for LLMs