tsink is a lightweight, high-performance time-series database engine written in Rust. It provides efficient storage and retrieval of time-series data with automatic compression, time-based partitioning, and thread-safe operations.
- 🚀 High Performance: Gorilla compression achieves ~1.37 bytes per data point
- 🔒 Thread-Safe: Lock-free reads and concurrent writes with configurable worker pools
- 💾 Flexible Storage: Choose between in-memory or persistent disk storage
- 📊 Time Partitioning: Automatic data organization by configurable time ranges
- 🏷️ Label Support: Multi-dimensional metrics with key-value labels
- 📝 WAL Support: Write-ahead logging for durability and crash recovery
- 🗑️ Auto-Retention: Configurable automatic data expiration
- 🐳 Container-Aware: cgroup support for optimal resource usage in containers
- ⚡ Zero-Copy Reads: Memory-mapped files for efficient disk operations
Add tsink to your `Cargo.toml`:

```toml
[dependencies]
tsink = "0.3.1"
```
```rust
use tsink::{DataPoint, Row, StorageBuilder, Storage, TimestampPrecision};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create storage with default settings
    let storage = StorageBuilder::new()
        .with_timestamp_precision(TimestampPrecision::Seconds)
        .build()?;

    // Insert data points
    let rows = vec![
        Row::new("cpu_usage", DataPoint::new(1600000000, 45.5)),
        Row::new("cpu_usage", DataPoint::new(1600000060, 47.2)),
        Row::new("cpu_usage", DataPoint::new(1600000120, 46.8)),
    ];
    storage.insert_rows(&rows)?;

    // Note: Using timestamp 0 will automatically use the current timestamp
    // let row = Row::new("cpu_usage", DataPoint::new(0, 50.0)); // timestamp = current time

    // Query data points
    let points = storage.select("cpu_usage", &[], 1600000000, 1600000121)?;
    for point in points {
        println!("Timestamp: {}, Value: {}", point.timestamp, point.value);
    }

    storage.close()?;
    Ok(())
}
```
To enable disk persistence, point the builder at a data directory and tune partitioning, retention, and the WAL buffer:

```rust
use std::time::Duration;
use tsink::{StorageBuilder, Storage};

let storage = StorageBuilder::new()
    .with_data_path("./tsink-data") // Enable disk persistence
    .with_partition_duration(Duration::from_secs(3600)) // 1-hour partitions
    .with_retention(Duration::from_secs(7 * 24 * 3600)) // 7-day retention
    .with_wal_buffer_size(8192) // 8KB WAL buffer
    .build()?;
```
Labels add extra dimensions to each metric; queries can target one specific label combination or enumerate every combination recorded for a metric:

```rust
use tsink::{DataPoint, Label, Row};

// Create metrics with labels for detailed categorization
let rows = vec![
    Row::with_labels(
        "http_requests",
        vec![
            Label::new("method", "GET"),
            Label::new("status", "200"),
            Label::new("endpoint", "/api/users"),
        ],
        DataPoint::new(1600000000, 150.0),
    ),
    Row::with_labels(
        "http_requests",
        vec![
            Label::new("method", "POST"),
            Label::new("status", "201"),
            Label::new("endpoint", "/api/users"),
        ],
        DataPoint::new(1600000000, 25.0),
    ),
];
storage.insert_rows(&rows)?;

// Query a specific label combination
let points = storage.select(
    "http_requests",
    &[
        Label::new("method", "GET"),
        Label::new("status", "200"),
    ],
    1600000000,
    1600000100,
)?;

// Query all label combinations for a metric
let all_results = storage.select_all("http_requests", 1600000000, 1600000100)?;
for (labels, points) in all_results {
    println!("Labels: {:?}, Points: {}", labels, points.len());
}
```
tsink uses a linear-order partition model that divides time-series data into time-bounded chunks:
```text
┌─────────────────────────────────────────┐
│              tsink Storage              │
├─────────────────────────────────────────┤
│                                         │
│  ┌───────────────┐   Active Partition   │
│  │ Memory Part.  │◄─ (Writable)         │
│  └───────────────┘                      │
│                                         │
│  ┌───────────────┐   Buffer Partition   │
│  │ Memory Part.  │◄─ (Out-of-order)     │
│  └───────────────┘                      │
│                                         │
│  ┌───────────────┐                      │
│  │ Disk Part. 1  │◄─ Read-only          │
│  └───────────────┘   (Memory-mapped)    │
│                                         │
│  ┌───────────────┐                      │
│  │ Disk Part. 2  │◄─ Read-only          │
│  └───────────────┘                      │
│                   ...                   │
└─────────────────────────────────────────┘
```
- Active Partition: Accepts new writes, kept in memory
- Buffer Partition: Handles out-of-order writes within recent time window
- Flushing: When active partition is full, it's flushed to disk
- Disk Partitions: Read-only, memory-mapped for efficient queries
- Expiration: Old partitions are automatically removed based on retention
- Fast Queries: Skip irrelevant partitions based on time range (see the sketch after this list)
- Efficient Memory: Only recent data stays in RAM
- Low Write Amplification: Sequential writes, no compaction needed
- SSD-Friendly: Minimal random I/O patterns
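To see how the partition model meets the public API, here is a minimal sketch that uses only the builder and query calls shown elsewhere in this README. The metric name and timestamps are made up, and the note about which partition each point lands in is an assumption; partition bookkeeping and pruning happen inside the engine.

```rust
use std::time::Duration;
use tsink::{DataPoint, Row, Storage, StorageBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One-hour partitions: each partition covers a contiguous time range.
    let storage = StorageBuilder::new()
        .with_partition_duration(Duration::from_secs(3600))
        .build()?;

    // These two points are more than an hour apart, so they should land
    // in different partitions (illustrative; layout is handled internally).
    storage.insert_rows(&[
        Row::new("disk_io", DataPoint::new(1600000100, 80.0)),
        Row::new("disk_io", DataPoint::new(1600003700, 120.0)),
    ])?;

    // A query bounded to the first hour only needs the partitions whose
    // time range overlaps [1600000000, 1600003600).
    let points = storage.select("disk_io", &[], 1600000000, 1600003600)?;
    assert_eq!(points.len(), 1);

    storage.close()?;
    Ok(())
}
```

The result set is the same regardless of how the data is partitioned; the partition layout only affects how much data a query has to scan.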
| Option | Description | Default |
|---|---|---|
| `with_data_path` | Directory for persistent storage | None (in-memory) |
| `with_retention` | How long to keep data | 14 days |
| `with_timestamp_precision` | Timestamp precision (ns/μs/ms/s) | Nanoseconds |
| `with_max_writers` | Maximum concurrent write workers | CPU count |
| `with_write_timeout` | Timeout for write operations | 30 seconds |
| `with_partition_duration` | Time range per partition | 1 hour |
| `with_wal_enabled` | Enable write-ahead logging | true |
| `with_wal_buffer_size` | WAL buffer size in bytes | 4096 |
For example:

```rust
let storage = StorageBuilder::new()
    .with_data_path("/var/lib/tsink")
    .with_retention(Duration::from_secs(30 * 24 * 3600)) // 30 days
    .with_timestamp_precision(TimestampPrecision::Milliseconds)
    .with_max_writers(16)
    .with_write_timeout(Duration::from_secs(60))
    .with_partition_duration(Duration::from_secs(6 * 3600)) // 6 hours
    .with_wal_buffer_size(16384) // 16KB
    .build()?;
```
tsink uses the Gorilla compression algorithm, which is specifically designed for time-series data:
- Delta-of-delta encoding for timestamps
- XOR compression for floating-point values
- Typical compression ratio: ~1.37 bytes per data point
This means a data point that would normally take 16 bytes (8 bytes timestamp + 8 bytes value) is compressed to less than 2 bytes on average.
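To make the ratio concrete, here is a back-of-the-envelope sketch. The two constants are the averages quoted above; everything else (the helper function, the sampling interval) is illustrative arithmetic, not part of the tsink API.

```rust
// Back-of-the-envelope storage estimate using the averages quoted above.
// `estimate_bytes` is illustrative only, not part of the tsink API.
const UNCOMPRESSED_BYTES_PER_POINT: f64 = 16.0; // 8-byte timestamp + 8-byte value
const GORILLA_BYTES_PER_POINT: f64 = 1.37; // typical average quoted above

fn estimate_bytes(points: u64) -> (f64, f64) {
    (
        points as f64 * UNCOMPRESSED_BYTES_PER_POINT,
        points as f64 * GORILLA_BYTES_PER_POINT,
    )
}

fn main() {
    // One metric sampled every 10 seconds for 30 days = 259,200 points.
    let points: u64 = 30 * 24 * 3600 / 10;
    let (raw, compressed) = estimate_bytes(points);
    println!("raw: ~{:.2} MiB", raw / (1024.0 * 1024.0)); // ~3.96 MiB
    println!("gorilla: ~{:.2} MiB", compressed / (1024.0 * 1024.0)); // ~0.34 MiB
}
```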
Benchmarks on AMD Ryzen 7940HS (single core):
| Operation | Throughput | Latency |
|---|---|---|
| Insert single point | 10M ops/sec | ~100ns |
| Batch insert (1000) | 15M points/sec | ~67μs/batch |
| Select 1K points | 4.5M queries/sec | ~220ns |
| Select 1M points | 3.4M queries/sec | ~290ns |
Run benchmarks yourself:
```bash
cargo bench
```
Core storage modules:

| Module | Description |
|---|---|
| `storage` | Main storage engine with builder-pattern configuration |
| `partition` | Time-based data partitioning (memory and disk implementations) |
| `encoding` | Gorilla compression for efficient time-series storage |
| `wal` | Write-ahead logging for durability and crash recovery |
| `label` | Multi-dimensional metric labeling and marshaling |

Supporting modules:

| Module | Description |
|---|---|
| `cgroup` | Container-aware CPU and memory limit detection |
| `mmap` | Platform-optimized memory-mapped file operations |
| `concurrency` | Worker pools, semaphores, and rate limiters |
| `bstream` | Bit-level streaming for compression algorithms |
| `list` | Thread-safe partition list management |

Error handling:

| Module | Description |
|---|---|
| `error` | Comprehensive error types with context |
tsink lets you query labeled metrics either with an exact label match or across every label combination recorded for a metric:
```rust
use tsink::{DataPoint, Label, Row};

// Insert metrics with various label combinations
let rows = vec![
    Row::with_labels(
        "cpu_usage",
        vec![Label::new("host", "server1"), Label::new("region", "us-west")],
        DataPoint::new(1600000000, 45.5),
    ),
    Row::with_labels(
        "cpu_usage",
        vec![Label::new("host", "server2"), Label::new("region", "us-east")],
        DataPoint::new(1600000000, 52.1),
    ),
    Row::with_labels(
        "cpu_usage",
        vec![Label::new("host", "server3")], // Different label set
        DataPoint::new(1600000000, 48.3),
    ),
];
storage.insert_rows(&rows)?;

// Method 1: Query with exact label match
let points = storage.select(
    "cpu_usage",
    &[Label::new("host", "server1"), Label::new("region", "us-west")],
    1600000000,
    1600000100,
)?;

// Method 2: Query all label combinations (discovers all variations)
let all_results = storage.select_all("cpu_usage", 1600000000, 1600000100)?;

// Process results grouped by labels
for (labels, points) in all_results {
    // Find which hosts have the most data points
    if let Some(host_label) = labels.iter().find(|l| l.name == "host") {
        println!("Host {} has {} data points", host_label.value, points.len());
    }

    // Aggregate metrics across regions
    if let Some(region_label) = labels.iter().find(|l| l.name == "region") {
        let avg: f64 = points.iter().map(|p| p.value).sum::<f64>() / points.len() as f64;
        println!("Region {} average: {:.2}", region_label.value, avg);
    }
}
```
tsink is designed for high-concurrency scenarios:
```rust
use std::sync::Arc;
use std::thread;

let storage = Arc::new(StorageBuilder::new().build()?);

// Spawn multiple writer threads
let mut handles = vec![];
for worker_id in 0..10 {
    let storage = storage.clone();
    let handle = thread::spawn(move || {
        for i in 0..1000 {
            let row = Row::new(
                "concurrent_metric",
                DataPoint::new(1600000000 + i, i as f64),
            );
            storage.insert_rows(&[row]).unwrap();
        }
    });
    handles.push(handle);
}

// Wait for all threads
for handle in handles {
    handle.join().unwrap();
}
```
tsink handles out-of-order data points automatically:
```rust
// Insert data points in random order
let rows = vec![
    Row::new("metric", DataPoint::new(1600000500, 5.0)),
    Row::new("metric", DataPoint::new(1600000100, 1.0)), // Earlier timestamp
    Row::new("metric", DataPoint::new(1600000300, 3.0)),
    Row::new("metric", DataPoint::new(1600000200, 2.0)), // Out of order
];
storage.insert_rows(&rows)?;

// Query returns points in correct chronological order
let points = storage.select("metric", &[], 1600000000, 1600001000)?;
assert!(points.windows(2).all(|w| w[0].timestamp <= w[1].timestamp));
```
tsink automatically detects container resource limits:
```rust
// tsink reads cgroup limits automatically
let storage = StorageBuilder::new()
    .with_max_writers(0) // 0 = auto-detect from cgroup
    .build()?;

// In a container with a 2-CPU limit, this will use 2 workers
// even if the host has 16 CPUs.
```
After a crash, tsink automatically recovers data from the WAL:
```rust
// First run - data is written to the WAL
let storage = StorageBuilder::new()
    .with_data_path("/data/tsink")
    .build()?;
storage.insert_rows(&rows)?;
// Crash happens here...

// Next run - data is recovered from the WAL automatically
let storage = StorageBuilder::new()
    .with_data_path("/data/tsink") // Same path
    .build()?; // Recovery happens here

// Previously inserted data is available
let points = storage.select("metric", &[], 0, i64::MAX)?;
```
Run the comprehensive example showcasing all features:
```bash
cargo run --example comprehensive
```
Other examples:

- `basic_usage` - Simple insert and query operations
- `persistent_storage` - Disk-based storage with WAL
- `production_example` - Production-ready configuration
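Each of these runs the same way as the comprehensive example, for instance:

```bash
cargo run --example basic_usage
```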
Run the test suite:
```bash
# Run all tests
cargo test

# Run with verbose output
cargo test -- --nocapture

# Run specific test module
cargo test storage::tests
```
Contributions are welcome! Please feel free to submit a Pull Request.
```bash
# Clone the repository
git clone https://github.com/h2337/tsink.git
cd tsink

# Run tests
cargo test

# Run benchmarks
cargo bench

# Check formatting
cargo fmt -- --check

# Run clippy
cargo clippy -- -D warnings
```
Licensed under the MIT License.