A toy-level, learning-oriented message queue written in Go, exploring storage-compute integration, multi-model messaging, and zero-copy data paths.
This is NOT a production-grade system. It is an educational project that implements interesting architectural ideas. Do not deploy it in production.
⚠ AI Programming Note: This project was developed with AI-assisted programming and should not be used in a production environment.
CartMQ implements a unified messaging platform supporting five distinct messaging models — Log, Broadcast, Order, Mailbox, and Request — each backed by carefully chosen storage primitives (Stream, Queue, RefCountedStream). The architecture co-locates storage and computation, enabling zero-copy data paths from network ingress through filtering to disk persistence.
- Five messaging models covering broadcast logs, competing-consumer queues, multi-tenant inboxes, and RPC semantics
- Two-phase threshold-separated WAL: small messages land in one I/O, large messages use async segment write
- Three-layer filtering: topic routing → field index → header bitmap matching without deserialization
- Off-heap deduplication: mmap-based sharded hash tables with zero GC pressure
- Hierarchical timing wheel: L1 (10ms) / L2 (1s) / L3 (1min) for delayed delivery and RPC timeouts
- Tiered storage: Hot/Warm/Cold segments with automatic promotion and demotion
- Funnel DSL: SQL-like language for declarative topic/schema/pipeline management
- QUIC-based network layer with FlatBuffers wire format for zero-copy frame decoding
- Go 1.24+
cd cartmq
# Build all components
cd cartmq && go build ./... && cd ..
cd server && go build -o ../cartmq-server . && cd ..
cd client && go build -o ../cartmq-client . && cd ..
# Run the server (uses sensible defaults with ~/.cartmq data dir)
./cartmq-server
# Or with a config file
./cartmq-server -config config.json

package main
import (
	"context"
	"fmt"

	cartmq "github.com/zrurf/cart_mq"
	"github.com/zrurf/cart_mq/types"
)

func main() {
	// Connect to a broker on the default local address
	client := cartmq.NewClient("localhost:9000")
	if err := client.Connect(); err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()

	// Create a schema and topic with Funnel DSL
	client.ExecuteFunnel(ctx, `
		CREATE SCHEMA OrderSchema (
			order_id STRING,
			amount INT,
			region STRING
		);
		CREATE TOPIC orders
			WITH MODEL = 'Order'
			SCHEMA = 'OrderSchema'
			INDEXED BY (region, amount);
	`)

	// Publish with mandatory idempotency key (Order model)
	msg := &types.Message{
		TopicName:      "orders",
		IdempotencyKey: "txn_001",
		Payload:        []byte(`{"order_id":"123","amount":100,"region":"east"}`),
	}
	result, err := client.Publish(ctx, msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Published: ID=%d\n", result.MessageID)

	// Subscribe and pull
	sub := &types.Subscription{ID: "worker_1", TopicName: "orders"}
	client.Subscribe(ctx, sub)
	msgs, err := client.Pull(ctx, "worker_1", 10)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Printf("Received: %s\n", string(m.Payload))
		// Order model requires an explicit ACK per message
		client.Ack(ctx, "worker_1", m.ID)
	}
}

# Unit & integration tests
go test -count=1 -timeout=120s ./...
# Benchmarks
go test -v -bench=. -benchmem -benchtime=3s ./test/

CartMQ/
├── go.work # Go workspace
├── README.md
├── docs/ # Documentation
│ ├── ARCHITECTURE.md # System architecture deep dive
│ ├── MODELS.md # Five messaging models guide
│ ├── FUNNEL_DSL.md # Funnel DSL language reference
│ └── CONFIGURATION.md # Configuration options & defaults
├── cartmq/ # Core library (module: github.com/zrurf/cart_mq)
│ ├── broker.go # Main broker controller
│ ├── config.go # Configuration & validation
│ ├── errors.go # Error types
│ ├── internal/ # Internal engine
│ │ ├── models.go # Five business model implementations
│ │ ├── lexer.go # Funnel DSL lexer & parser
│ │ ├── executor.go # Pipeline executor & three-layer filter
│ │ ├── queue.go # Queue primitive (sliding window)
│ │ ├── stream.go # Stream primitive (append-only)
│ │ ├── refcounted.go # RefCountedStream (reference counting)
│ │ ├── offheap.go # Off-heap dedup hash table
│ │ └── timingwheel.go # Hierarchical timing wheel
│ ├── storage/ # Storage engine
│ │ ├── wal.go # Write-Ahead Log
│ │ ├── segment.go # Tiered data segments
│ │ └── mmap*.go # Cross-platform mmap
│ ├── network/ # Network layer
│ │ └── protocol.go # QUIC/TCP frame protocol + FlatBuffers
│ ├── wasm/ # WASM runtime (stub)
│ ├── metrics/ # Observability
│ ├── types/ # Shared type definitions
│ └── test/ # Unit & integration tests + benchmarks
├── server/ # Server entry point
└── client/ # Client SDK & example
| Model | Primitive | Idempotency | ACK | Use Case |
|---|---|---|---|---|
| Log | Stream | No | No-op | IoT telemetry, audit logs |
| Broadcast | Stream | No | No-op | Config push, notifications |
| Order | Queue | Required | Explicit | Task queues, work distribution |
| Mailbox | RefCountedStream | No | Ref count | Multi-tenant inbox, per-user feeds |
| Request | Queue + ReplyPool | Required | Timeout + Reply | RPC, request-reply workflows |
See the Models Documentation (docs/MODELS.md) for detailed behavior and code examples.
| Document | Content |
|---|---|
| Architecture Overview | System design, WAL protocol, concurrency model, message lifecycle |
| Messaging Models | Complete guide to all five models with code examples |
| Funnel DSL Reference | Language syntax, token reference, pipeline definition |
| Configuration Guide | Full JSON config reference with defaults and validation rules |
- Primitives over patterns — Queue, Stream, and RefCountedStream are the only three storage primitives. All business models are composed from these.
- WAL is the source of truth — durability is established at WAL fsync. Data segments are built asynchronously.
- Zero-copy everywhere — FlatBuffers on the wire, mmap for storage, pointer-based field access.
- Explicit over implicit — Order and Request models force idempotency keys. No silent deduplication.
This is a toy project. Notable omissions:
- No distributed consensus (Raft/Paxos) — single node only
- QUIC implementation uses a TCP adapter (production would use quic-go)
- WASM runtime is a stub (no actual sandbox execution)
- No persistent cursor storage for the Mailbox model
- No Prometheus metrics export endpoint
- Limited crash recovery (basic WAL replay only)
- No authentication, authorization, or TLS
- No message compaction or TTL enforcement