A lightweight Kafka-inspired distributed message queue and pub/sub system built in Rust with async concurrency, persistent storage, and distributed logging. Perfect for learning distributed systems concepts or as a lightweight alternative to Apache Kafka for smaller applications.
- High Performance: Built with Rust and async/await for maximum throughput and low latency
- Message Queuing: Reliable message delivery with offset tracking and persistence
- Pub/Sub System: Subscribe to topics and receive real-time message notifications
- Partitioning: Distribute messages across multiple partitions for horizontal scalability
- Persistent Storage: Messages are durably stored on disk with configurable retention
- Consumer Groups: Support for multiple consumer groups with automatic offset management
- Auto Topic Creation: Topics are automatically created when first accessed
- Offset Management: Persistent consumer offset tracking with auto-commit support
- Distributed Logging: Built-in structured logging with search capabilities
- TCP Protocol: Custom binary protocol for efficient client-broker communication
- Batch Operations: Send multiple messages efficiently with batch APIs
- Storage Management: Advanced storage analysis and performance monitoring
- Timeout Handling: Robust timeout management for reliable operations
- Error Handling: Comprehensive error types and graceful failure handling
- Sub-microsecond reads: 347 ns consumption time
- Fast memory operations: 673 ns message creation
- Efficient serialization: 4.1 µs encoding/decoding
- Reliable throughput: 24-453 KiB/s depending on message size
- Rust 1.70 or higher
- Tokio runtime for async operations
```bash
git clone https://github.com/AarambhDevHub/mini-kafka.git
cd mini-kafka
cargo build --release
```

```bash
# Unit tests
cargo test

# Integration tests
cargo test --test integration_tests

# All tests with output
RUST_LOG=debug cargo test -- --nocapture
```

```bash
# Default configuration (localhost:9092)
cargo run

# Custom configuration
cargo run -- --broker-id 1 --address 127.0.0.1:9092

# With environment variables
RUST_LOG=info cargo run
```

```bash
# Run the simple producer example
cargo run --example simple_producer
```

Or create your own producer:
```rust
use mini_kafka::Producer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let producer = Producer::new(
        "127.0.0.1:9092".to_string(),
        "my-producer".to_string(),
    );

    // Send a message
    let offset = producer.send(
        "my-topic".to_string(),
        Some("key1".to_string()),
        "Hello, Mini Kafka!".as_bytes().to_vec(),
    ).await?;

    println!("Message sent at offset: {}", offset);
    Ok(())
}
```

```bash
# Run the simple consumer example
cargo run --example simple_consumer
```

Or create your own consumer:
```rust
use mini_kafka::Consumer;
use tokio::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut consumer = Consumer::new(
        "127.0.0.1:9092".to_string(),
        "my-group".to_string(),
        "my-consumer".to_string(),
    )
    .with_auto_commit(true)
    .with_commit_interval(Duration::from_secs(5));

    // Subscribe to topic
    consumer.subscribe("my-topic".to_string(), vec![0, 1, 2]).await?;

    // Start consuming
    consumer.start_consuming(|message| {
        println!("Received: {}", String::from_utf8_lossy(&message.payload));
        true // Continue consuming
    }).await?;

    Ok(())
}
```

The project includes comprehensive examples demonstrating various features:
```bash
# Simple producer/consumer pair
cargo run --example simple_producer
cargo run --example simple_consumer

# Pub/Sub demonstration with offset management
cargo run --example pubsub_demo

# Distributed setup with multiple brokers
cargo run --example distributed_setup

# Storage management and performance analysis
cargo run --example storage_management_demo
```

Sample output from the pub/sub demo:

```
Starting Pub/Sub Demo with Offset Management
===============================================
Consumer subscribed successfully and resumed from stored offsets
Consumer waiting for messages...
Starting producer...
Publishing: Welcome to Mini Kafka Pub/Sub!
Published at offset 0
Notification #1: Welcome to Mini Kafka Pub/Sub! (partition: 1, key: Some("notif-0"))
Publishing: System maintenance scheduled for tonight
Published at offset 1
Notification #2: System maintenance scheduled for tonight (partition: 0, key: Some("notif-1"))
Consumer finished processing 7 notifications
Pub/Sub demo completed successfully!
```

The demo illustrates that:

- Messages were published with keys for partitioning
- The consumer automatically resumed from stored offsets
- Offsets were automatically committed every 2 seconds
The default configuration (shown as a Rust struct literal):

```rust
Config {
    broker: BrokerConfig {
        id: 1,
        data_dir: "data/broker".to_string(),
        default_partitions: 3,
        retention_ms: 604_800_000,   // 7 days
        max_message_size: 1_048_576, // 1 MB
    },
    network: NetworkConfig {
        bind_address: "127.0.0.1:9092".parse().unwrap(),
        max_connections: 100,
        request_timeout_ms: 30_000,
    },
    logging: LoggingConfig {
        level: "info".to_string(),
        log_dir: "data/logs".to_string(),
        max_log_size: 104_857_600, // 100 MB
        retention_days: 7,
    },
}
```

```
USAGE:
    mini-kafka [OPTIONS]

OPTIONS:
    -c, --config <CONFIG>          Configuration file path [default: config.json]
    -b, --broker-id <BROKER_ID>    Broker ID [default: 1]
    -a, --address <ADDRESS>        Bind address [default: 127.0.0.1:9092]
    -h, --help                     Print help information
    -V, --version                  Print version information
```

Here are the latest performance benchmarks for Mini Kafka (generated on August 30, 2025):
**Message Production**

| Benchmark | Time Range | Throughput |
|---|---|---|
| Small Messages (64B) | 2.03 - 2.54 ms | 24.6 - 30.8 KiB/s |
| Large Messages (4KB) | 8.82 - 12.5 ms | 320 - 453 KiB/s |

**Message Consumption**

| Benchmark | Time |
|---|---|
| Sequential Reads | ~347 ns |

**In-Memory Operations**

| Benchmark | Time |
|---|---|
| Message Creation | ~673 ns |
| Serialization/Deserialization | ~4.1 µs |

- Excellent: Sub-microsecond consumption (347 ns)
- Very Good: Fast message creation (673 ns)
- Good: Efficient serialization (4.1 µs)
- Optimization opportunity: Message production (2-12 ms)

Benchmarks run with Criterion.rs on GitHub Actions (Ubuntu latest).
```
┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│    Producer     │   │    Consumer     │   │  Storage Mgmt   │
│                 │   │    (Groups)     │   │      Demo       │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │ TCP                 │ TCP                 │ TCP
         │ Protocol            │ Protocol            │ Protocol
         └─────────────────────┼─────────────────────┘
                               │
                   ┌───────────┴───────────┐
                   │      Mini Kafka       │
                   │        Broker         │
                   │                       │
                   │  ┌─────────────────┐  │
                   │  │     Topics      │  │
                   │  │  ┌───────────┐  │  │
                   │  │  │Partition 0│  │  │
                   │  │  │Partition 1│  │  │
                   │  │  │Partition 2│  │  │
                   │  │  └───────────┘  │  │
                   │  └─────────────────┘  │
                   │                       │
                   │  ┌─────────────────┐  │
                   │  │ Offset Storage  │  │
                   │  │ Message Storage │  │
                   │  │  (Disk-based)   │  │
                   │  └─────────────────┘  │
                   └───────────────────────┘
```
- Broker: Central message coordinator managing topics, partitions, and storage
- Producer: Sends messages to topics with automatic partitioning and batching
- Consumer: Subscribes to topics with consumer group support and offset management
- Storage: Persistent disk-based storage for messages and consumer offsets
- Network: Efficient TCP-based binary protocol for client-broker communication
- Logging: Distributed logging system with structured logs and search capabilities
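The Network component's wire format is not documented in this README; custom binary protocols of this kind typically use length-prefixed framing. The sketch below illustrates that shape only — `encode_frame` and `decode_frame` are hypothetical names, not the crate's API, and the 4-byte big-endian prefix is an assumption:

```rust
// Hypothetical length-prefixed framing sketch (not the crate's actual protocol).
// Each frame is a 4-byte big-endian payload length followed by the payload.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = (payload.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(payload);
    frame
}

// Returns the payload if the buffer holds a complete frame, None otherwise.
fn decode_frame(buf: &[u8]) -> Option<&[u8]> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    buf.get(4..4 + len)
}

fn main() {
    let frame = encode_frame(b"hello");
    assert_eq!(frame.len(), 9); // 4-byte prefix + 5-byte payload
    assert_eq!(decode_frame(&frame), Some(&b"hello"[..]));
    println!("frame: {:?}", frame);
}
```

Length prefixing lets the receiver know exactly how many bytes to read per request, which keeps the TCP stream parseable without delimiters.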
- Producer connects to broker and sends messages with optional keys
- Broker assigns messages to partitions using key-based or round-robin distribution
- Messages are persisted to disk with automatic durability guarantees
- Consumer polls broker for new messages from subscribed topics and partitions
- Offsets are automatically tracked and committed per consumer group
- Storage provides persistence for both messages and consumer state
```rust
impl Producer {
    // Create a new producer with client identification
    pub fn new(broker_address: String, client_id: String) -> Self

    // Send a single message with optional key for partitioning
    pub async fn send(&self, topic: String, key: Option<String>, payload: Vec<u8>) -> KafkaResult<u64>

    // Send multiple messages efficiently in a batch
    pub async fn send_batch(&self, messages: Vec<(String, Option<String>, Vec<u8>)>) -> KafkaResult<Vec<u64>>

    // Get producer client information
    pub fn client_info(&self) -> (&str, &str)
}
```

```rust
impl Consumer {
    // Create a new consumer with group and client identification
    pub fn new(broker_address: String, group_id: String, client_id: String) -> Self

    // Configure auto-commit behavior
    pub fn with_auto_commit(self, auto_commit: bool) -> Self
    pub fn with_commit_interval(self, interval: Duration) -> Self

    // Subscribe to topics with specific partitions
    pub async fn subscribe(&mut self, topic: String, partitions: Vec<u32>) -> KafkaResult<()>

    // Consume a single message
    pub async fn consume_one(&mut self) -> KafkaResult<Option<Message>>

    // Start consuming with callback function
    pub async fn start_consuming<F>(&mut self, callback: F) -> KafkaResult<()>
    where F: FnMut(Message) -> bool + Send

    // Manually commit offsets
    pub async fn commit_offsets(&mut self) -> KafkaResult<()>

    // Get consumer information
    pub fn client_info(&self) -> (&str, &str, &str)
}
```

```rust
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Message {
    pub id: Uuid,                                 // Unique message ID
    pub topic: String,                            // Topic name
    pub partition: u32,                           // Partition number
    pub key: Option<String>,                      // Optional message key
    pub payload: Vec<u8>,                         // Message payload
    pub timestamp: chrono::DateTime<chrono::Utc>, // Creation timestamp
    pub headers: HashMap<String, String>,         // Custom headers
}

impl Message {
    pub fn new(topic: String, partition: u32, payload: Vec<u8>) -> Self
    pub fn with_key(self, key: String) -> Self
    pub fn with_header(self, key: String, value: String) -> Self
}
```

```bash
# Run all unit tests
cargo test

# Run specific test module
cargo test --lib broker::tests

# Run with output
cargo test -- --nocapture
```

```bash
# Run integration tests
cargo test --test integration_tests

# Run specific integration test
cargo test --test integration_tests test_produce_and_consume
```

The project includes performance analysis through the storage management demo:

```bash
# Analyze storage performance with different message sizes
cargo run --example storage_management_demo
```

Example performance output:
```
Storage Analysis for 'storage-heavy':
- Messages processed: 47
- Total bytes consumed: 48,158 bytes
- Average message size: 1024.6 bytes
- Processing time: 26.82 seconds
- Throughput: 1.8 messages/sec
- Message types: {"heavy": 46}

Storage Analysis for 'storage-light':
- Messages processed: 39
- Total bytes consumed: 1,628 bytes
- Average message size: 41.7 bytes
- Processing time: 26.77 seconds
- Throughput: 1.5 messages/sec
- Message types: {"light": 38}
```
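The derived figures in the output above follow directly from the raw counts; as a quick arithmetic check (a standalone snippet, not part of the project):

```rust
// Average message size in bytes, given total bytes and message count.
fn avg_message_size(total_bytes: f64, messages: f64) -> f64 {
    total_bytes / messages
}

// Throughput in messages per second.
fn throughput(messages: f64, seconds: f64) -> f64 {
    messages / seconds
}

fn main() {
    // 'storage-heavy' run: 47 messages, 48,158 bytes, 26.82 s
    assert!((avg_message_size(48_158.0, 47.0) - 1024.6).abs() < 0.1);
    assert!((throughput(47.0, 26.82) - 1.8).abs() < 0.1);
    // 'storage-light' run: 39 messages, 1,628 bytes, 26.77 s
    assert!((avg_message_size(1_628.0, 39.0) - 41.7).abs() < 0.1);
    assert!((throughput(39.0, 26.77) - 1.5).abs() < 0.1);
    println!("reported figures are consistent with the raw counts");
}
```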
Mini Kafka uses structured logging with multiple levels:
```bash
# Debug level (verbose)
RUST_LOG=debug cargo run

# Info level (default)
RUST_LOG=info cargo run

# Warn level (warnings only)
RUST_LOG=warn cargo run

# Error level (errors only)
RUST_LOG=error cargo run
```

The system provides comprehensive metrics through examples:
- Message throughput (messages/second)
- Storage efficiency (bytes per message, total storage usage)
- Consumer lag (offset tracking per consumer group)
- Partition distribution (message distribution across partitions)
- Connection handling (client connection management)
- Error rates (comprehensive error tracking)
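Consumer lag from the list above is conventionally the gap between the newest offset written to a partition and the offset the consumer group last committed. A minimal sketch (`consumer_lag` is a hypothetical helper, not the crate's API):

```rust
// Hypothetical sketch: per-partition consumer lag is how far the committed
// offset trails the latest produced offset. saturating_sub guards against
// a committed offset that is momentarily ahead (e.g. after a reset).
fn consumer_lag(latest_offset: u64, committed_offset: u64) -> u64 {
    latest_offset.saturating_sub(committed_offset)
}

fn main() {
    // Broker has written up to offset 120; the group committed 100.
    assert_eq!(consumer_lag(120, 100), 20);
    // A fully caught-up consumer has zero lag.
    assert_eq!(consumer_lag(50, 50), 0);
    println!("lag: {}", consumer_lag(120, 100));
}
```

Tracking this number per consumer group is what tells you whether consumers are keeping up with producers.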
The storage management demo provides detailed storage insights:

```
Storage File Analysis:
Storage location: data/storage_demo
- Total partition files: 6
- Total storage size: 102,682 bytes (100.3 KB)
```

- Persistent Storage: All messages and offsets are stored durably on disk
- Consumer Groups: Multiple consumer instances with automatic load balancing
- Offset Management: Reliable offset tracking with auto-commit capabilities
- Error Recovery: Comprehensive error handling and graceful degradation
- Performance Monitoring: Built-in performance analysis tools
- Timeout Management: Robust timeout handling for reliable operations
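The auto-commit behavior listed above reduces to a timer check: commit stored offsets once the configured interval has elapsed since the last commit. A std-only sketch of that decision (hypothetical types, not the crate's internals):

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch of auto-commit bookkeeping for one consumer group.
struct OffsetTracker {
    last_commit: Instant,
    interval: Duration,
    pending: u64, // highest consumed offset not yet committed
}

impl OffsetTracker {
    // True once the commit interval has elapsed since the last commit.
    fn should_commit(&self, now: Instant) -> bool {
        now.duration_since(self.last_commit) >= self.interval
    }
}

fn main() {
    let t0 = Instant::now();
    let tracker = OffsetTracker {
        last_commit: t0,
        interval: Duration::from_secs(5),
        pending: 42,
    };
    // Not yet due after 3 s, due at the 5 s mark.
    assert!(!tracker.should_commit(t0 + Duration::from_secs(3)));
    assert!(tracker.should_commit(t0 + Duration::from_secs(5)));
    println!("would commit up to offset {}", tracker.pending);
}
```

Interval-based commits trade a small replay window on crash (messages since the last commit may be redelivered) for far fewer round-trips to the broker.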
For production use, consider these settings:

- Increase partition count for higher throughput: `default_partitions: 6`
- Tune retention policies: `retention_ms: 259200000` (3 days)
- Optimize commit intervals: `commit_interval: Duration::from_secs(10)`
- Configure appropriate timeouts: `request_timeout_ms: 30000`
- Set message size limits: `max_message_size: 10485760` (10 MB)
- Use dedicated storage: fast SSD storage for the data directory
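Assuming the `--config config.json` default means a plain JSON file mirroring the `Config` struct's fields (the exact on-disk schema is an assumption, not documented here), a production-leaning configuration combining the settings above might look like:

```json
{
  "broker": {
    "id": 1,
    "data_dir": "data/broker",
    "default_partitions": 6,
    "retention_ms": 259200000,
    "max_message_size": 10485760
  },
  "network": {
    "bind_address": "127.0.0.1:9092",
    "max_connections": 100,
    "request_timeout_ms": 30000
  },
  "logging": {
    "level": "info",
    "log_dir": "data/logs",
    "max_log_size": 104857600,
    "retention_days": 7
  }
}
```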
```
mini-kafka/
├── src/
│   ├── broker/              # Core broker implementation
│   │   ├── mod.rs           # Broker coordination
│   │   ├── partition.rs     # Partition management
│   │   ├── topic.rs         # Topic handling
│   │   └── storage.rs       # Persistent storage
│   ├── client/              # Client implementations
│   │   ├── producer.rs      # Producer client
│   │   └── consumer.rs      # Consumer client
│   ├── network/             # Network protocol
│   │   ├── server.rs        # TCP server
│   │   └── protocol.rs      # Message protocol
│   ├── logging/             # Distributed logging
│   │   └── distributed_log.rs
│   └── config/              # Configuration management
│       └── settings.rs
├── examples/                # Comprehensive examples
│   ├── simple_producer.rs
│   ├── simple_consumer.rs
│   ├── pubsub_demo.rs
│   ├── distributed_setup.rs
│   └── storage_management_demo.rs
└── tests/                   # Test suite
    ├── integration_tests.rs
    └── unit_tests.rs
```
- Message Persistence: Durable storage with bincode serialization
- Consumer Groups: Offset tracking per group with auto-commit
- Partition Management: Key-based and round-robin message distribution
- Network Protocol: Efficient TCP-based binary communication
- Error Handling: Comprehensive error types and recovery
- Async Operations: Full async/await support with Tokio
- Batch Operations: Efficient batch message sending
- Auto Topic Creation: Automatic topic creation on first use
- Timeout Management: Robust timeout handling in examples
- Storage Analysis: Performance monitoring and analysis tools
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Make your changes and add tests
- Ensure tests pass: `cargo test`
- Run examples: `cargo run --example simple_producer`
- Commit your changes: `git commit -m 'Add amazing feature'`
- Push to the branch: `git push origin feature/amazing-feature`
- Open a pull request

Code style:

- Follow standard Rust formatting: `cargo fmt`
- Run Clippy for linting: `cargo clippy`
- Add documentation for public APIs
- Include unit tests for new functionality
- Add examples for new features
This project is licensed under the MIT License - see the LICENSE file for details.
If you find Mini Kafka helpful, consider supporting the project:
- Apache Kafka - Inspiration for the design and concepts
- Tokio - Async runtime for Rust providing excellent performance
- Rust Community - Amazing ecosystem and development tools
- Contributors - Everyone who has helped improve this project
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: Run examples to see the system in action
- Performance: Use `storage_management_demo` for performance analysis
- Core message queue functionality
- Producer and consumer clients
- Persistent storage with offset management
- Consumer groups with auto-commit
- Comprehensive examples and documentation
- Storage performance analysis
- Timeout and error handling
- Consumer group rebalancing
- Message compression support
- REST API for administration
- Cross-broker replication
- Enhanced monitoring dashboard
- Security features (TLS, authentication)
Mini Kafka provides a solid foundation for understanding distributed messaging systems while being practical enough for real-world use in smaller applications. The comprehensive examples and storage analysis tools make it an excellent learning platform for distributed systems concepts.