zrurf/cart_mq

Cart MQ

A toy-level, learning-oriented message queue written in Go, exploring storage-compute integration, multi-model messaging, and zero-copy data paths.

This is NOT a production-grade system. It is an educational project that implements interesting architectural ideas. Do not deploy it in production.

⚠ AI Programming Note: This project was developed with AI-assisted programming and should not be used in a production environment.

Overview

CartMQ implements a unified messaging platform supporting five distinct messaging models — Log, Broadcast, Order, Mailbox, and Request — each backed by carefully chosen storage primitives (Stream, Queue, RefCountedStream). The architecture co-locates storage and computation, enabling zero-copy data paths from network ingress through filtering to disk persistence.

Key Highlights

  • Five messaging models covering broadcast logs, competitive queues, multi-tenant inboxes, and RPC semantics
  • Two-phase, threshold-separated WAL: messages below the size threshold are persisted in a single I/O, while larger messages go through an asynchronous segment write
  • Three-layer filtering: topic routing → field index → header bitmap matching without deserialization
  • Off-heap deduplication: mmap-based sharded hash tables with zero GC pressure
  • Hierarchical timing wheel: L1 (10ms) / L2 (1s) / L3 (1min) for delayed delivery and RPC timeouts
  • Tiered storage: Hot/Warm/Cold segments with automatic promotion and demotion
  • Funnel DSL: SQL-like language for declarative topic/schema/pipeline management
  • QUIC-oriented network layer (currently served through a TCP adapter, see Known Limitations) with a FlatBuffers wire format for zero-copy frame decoding

Quick Start

Prerequisites

  • Go 1.24+

Build & Run

# From the repository root (for example, the cart_mq directory created by git clone)
cd cart_mq

# Build all components
cd cartmq && go build ./... && cd ..
cd server && go build -o ../cartmq-server . && cd ..
cd client && go build -o ../cartmq-client . && cd ..

# Run the server (uses sensible defaults with ~/.cartmq data dir)
./cartmq-server

# Or with a config file
./cartmq-server -config config.json

Using the Client SDK

package main

import (
    "context"
    "fmt"
    cartmq "github.com/zrurf/cart_mq"
    "github.com/zrurf/cart_mq/types"
)

func main() {
    client := cartmq.NewClient("localhost:9000")
    if err := client.Connect(); err != nil {
        panic(err)
    }
    defer client.Close()

    ctx := context.Background()

    // Create a schema and topic with Funnel DSL
    client.ExecuteFunnel(ctx, `
        CREATE SCHEMA OrderSchema (
            order_id STRING,
            amount INT,
            region STRING
        );
        CREATE TOPIC orders
        WITH MODEL = 'Order'
        SCHEMA = 'OrderSchema'
        INDEXED BY (region, amount);
    `)

    // Publish with mandatory idempotency key (Order model)
    msg := &types.Message{
        TopicName:      "orders",
        IdempotencyKey: "txn_001",
        Payload:        []byte(`{"order_id":"123","amount":100,"region":"east"}`),
    }
    result, err := client.Publish(ctx, msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Published: ID=%d\n", result.MessageID)

    // Subscribe and pull
    sub := &types.Subscription{ID: "worker_1", TopicName: "orders"}
    client.Subscribe(ctx, sub)

    msgs, err := client.Pull(ctx, "worker_1", 10)
    if err != nil {
        panic(err)
    }
    for _, m := range msgs {
        fmt.Printf("Received: %s\n", string(m.Payload))
        // The Order model expects an explicit ACK for each message
        client.Ack(ctx, "worker_1", m.ID)
    }
}

Running Tests

# Unit & integration tests (run inside the cartmq/ module, where test/ lives)
go test -count=1 -timeout=120s ./...

# Benchmarks
go test -v -bench=. -benchmem -benchtime=3s ./test/

Project Structure

CartMQ/
├── go.work                    # Go workspace
├── README.md
├── docs/                      # Documentation
│   ├── ARCHITECTURE.md        # System architecture deep dive
│   ├── MODELS.md              # Five messaging models guide
│   ├── FUNNEL_DSL.md          # Funnel DSL language reference
│   └── CONFIGURATION.md       # Configuration options & defaults
├── cartmq/                    # Core library (module: github.com/zrurf/cart_mq)
│   ├── broker.go              # Main broker controller
│   ├── config.go              # Configuration & validation
│   ├── errors.go              # Error types
│   ├── internal/              # Internal engine
│   │   ├── models.go          # Five business model implementations
│   │   ├── lexer.go           # Funnel DSL lexer & parser
│   │   ├── executor.go        # Pipeline executor & three-layer filter
│   │   ├── queue.go           # Queue primitive (sliding window)
│   │   ├── stream.go          # Stream primitive (append-only)
│   │   ├── refcounted.go      # RefCountedStream (reference counting)
│   │   ├── offheap.go         # Off-heap dedup hash table
│   │   └── timingwheel.go     # Hierarchical timing wheel
│   ├── storage/               # Storage engine
│   │   ├── wal.go             # Write-Ahead Log
│   │   ├── segment.go         # Tiered data segments
│   │   └── mmap*.go           # Cross-platform mmap
│   ├── network/               # Network layer
│   │   └── protocol.go        # QUIC/TCP frame protocol + FlatBuffers
│   ├── wasm/                  # WASM runtime (stub)
│   ├── metrics/               # Observability
│   ├── types/                 # Shared type definitions
│   └── test/                  # Unit & integration tests + benchmarks
├── server/                    # Server entry point
└── client/                    # Client SDK & example

Messaging Models at a Glance

| Model     | Primitive         | Idempotency | ACK             | Use Case                           |
|-----------|-------------------|-------------|-----------------|------------------------------------|
| Log       | Stream            | No          | No-op           | IoT telemetry, audit logs          |
| Broadcast | Stream            | No          | No-op           | Config push, notifications         |
| Order     | Queue             | Required    | Explicit        | Task queues, work distribution     |
| Mailbox   | RefCountedStream  | No          | Ref count       | Multi-tenant inbox, per-user feeds |
| Request   | Queue + ReplyPool | Required    | Timeout + Reply | RPC, request-reply workflows       |

See Models Documentation for detailed behavior and code examples.
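The "Ref count" ACK listed for the Mailbox model suggests that a payload is stored once and reclaimed only after every recipient has acknowledged it. Below is a minimal in-memory sketch of that idea. The `refStream` type and its methods are hypothetical names chosen for illustration and are not taken from CartMQ's `refcounted.go`.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is one stored payload shared by several recipients.
type entry struct {
	payload []byte
	refs    int
}

// refStream is a hypothetical in-memory stand-in for a
// reference-counted stream.
type refStream struct {
	mu      sync.Mutex
	nextID  uint64
	entries map[uint64]*entry
}

func newRefStream() *refStream {
	return &refStream{entries: make(map[uint64]*entry)}
}

// Append stores payload once and sets its count to the number of recipients.
func (s *refStream) Append(payload []byte, recipients int) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextID++
	s.entries[s.nextID] = &entry{payload: payload, refs: recipients}
	return s.nextID
}

// Ack drops one reference and reports whether the entry was reclaimed.
// Unknown or already reclaimed IDs report false.
func (s *refStream) Ack(id uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[id]
	if !ok {
		return false
	}
	e.refs--
	if e.refs > 0 {
		return false
	}
	delete(s.entries, id)
	return true
}

// Len returns the number of entries still held.
func (s *refStream) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.entries)
}

func main() {
	s := newRefStream()
	id := s.Append([]byte("hello"), 2) // two mailboxes share one payload
	fmt.Println("reclaimed after first ack:", s.Ack(id))
	fmt.Println("reclaimed after second ack:", s.Ack(id))
	fmt.Println("entries left:", s.Len())
}
```

The sketch does not track which recipient sent each ACK, so a duplicate ACK from one mailbox would release the payload early. A real implementation would need per-recipient cursors or a similar guard.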

Documentation

| Document              | Content                                                           |
|-----------------------|-------------------------------------------------------------------|
| Architecture Overview | System design, WAL protocol, concurrency model, message lifecycle |
| Messaging Models      | Complete guide to all five models with code examples              |
| Funnel DSL Reference  | Language syntax, token reference, pipeline definition             |
| Configuration Guide   | Full JSON config reference with defaults and validation rules     |

Design Philosophy

  1. Primitives over patterns — Queue, Stream, and RefCountedStream are the only three storage primitives. All business models are composed from these.
  2. WAL is the source of truth — durability is established at WAL fsync. Data segments are built asynchronously.
  3. Zero-copy everywhere — FlatBuffers on the wire, mmap for storage, pointer-based field access.
  4. Explicit over implicit — Order and Request models force idempotency keys. No silent deduplication.

Known Limitations

This is a toy project. Notable omissions:

  • No distributed consensus (Raft/Paxos) — single node only
  • QUIC implementation uses a TCP adapter (production would use quic-go)
  • WASM runtime is a stub (no actual sandbox execution)
  • No persistent cursor storage for the Mailbox model
  • No Prometheus metrics export endpoint
  • Limited crash recovery (basic WAL replay only)
  • No authentication, authorization, or TLS
  • No message compaction or TTL enforcement

License

MIT

About

A single-machine toy MQ implemented in Go that experiments with integrating a DSL and WASM plugins into the message queue.
