
Mini Kafka - Distributed Message Queue & Pub/Sub System

A lightweight Kafka-inspired distributed message queue and pub/sub system built in Rust with async concurrency, persistent storage, and distributed logging. Perfect for learning distributed systems concepts or as a lightweight alternative to Apache Kafka for smaller applications.


🚀 Features

  • High Performance: Built with Rust and async/await for maximum throughput and low latency
  • Message Queuing: Reliable message delivery with offset tracking and persistence
  • Pub/Sub System: Subscribe to topics and receive real-time message notifications
  • Partitioning: Distribute messages across multiple partitions for horizontal scalability
  • Persistent Storage: Messages are durably stored on disk with configurable retention
  • Consumer Groups: Support for multiple consumer groups with automatic offset management
  • Auto Topic Creation: Topics are automatically created when first accessed
  • Offset Management: Persistent consumer offset tracking with auto-commit support
  • Distributed Logging: Built-in structured logging with search capabilities
  • TCP Protocol: Custom binary protocol for efficient client-broker communication
  • Batch Operations: Send multiple messages efficiently with batch APIs
  • Storage Management: Advanced storage analysis and performance monitoring
  • Timeout Handling: Robust timeout management for reliable operations
  • Error Handling: Comprehensive error types and graceful failure handling
  • Sub-microsecond reads: 347 ns consumption time
  • Fast memory operations: 673 ns message creation
  • Efficient serialization: 4.1 µs encoding/decoding
  • Reliable throughput: 24-453 KiB/s depending on message size

📦 Installation

Prerequisites

  • Rust 1.70 or higher
  • Tokio runtime for async operations

Clone and Build

git clone https://github.com/AarambhDevHub/mini-kafka.git
cd mini-kafka
cargo build --release

Run Tests

# Unit tests
cargo test

# Integration tests
cargo test --test integration_tests

# All tests with output
RUST_LOG=debug cargo test -- --nocapture

πŸƒ Quick Start

1. Start the Broker

# Default configuration (localhost:9092)
cargo run

# Custom configuration
cargo run -- --broker-id 1 --address 127.0.0.1:9092

# With environment variables
RUST_LOG=info cargo run

2. Send Messages (Producer)

# Run the simple producer example
cargo run --example simple_producer

Or create your own producer:

use mini_kafka::Producer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let producer = Producer::new(
        "127.0.0.1:9092".to_string(),
        "my-producer".to_string()
    );

    // Send a message
    let offset = producer.send(
        "my-topic".to_string(),
        Some("key1".to_string()),
        "Hello, Mini Kafka!".as_bytes().to_vec()
    ).await?;

    println!("Message sent at offset: {}", offset);
    Ok(())
}

3. Consume Messages

# Run the simple consumer example
cargo run --example simple_consumer

Or create your own consumer:

use mini_kafka::Consumer;
use tokio::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut consumer = Consumer::new(
        "127.0.0.1:9092".to_string(),
        "my-group".to_string(),
        "my-consumer".to_string()
    )
    .with_auto_commit(true)
    .with_commit_interval(Duration::from_secs(5));

    // Subscribe to topic
    consumer.subscribe("my-topic".to_string(), vec![0, 1, 2]).await?;

    // Start consuming
    consumer.start_consuming(|message| {
        println!("Received: {}", String::from_utf8_lossy(&message.payload));
        true // Continue consuming
    }).await?;

    Ok(())
}

📖 Examples

The project includes comprehensive examples demonstrating various features:

Basic Examples

# Simple producer/consumer pair
cargo run --example simple_producer
cargo run --example simple_consumer

# Pub/Sub demonstration with offset management
cargo run --example pubsub_demo

Advanced Examples

# Distributed setup with multiple brokers
cargo run --example distributed_setup

# Storage management and performance analysis
cargo run --example storage_management_demo

Example Output

🚀 Starting Pub/Sub Demo with Offset Management
===============================================
🔔 Consumer subscribed successfully and resumed from stored offsets
👂 Consumer waiting for messages...

📤 Starting producer...
📤 Publishing: 🎉 Welcome to Mini Kafka Pub/Sub!
   ✓ Published at offset 0
📨 Notification #1: 🎉 Welcome to Mini Kafka Pub/Sub! (partition: 1, key: Some("notif-0"))

📤 Publishing: 📢 System maintenance scheduled for tonight
   ✓ Published at offset 1
📨 Notification #2: 📢 System maintenance scheduled for tonight (partition: 0, key: Some("notif-1"))

✅ Consumer finished processing 7 notifications
🎉 Pub/Sub demo completed successfully!
   - Messages were published with keys for partitioning
   - Consumer automatically resumed from stored offsets
   - Offsets were automatically committed every 2 seconds

⚙️ Configuration

Default Configuration

// Default values, expressed as a constructed Config
let config = Config {
    broker: BrokerConfig {
        id: 1,
        data_dir: "data/broker".to_string(),
        default_partitions: 3,
        retention_ms: 604_800_000,   // 7 days
        max_message_size: 1_048_576, // 1 MB
    },
    network: NetworkConfig {
        bind_address: "127.0.0.1:9092".parse().unwrap(),
        max_connections: 100,
        request_timeout_ms: 30_000,
    },
    logging: LoggingConfig {
        level: "info".to_string(),
        log_dir: "data/logs".to_string(),
        max_log_size: 104_857_600, // 100 MB
        retention_days: 7,
    },
};

Command Line Options

USAGE:
    mini-kafka [OPTIONS]

OPTIONS:
    -c, --config <CONFIG>        Configuration file path [default: config.json]
    -b, --broker-id <BROKER_ID>  Broker ID [default: 1]
    -a, --address <ADDRESS>      Bind address [default: 127.0.0.1:9092]
    -h, --help                   Print help information
    -V, --version                Print version information

📊 Performance Benchmarks

Here are the latest performance benchmarks for Mini Kafka (generated on August 30, 2025):

Production Performance

Benchmark               Time Range        Throughput
Small Messages (64B)    2.03 - 2.54 ms    24.6 - 30.8 KiB/s
Large Messages (4KB)    8.82 - 12.5 ms    320 - 453 KiB/s

Consumption Performance

Benchmark           Time
Sequential Reads    ~347 ns

Memory Operations

Benchmark                        Time
Message Creation                 ~673 ns
Serialization/Deserialization    ~4.1 µs

Performance Summary

  • ✅ Excellent: Sub-microsecond consumption (347 ns)
  • ✅ Very Good: Fast message creation (673 ns)
  • ✅ Good: Efficient serialization (4.1 µs)
  • ⚠️ Optimization Opportunity: Message production (2-12 ms)

Benchmarks run with Criterion.rs on GitHub Actions (Ubuntu latest)


πŸ—οΈ Architecture

System Overview

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    Producer     │    │    Consumer     │    │  Storage Mgmt   │
│                 │    │   (Groups)      │    │     Demo        │
└────────┬────────┘    └────────┬────────┘    └────────┬────────┘
         │                      │                      │
         │        TCP           │          TCP         │
         │     Protocol         │       Protocol       │
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │                       │
                    │    Mini Kafka         │
                    │       Broker          │
                    │                       │
                    │  ┌─────────────────┐  │
                    │  │     Topics      │  │
                    │  │   ┌───────────┐ │  │
                    │  │   │Partition 0│ │  │
                    │  │   │Partition 1│ │  │
                    │  │   │Partition 2│ │  │
                    │  │   └───────────┘ │  │
                    │  └─────────────────┘  │
                    │                       │
                    │  ┌─────────────────┐  │
                    │  │ Offset Storage  │  │
                    │  │ Message Storage │  │
                    │  │   (Disk-based)  │  │
                    │  └─────────────────┘  │
                    └───────────────────────┘

Core Components

  • Broker: Central message coordinator managing topics, partitions, and storage
  • Producer: Sends messages to topics with automatic partitioning and batching
  • Consumer: Subscribes to topics with consumer group support and offset management
  • Storage: Persistent disk-based storage for messages and consumer offsets
  • Network: Efficient TCP-based binary protocol for client-broker communication
  • Logging: Distributed logging system with structured logs and search capabilities

Message Flow

  1. Producer connects to broker and sends messages with optional keys
  2. Broker assigns messages to partitions using key-based or round-robin distribution (sketched below)
  3. Messages are persisted to disk with automatic durability guarantees
  4. Consumer polls broker for new messages from subscribed topics and partitions
  5. Offsets are automatically tracked and committed per consumer group
  6. Storage provides persistence for both messages and consumer state
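
For illustration, here is a minimal sketch of the key-based and round-robin partition selection from step 2. The choose_partition helper and the use of Rust's DefaultHasher are assumptions for this sketch, not necessarily the broker's exact scheme:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: pick a partition for a message.
/// Keyed messages hash to a stable partition; unkeyed messages
/// round-robin using a caller-maintained counter.
fn choose_partition(key: Option<&str>, counter: &mut u64, num_partitions: u32) -> u32 {
    match key {
        Some(k) => {
            let mut hasher = DefaultHasher::new();
            k.hash(&mut hasher);
            (hasher.finish() % num_partitions as u64) as u32
        }
        None => {
            let p = (*counter % num_partitions as u64) as u32;
            *counter += 1;
            p
        }
    }
}

fn main() {
    let mut round_robin = 0u64;
    println!("keyed   -> {}", choose_partition(Some("key1"), &mut round_robin, 3));
    println!("unkeyed -> {}", choose_partition(None, &mut round_robin, 3));
}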

📚 API Reference

Producer API

impl Producer {
    // Create a new producer with client identification
    pub fn new(broker_address: String, client_id: String) -> Self

    // Send a single message with optional key for partitioning
    pub async fn send(&self, topic: String, key: Option<String>, payload: Vec<u8>) -> KafkaResult<u64>

    // Send multiple messages efficiently in a batch
    pub async fn send_batch(&self, messages: Vec<(String, Option<String>, Vec<u8>)>) -> KafkaResult<Vec<u64>>

    // Get producer client information
    pub fn client_info(&self) -> (&str, &str)
}
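
A usage sketch for send_batch, following the signature above and the conventions of the Quick Start examples:

use mini_kafka::Producer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let producer = Producer::new(
        "127.0.0.1:9092".to_string(),
        "batch-producer".to_string()
    );

    // (topic, optional key, payload) tuples, as send_batch expects
    let messages = vec![
        ("my-topic".to_string(), Some("k1".to_string()), b"first".to_vec()),
        ("my-topic".to_string(), Some("k2".to_string()), b"second".to_vec()),
        ("my-topic".to_string(), None, b"third".to_vec()),
    ];

    let offsets = producer.send_batch(messages).await?;
    println!("Batch stored at offsets: {:?}", offsets);
    Ok(())
}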

Consumer API

impl Consumer {
    // Create a new consumer with group and client identification
    pub fn new(broker_address: String, group_id: String, client_id: String) -> Self

    // Configure auto-commit behavior
    pub fn with_auto_commit(self, auto_commit: bool) -> Self
    pub fn with_commit_interval(self, interval: Duration) -> Self

    // Subscribe to topics with specific partitions
    pub async fn subscribe(&mut self, topic: String, partitions: Vec<u32>) -> KafkaResult<()>

    // Consume a single message
    pub async fn consume_one(&mut self) -> KafkaResult<Option<Message>>

    // Start consuming with callback function
    pub async fn start_consuming<F>(&mut self, callback: F) -> KafkaResult<()>
    where F: FnMut(Message) -> bool + Send

    // Manually commit offsets
    pub async fn commit_offsets(&mut self) -> KafkaResult<()>

    // Get consumer information
    pub fn client_info(&self) -> (&str, &str, &str)
}
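
And a sketch of manual offset management with consume_one and commit_offsets, assuming auto-commit is turned off (based on the signatures above):

use mini_kafka::Consumer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut consumer = Consumer::new(
        "127.0.0.1:9092".to_string(),
        "manual-group".to_string(),
        "manual-consumer".to_string()
    )
    .with_auto_commit(false); // commit explicitly instead

    consumer.subscribe("my-topic".to_string(), vec![0]).await?;

    // Poll one message at a time; commit only after successful processing
    while let Some(message) = consumer.consume_one().await? {
        println!("Processing: {}", String::from_utf8_lossy(&message.payload));
        consumer.commit_offsets().await?;
    }

    Ok(())
}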

Message Structure

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Message {
    pub id: Uuid,                                    // Unique message ID
    pub topic: String,                               // Topic name
    pub partition: u32,                              // Partition number
    pub key: Option<String>,                         // Optional message key
    pub payload: Vec<u8>,                           // Message payload
    pub timestamp: chrono::DateTime<chrono::Utc>,   // Creation timestamp
    pub headers: HashMap<String, String>,           // Custom headers
}

impl Message {
    pub fn new(topic: String, partition: u32, payload: Vec<u8>) -> Self
    pub fn with_key(self, key: String) -> Self
    pub fn with_header(self, key: String, value: String) -> Self
}
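
Putting the builder methods together, constructing a keyed message with a custom header might look like this (the mini_kafka::Message import path is an assumption):

use mini_kafka::Message;

fn main() {
    // Builder-style construction per the declarations above
    let message = Message::new("my-topic".to_string(), 0, b"payload".to_vec())
        .with_key("order-42".to_string())
        .with_header("content-type".to_string(), "text/plain".to_string());

    println!("{} -> partition {}", message.topic, message.partition);
}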

🧪 Testing

Unit Tests

# Run all unit tests
cargo test

# Run specific test module
cargo test --lib broker::tests

# Run with output
cargo test -- --nocapture

Integration Tests

# Run integration tests
cargo test --test integration_tests

# Run specific integration test
cargo test --test integration_tests test_produce_and_consume

Performance Testing

The project includes performance analysis through the storage management demo:

# Analyze storage performance with different message sizes
cargo run --example storage_management_demo

Example performance output:

📊 Storage Analysis for 'storage-heavy':
   - Messages processed: 47
   - Total bytes consumed: 48,158 bytes
   - Average message size: 1024.6 bytes
   - Processing time: 26.82 seconds
   - Throughput: 1.8 messages/sec
   - Message types: {"heavy": 46}

📊 Storage Analysis for 'storage-light':
   - Messages processed: 39
   - Total bytes consumed: 1,628 bytes
   - Average message size: 41.7 bytes
   - Processing time: 26.77 seconds
   - Throughput: 1.5 messages/sec
   - Message types: {"light": 38}

📊 Monitoring & Logging

Structured Logging

Mini Kafka uses structured logging with multiple levels:

# Debug level (verbose)
RUST_LOG=debug cargo run

# Info level (default)
RUST_LOG=info cargo run

# Warn level (warnings only)
RUST_LOG=warn cargo run

# Error level (errors only)
RUST_LOG=error cargo run

Built-in Metrics

The system provides comprehensive metrics through examples:

  • Message throughput (messages/second)
  • Storage efficiency (bytes per message, total storage usage)
  • Consumer lag (offset tracking per consumer group; see the sketch below)
  • Partition distribution (message distribution across partitions)
  • Connection handling (client connection management)
  • Error rates (comprehensive error tracking)
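
Consumer lag, for example, is just the gap between a partition's newest offset and the group's last committed offset; a minimal illustration, not the project's internal API:

/// Illustrative only: per-partition lag for a consumer group.
fn consumer_lag(latest_offset: u64, committed_offset: u64) -> u64 {
    latest_offset.saturating_sub(committed_offset)
}

fn main() {
    // Partition head at offset 120, group committed through offset 95
    println!("lag = {}", consumer_lag(120, 95)); // lag = 25
}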

Storage Analysis

The storage management demo provides detailed storage insights:

🗂️ Storage File Analysis:
   📁 Storage location: data/storage_demo
   - Total partition files: 6
   - Total storage size: 102,682 bytes (100.3 KB)

🚀 Production Considerations

Deployment Features

  • Persistent Storage: All messages and offsets are stored durably on disk
  • Consumer Groups: Multiple consumer instances with automatic load balancing
  • Offset Management: Reliable offset tracking with auto-commit capabilities
  • Error Recovery: Comprehensive error handling and graceful degradation
  • Performance Monitoring: Built-in performance analysis tools
  • Timeout Management: Robust timeout handling for reliable operations

Configuration Recommendations

For production use, consider these settings (a sketch follows the list):

  1. Increase partition count for higher throughput: default_partitions: 6
  2. Tune retention policies: retention_ms: 259200000 (3 days)
  3. Optimize commit intervals: commit_interval: Duration::from_secs(10)
  4. Configure appropriate timeouts: request_timeout_ms: 30000
  5. Set memory limits: max_message_size: 10485760 (10MB)
  6. Use dedicated storage: Fast SSD storage for data directory
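
Taken together, a production-leaning configuration might look like the sketch below. The values are illustrative and the construction mirrors the Default Configuration section; in practice these may come from config.json instead:

// Illustrative values only; field names follow the Default Configuration above
let config = Config {
    broker: BrokerConfig {
        id: 1,
        data_dir: "/mnt/ssd/mini-kafka".to_string(), // dedicated fast SSD
        default_partitions: 6,                       // higher throughput
        retention_ms: 259_200_000,                   // 3 days
        max_message_size: 10_485_760,                // 10 MB
    },
    network: NetworkConfig {
        bind_address: "127.0.0.1:9092".parse().unwrap(),
        max_connections: 100,
        request_timeout_ms: 30_000,
    },
    logging: LoggingConfig {
        level: "info".to_string(),
        log_dir: "/mnt/ssd/mini-kafka/logs".to_string(),
        max_log_size: 104_857_600, // 100 MB
        retention_days: 7,
    },
};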

🔧 Development

Project Structure

mini-kafka/
├── src/
│   ├── broker/          # Core broker implementation
│   │   ├── mod.rs       # Broker coordination
│   │   ├── partition.rs # Partition management
│   │   ├── topic.rs     # Topic handling
│   │   └── storage.rs   # Persistent storage
│   ├── client/          # Client implementations
│   │   ├── producer.rs  # Producer client
│   │   └── consumer.rs  # Consumer client
│   ├── network/         # Network protocol
│   │   ├── server.rs    # TCP server
│   │   └── protocol.rs  # Message protocol
│   ├── logging/         # Distributed logging
│   │   └── distributed_log.rs
│   └── config/          # Configuration management
│       └── settings.rs
├── examples/            # Comprehensive examples
│   ├── simple_producer.rs
│   ├── simple_consumer.rs
│   ├── pubsub_demo.rs
│   ├── distributed_setup.rs
│   └── storage_management_demo.rs
└── tests/              # Test suite
    ├── integration_tests.rs
    └── unit_tests.rs

Key Features Implemented

  • ✅ Message Persistence: Durable storage with bincode serialization (see the sketch below)
  • ✅ Consumer Groups: Offset tracking per group with auto-commit
  • ✅ Partition Management: Key-based and round-robin message distribution
  • ✅ Network Protocol: Efficient TCP-based binary communication
  • ✅ Error Handling: Comprehensive error types and recovery
  • ✅ Async Operations: Full async/await support with Tokio
  • ✅ Batch Operations: Efficient batch message sending
  • ✅ Auto Topic Creation: Automatic topic creation on first use
  • ✅ Timeout Management: Robust timeout handling in examples
  • ✅ Storage Analysis: Performance monitoring and analysis tools
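
Because Message derives Serialize and Deserialize (see the Message Structure section), the persistence round-trip can be pictured with bincode directly. A minimal sketch, assuming the bincode 1.x serde API and the mini_kafka::Message import path:

use mini_kafka::Message;

fn main() -> anyhow::Result<()> {
    let message = Message::new("logs".to_string(), 0, b"hello".to_vec());

    // Round-trip through bincode, the format used for on-disk persistence
    let bytes: Vec<u8> = bincode::serialize(&message)?;
    let decoded: Message = bincode::deserialize(&bytes)?;
    assert_eq!(message, decoded); // Message derives PartialEq

    println!("encoded {} bytes", bytes.len());
    Ok(())
}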

🤝 Contributing

We welcome contributions! Here's how to get started:

Development Setup

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Make your changes and add tests
  4. Ensure tests pass: cargo test
  5. Run examples: cargo run --example simple_producer
  6. Commit your changes: git commit -m 'Add amazing feature'
  7. Push to the branch: git push origin feature/amazing-feature
  8. Open a pull request

Code Style

  • Follow Rust standard formatting: cargo fmt
  • Run clippy for linting: cargo clippy
  • Add documentation for public APIs
  • Include unit tests for new functionality
  • Add examples for new features

📜 License

This project is licensed under the MIT License - see the LICENSE file for details.

☕ Support the Project

If you find Mini Kafka helpful, consider supporting the project:

Buy Me A Coffee

πŸ™ Acknowledgments

  • Apache Kafka - Inspiration for the design and concepts
  • Tokio - Async runtime for Rust providing excellent performance
  • Rust Community - Amazing ecosystem and development tools
  • Contributors - Everyone who has helped improve this project

📞 Support & Community

  • Issues: GitHub Issues
  • Discussions: GitHub Discussions
  • Documentation: Run examples to see the system in action
  • Performance: Use storage_management_demo for performance analysis

πŸ—ΊοΈ Roadmap

Completed ✅

  • Core message queue functionality
  • Producer and consumer clients
  • Persistent storage with offset management
  • Consumer groups with auto-commit
  • Comprehensive examples and documentation
  • Storage performance analysis
  • Timeout and error handling

Next Steps 🎯

  • Consumer group rebalancing
  • Message compression support
  • REST API for administration
  • Cross-broker replication
  • Enhanced monitoring dashboard
  • Security features (TLS, authentication)

Mini Kafka provides a solid foundation for understanding distributed messaging systems while being practical enough for real-world use in smaller applications. The comprehensive examples and storage analysis tools make it an excellent learning platform for distributed systems concepts.
