33 releases (12 breaking)
| new 0.14.0 | May 14, 2026 |
|---|---|
| 0.11.0 | Apr 5, 2026 |
| 0.10.0 | Mar 28, 2026 |
| 0.5.2 | Nov 22, 2025 |
| 0.1.1 | Jul 6, 2024 |
#2 in #event-streaming
Used in danube-connect-core
540KB
11K
SLoC
Danube Client
An async Rust client library for interacting with Danube Messaging platform.
🌊 Danube Messaging is an open-source distributed Messaging Broker platform written in Rust. Consult the documentation for supported concepts and the platform architecture.
Features
📤 Producer Capabilities
- Basic Messaging - Send messages with byte payloads and optional key-value attributes
- Key-Based Routing - Tag messages with routing keys via
send_with_key()for key-affinity dispatch - Partitioned Topics - Distribute messages across multiple partitions for horizontal scaling
- Reliable Dispatch - Guaranteed message delivery with persistence (WAL + cloud storage)
- Schema Integration - Type-safe messaging with automatic validation (Bytes, String, Number, Avro, JSON Schema, Protobuf)
📥 Consumer Capabilities
- Flexible Subscriptions - Four subscription types for different use cases:
- Exclusive - Single active consumer, guaranteed ordering
- Shared - Load balancing across multiple consumers, parallel processing
- Failover - High availability with automatic standby promotion
- Key-Shared - Key-affinity routing with per-key ordering and optional key filtering
- Message Acknowledgment - Reliable message processing with at-least-once delivery
- Partitioned Consumption - Automatic handling of messages from all partitions
- Batch Processing - Efficient batch consumption for high-throughput scenarios
- Message Attributes - Access metadata and custom headers
🔐 Schema Registry
- Schema Management - Register, version, and retrieve schemas (JSON Schema, Avro, Protobuf)
- Compatibility Checking - Validate schema evolution (Backward, Forward, Full, None modes)
- Type Safety - Automatic validation against registered schemas
- Schema Evolution - Safe schema updates with compatibility enforcement
- Startup Validation - Validate consumer structs against schemas before processing
🏗️ Client Features
- Async/Await - Built on Tokio for efficient async I/O
- Connection Pooling - Shared connection management across producers/consumers
- Automatic Reconnection - Resilient connection handling
- Topic Namespaces - Organize topics with namespace structure (
/namespace/topic-name)
Example Usage
Check out the example files.
Producer
let client = DanubeClient::builder()
.service_url("http://127.0.0.1:6650")
.build()
.await?;
let topic_name = "/default/test_topic";
let producer_name = "test_prod";
let mut producer = client
.new_producer()
.with_topic(topic_name)
.with_name(producer_name)
.build()?;
producer.create().await?;
println!("The Producer {} was created", producer_name);
let encoded_data = "Hello Danube".as_bytes().to_vec();
let message_id = producer.send(encoded_data, None).await?;
println!("The Message with id {} was sent", message_id);
Consumer
let client = DanubeClient::builder()
.service_url("http://127.0.0.1:6650")
.build()
.await?;
let topic = "/default/test_topic";
let consumer_name = "test_cons";
let subscription_name = "test_subs";
let mut consumer = client
.new_consumer()
.with_topic(topic)
.with_consumer_name(consumer_name)
.with_subscription(subscription_name)
.with_subscription_type(SubType::Exclusive)
.build()?;
// Subscribe to the topic
consumer.subscribe().await?;
println!("The Consumer {} was created", consumer_name);
// Start receiving messages
let mut message_stream = consumer.receive().await?;
while let Some(message) = message_stream.recv().await {
let payload = message.payload;
match String::from_utf8(payload) {
Ok(message_str) => {
println!("Received message: {:?}", message_str);
consumer.ack(&message).await?;
}
Err(e) => println!("Failed to convert Payload to String: {}", e),
}
}
Key-Shared Producer & Consumer
Key-Shared subscriptions route messages by key — all messages with the same routing key are delivered to the same consumer, guaranteeing per-key ordering while distributing load across consumers.
// Producer: tag each message with a routing key
let mut producer = client
.new_producer()
.with_topic("/default/orders")
.with_name("orders_producer")
.with_reliable_dispatch()
.build()?;
producer.create().await?;
// All "payment" messages go to the same consumer
producer.send_with_key(data, None, "payment").await?;
producer.send_with_key(data, None, "shipping").await?;
// Consumer: automatic key distribution (no filters)
let mut consumer = client
.new_consumer()
.with_topic("/default/orders")
.with_consumer_name("worker_1")
.with_subscription("orders_sub")
.with_subscription_type(SubType::KeyShared)
.build()?;
consumer.subscribe().await?;
// Consumer with key filters: only receive matching keys
let mut payments_consumer = client
.new_consumer()
.with_topic("/default/orders")
.with_consumer_name("payments_worker")
.with_subscription("orders_sub")
.with_subscription_type(SubType::KeyShared)
.with_key_filter("payment") // exact match
.with_key_filter("invoice-*") // glob pattern
.build()?;
payments_consumer.subscribe().await?;
Advanced Features
For detailed guides and examples on advanced capabilities:
- Producer Advanced Features - Partitions, reliable dispatch, and schema integration
- Consumer Advanced Features - Partitioned consumption, failover patterns, and batch processing
- Schema Registry Integration - Type-safe messaging with schema validation and evolution
Browse the examples directory for complete working code.
Contribution
Check the documentation on how to setup a Danube Broker.
Dependencies
~18–35MB
~505K SLoC