Problem Description
Wire currently uses BadgerDB as its only state storage backend. While BadgerDB is suitable for some use cases, it has limitations for high-throughput stream processing:
- Limited performance for large state (>10GB)
- No native column-family support for organizing different state types
- Memory overhead for large datasets
- Limited compaction control
- No built-in state TTL support
For production deployments, Wire needs a pluggable state backend system that supports multiple storage engines optimized for different use cases.
Proposed Solution
Implement a pluggable state management system with the following features:
- Storage Backend Interface: Abstract interface for state storage operations
- RocksDB Backend: High-performance implementation using RocksDB
- State Partitioning: Support for partitioned state across nodes
- State Migration: Tools to migrate between different backends
- Monitoring: Metrics and health checks for state backends
Implementation Code Structure
// state/backend.go
package state

import (
    "context"
    "time"
)

type BackendType string

const (
    BackendTypeBadger  BackendType = "badger"
    BackendTypeRocksDB BackendType = "rocksdb"
    BackendTypeMemory  BackendType = "memory"
)

type BackendConfig struct {
    Type            BackendType
    DataDir         string
    CacheSize       int64
    WriteBufferSize int64
    MaxOpenFiles    int
    BlockSize       int
    Compression     CompressionType
    EnableTTL       bool
    StatsTTL        time.Duration

    // RocksDB specific
    RocksDB RocksDBConfig
}

// KVPair pairs a key with its value for batch operations.
type KVPair struct {
    Key   []byte
    Value []byte
}

type StateBackend interface {
    // Basic operations
    Get(ctx context.Context, key []byte) ([]byte, error)
    Put(ctx context.Context, key, value []byte) error
    Delete(ctx context.Context, key []byte) error

    // Batch operations
    BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error)
    BatchPut(ctx context.Context, entries []KVPair) error
    BatchDelete(ctx context.Context, keys [][]byte) error

    // Range operations
    Scan(ctx context.Context, start, end []byte, limit int) (Iterator, error)

    // Transactions
    NewTransaction(readOnly bool) (Transaction, error)

    // Column families (for organizing state)
    CreateColumnFamily(name string, options ColumnFamilyOptions) error
    DropColumnFamily(name string) error
    GetColumnFamily(name string) (ColumnFamily, error)

    // Management
    Compact() error
    Backup(ctx context.Context, path string) error
    Restore(ctx context.Context, path string) error
    Stats() (*BackendStats, error)
    Close() error
}

type Transaction interface {
    Get(key []byte) ([]byte, error)
    Put(key, value []byte) error
    Delete(key []byte) error
    Commit() error
    Rollback() error
}

type Iterator interface {
    Valid() bool
    Next()
    Key() []byte
    Value() ([]byte, error)
    Error() error
    Close() error
}
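// CreateBackend is referenced later in this issue (by the partitioned backend
// and NodeStore) but not defined. A minimal sketch, assuming backends register
// a constructor with this package so that state never imports its own backend
// packages (which would be an import cycle); "fmt" would need to be added to
// the imports above:

// state/factory.go (sketch)
var backendFactories = map[BackendType]func(BackendConfig) (StateBackend, error){}

// RegisterBackend is expected to be called from each backend package's init().
func RegisterBackend(t BackendType, factory func(BackendConfig) (StateBackend, error)) {
    backendFactories[t] = factory
}

// CreateBackend builds the backend selected by config.Type.
func CreateBackend(config BackendConfig) (StateBackend, error) {
    factory, ok := backendFactories[config.Type]
    if !ok {
        return nil, fmt.Errorf("unknown backend type: %q", config.Type)
    }
    return factory(config)
}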
// state/rocksdb/backend.go
package rocksdb

import (
    "context"
    "fmt"
    "sync/atomic"

    "github.com/linxGnu/grocksdb"
    "github.com/rs/zerolog"

    "github.com/tarungka/wire/internal/state"
)

// RocksDBConfig holds RocksDB-specific tuning knobs. Note that
// state.BackendConfig references this type, so it may need to live in (or be
// mirrored into) the state package to avoid an import cycle.
type RocksDBConfig struct {
    // Performance tuning
    MemtableSize         int64
    TargetFileSizeBase   int64
    MaxBytesForLevelBase int64
    NumLevels            int
    Level0FileNumTrigger int

    // Write options
    DisableWAL        bool
    SyncWrites        bool
    WriteBufferNumber int

    // Read options
    BlockCacheSize   int64
    BloomFilterBits  int
    EnableStatistics bool

    // Compaction
    CompactionStyle      string
    CompactionThreads    int
    BottommostCompaction string
}
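// A hypothetical starting point for stream-processing workloads; these numbers
// are illustrative assumptions, not benchmarked recommendations, and would need
// tuning per deployment.
func DefaultRocksDBConfig() RocksDBConfig {
    return RocksDBConfig{
        MemtableSize:         64 << 20,  // 64 MiB memtable
        TargetFileSizeBase:   64 << 20,  // 64 MiB SST target size
        MaxBytesForLevelBase: 256 << 20, // 256 MiB in L1 before fan-out
        NumLevels:            7,
        Level0FileNumTrigger: 4,
        WriteBufferNumber:    3,
        BlockCacheSize:       512 << 20, // 512 MiB shared block cache
        BloomFilterBits:      10,        // ~1% false positives on point lookups
        CompactionStyle:      "level",
        CompactionThreads:    4,
    }
}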
type RocksDBBackend struct {
    db             *grocksdb.DB
    options        *grocksdb.Options
    readOptions    *grocksdb.ReadOptions
    writeOptions   *grocksdb.WriteOptions
    columnFamilies map[string]*grocksdb.ColumnFamilyHandle
    cfOptions      map[string]*grocksdb.Options
    logger         zerolog.Logger
    metrics        *BackendMetrics
    closed         atomic.Bool
}
func NewRocksDBBackend(config state.BackendConfig) (*RocksDBBackend, error) {
    opts := grocksdb.NewDefaultOptions()

    // Configure RocksDB options
    opts.SetCreateIfMissing(true)
    opts.SetMaxOpenFiles(config.MaxOpenFiles)
    opts.SetWriteBufferSize(config.RocksDB.MemtableSize)
    opts.SetMaxWriteBufferNumber(config.RocksDB.WriteBufferNumber)
    opts.SetTargetFileSizeBase(config.RocksDB.TargetFileSizeBase)
    opts.SetMaxBytesForLevelBase(config.RocksDB.MaxBytesForLevelBase)
    opts.SetNumLevels(config.RocksDB.NumLevels)

    // Block cache for reads
    bbto := grocksdb.NewDefaultBlockBasedTableOptions()
    bbto.SetBlockCache(grocksdb.NewLRUCache(config.RocksDB.BlockCacheSize))
    bbto.SetBlockSize(config.BlockSize)

    // Bloom filter for faster point lookups
    if config.RocksDB.BloomFilterBits > 0 {
        bbto.SetFilterPolicy(grocksdb.NewBloomFilter(config.RocksDB.BloomFilterBits))
    }
    opts.SetBlockBasedTableFactory(bbto)

    // Compression: none for the hot upper levels, Snappy in the middle,
    // ZSTD for the bottommost data
    compressionTypes := []grocksdb.CompressionType{
        grocksdb.NoCompression,
        grocksdb.SnappyCompression,
        grocksdb.ZSTDCompression,
    }
    opts.SetCompressionPerLevel(compressionTypes)

    // Statistics
    if config.RocksDB.EnableStatistics {
        opts.EnableStatistics()
    }

    // Column families
    cfNames := []string{"default", "metadata", "watermarks", "checkpoints"}
    cfOpts := make([]*grocksdb.Options, len(cfNames))
    for i := range cfOpts {
        cfOpts[i] = opts
    }

    // Open database
    db, cfHandles, err := grocksdb.OpenDbColumnFamilies(
        opts,
        config.DataDir,
        cfNames,
        cfOpts,
    )
    if err != nil {
        return nil, fmt.Errorf("open rocksdb: %w", err)
    }

    backend := &RocksDBBackend{
        db:             db,
        options:        opts,
        readOptions:    grocksdb.NewDefaultReadOptions(),
        writeOptions:   grocksdb.NewDefaultWriteOptions(),
        columnFamilies: make(map[string]*grocksdb.ColumnFamilyHandle),
        cfOptions:      make(map[string]*grocksdb.Options),
        // logger here refers to a package-level zerolog logger
        logger:  logger.With().Str("backend", "rocksdb").Logger(),
        metrics: NewBackendMetrics("rocksdb"),
    }

    // Map column family handles
    for i, name := range cfNames {
        backend.columnFamilies[name] = cfHandles[i]
        backend.cfOptions[name] = cfOpts[i]
    }

    // Configure write options
    backend.writeOptions.SetSync(config.RocksDB.SyncWrites)
    backend.writeOptions.DisableWAL(config.RocksDB.DisableWAL)

    // Start background tasks
    go backend.runCompaction()
    go backend.collectStats()

    return backend, nil
}
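// runCompaction and collectStats are referenced above but not shown in this
// issue. A minimal collectStats sketch (the property names are standard RocksDB
// properties; the reporting interval is arbitrary and "time" would need to be
// added to this file's imports):
func (r *RocksDBBackend) collectStats() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        if r.closed.Load() {
            return
        }
        // GetProperty exposes RocksDB-internal counters as strings
        numKeys := r.db.GetProperty("rocksdb.estimate-num-keys")
        memtables := r.db.GetProperty("rocksdb.cur-size-all-mem-tables")
        r.logger.Debug().
            Str("estimate_num_keys", numKeys).
            Str("memtable_bytes", memtables).
            Msg("rocksdb stats")
    }
}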
func (r *RocksDBBackend) Get(ctx context.Context, key []byte) ([]byte, error) {
    if r.closed.Load() {
        return nil, state.ErrBackendClosed
    }

    timer := r.metrics.GetLatency.Start()
    defer timer.ObserveDuration()

    value, err := r.db.Get(r.readOptions, key)
    if err != nil {
        r.metrics.Errors.Inc()
        return nil, fmt.Errorf("rocksdb get: %w", err)
    }
    if !value.Exists() {
        value.Free()
        return nil, state.ErrKeyNotFound
    }

    // Copy the data out before freeing the RocksDB-owned slice
    data := value.Data()
    result := make([]byte, len(data))
    copy(result, data)
    value.Free()

    r.metrics.Gets.Inc()
    r.metrics.BytesRead.Add(float64(len(result)))
    return result, nil
}

func (r *RocksDBBackend) Put(ctx context.Context, key, value []byte) error {
    if r.closed.Load() {
        return state.ErrBackendClosed
    }

    timer := r.metrics.PutLatency.Start()
    defer timer.ObserveDuration()

    err := r.db.Put(r.writeOptions, key, value)
    if err != nil {
        r.metrics.Errors.Inc()
        return fmt.Errorf("rocksdb put: %w", err)
    }

    r.metrics.Puts.Inc()
    r.metrics.BytesWritten.Add(float64(len(key) + len(value)))
    return nil
}

// Note: grocksdb exposes pessimistic transactions through a TransactionDB
// opened with grocksdb.OpenTransactionDb; deriving one from an already-open
// *grocksdb.DB as sketched here will need to be reworked during implementation.
func (r *RocksDBBackend) NewTransaction(readOnly bool) (state.Transaction, error) {
    if r.closed.Load() {
        return nil, state.ErrBackendClosed
    }

    opts := grocksdb.NewDefaultTransactionOptions()
    txnDB := r.db.TransactionDB()
    if readOnly {
        return &RocksDBTransaction{
            txn:      txnDB.BeginTransaction(r.writeOptions, opts, nil),
            readOnly: true,
        }, nil
    }
    return &RocksDBTransaction{
        txn:      txnDB.BeginTransaction(r.writeOptions, opts, nil),
        readOnly: false,
    }, nil
}
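// BatchPut is also required by the StateBackend interface; a sketch using
// grocksdb's WriteBatch so that all entries are applied in a single atomic
// write (the metrics fields follow the assumptions made elsewhere in this issue):
func (r *RocksDBBackend) BatchPut(ctx context.Context, entries []state.KVPair) error {
    if r.closed.Load() {
        return state.ErrBackendClosed
    }

    wb := grocksdb.NewWriteBatch()
    defer wb.Destroy()
    for _, e := range entries {
        wb.Put(e.Key, e.Value)
    }

    if err := r.db.Write(r.writeOptions, wb); err != nil {
        r.metrics.Errors.Inc()
        return fmt.Errorf("rocksdb batch put: %w", err)
    }
    r.metrics.Puts.Add(float64(len(entries)))
    return nil
}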
// State partitioning for scalability
// state/partitioned.go
package state

import (
    "context"
    "fmt"
    "hash/fnv"
    "sync"
)

type PartitionedBackend struct {
    partitions  []StateBackend
    numParts    int
    partitioner Partitioner
    mu          sync.RWMutex
}

type Partitioner interface {
    GetPartition(key []byte) int
}

type HashPartitioner struct {
    numPartitions int
}

func (h *HashPartitioner) GetPartition(key []byte) int {
    hasher := fnv.New32a()
    hasher.Write(key)
    return int(hasher.Sum32()) % h.numPartitions
}

func NewPartitionedBackend(config BackendConfig, numPartitions int) (*PartitionedBackend, error) {
    partitions := make([]StateBackend, numPartitions)
    for i := 0; i < numPartitions; i++ {
        // Create a separate data directory for each partition
        partConfig := config
        partConfig.DataDir = fmt.Sprintf("%s/partition-%d", config.DataDir, i)

        backend, err := CreateBackend(partConfig)
        if err != nil {
            // Clean up already-created partitions
            for j := 0; j < i; j++ {
                partitions[j].Close()
            }
            return nil, fmt.Errorf("create partition %d: %w", i, err)
        }
        partitions[i] = backend
    }

    return &PartitionedBackend{
        partitions:  partitions,
        numParts:    numPartitions,
        partitioner: &HashPartitioner{numPartitions: numPartitions},
    }, nil
}

func (p *PartitionedBackend) Get(ctx context.Context, key []byte) ([]byte, error) {
    partition := p.partitioner.GetPartition(key)
    return p.partitions[partition].Get(ctx, key)
}

func (p *PartitionedBackend) Put(ctx context.Context, key, value []byte) error {
    partition := p.partitioner.GetPartition(key)
    return p.partitions[partition].Put(ctx, key, value)
}
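// Keyed operations route to exactly one partition, but management operations
// have to fan out to every partition. A sketch of Close under that assumption
// ("errors" must be imported; errors.Join requires Go 1.20+):
func (p *PartitionedBackend) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    var errs []error
    for i, backend := range p.partitions {
        if err := backend.Close(); err != nil {
            errs = append(errs, fmt.Errorf("close partition %d: %w", i, err))
        }
    }
    return errors.Join(errs...)
}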
// State migration tool
// state/migration/migrator.go
package migration

import (
    "context"
    "fmt"
    "sync/atomic"

    "golang.org/x/sync/errgroup"

    "github.com/tarungka/wire/internal/state"
)

type MigratorConfig struct {
    BatchSize   int
    Parallelism int
}

type Migrator struct {
    source      state.StateBackend
    target      state.StateBackend
    batchSize   int
    parallelism int
    progress    atomic.Int64
    total       atomic.Int64
}

func NewMigrator(source, target state.StateBackend, config MigratorConfig) *Migrator {
    return &Migrator{
        source:      source,
        target:      target,
        batchSize:   config.BatchSize,
        parallelism: config.Parallelism,
    }
}

func (m *Migrator) Migrate(ctx context.Context) error {
    // Count total keys for progress reporting
    if err := m.countKeys(ctx); err != nil {
        return fmt.Errorf("count keys: %w", err)
    }

    // Create worker pool
    g, ctx := errgroup.WithContext(ctx)
    keyChan := make(chan [][]byte, m.parallelism)

    // Start workers
    for i := 0; i < m.parallelism; i++ {
        g.Go(func() error {
            return m.worker(ctx, keyChan)
        })
    }

    // Scan and distribute keys
    g.Go(func() error {
        defer close(keyChan)
        return m.scanAndDistribute(ctx, keyChan)
    })

    return g.Wait()
}

func (m *Migrator) worker(ctx context.Context, keyChan <-chan [][]byte) error {
    for keys := range keyChan {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // Batch get from source
        values, err := m.source.BatchGet(ctx, keys)
        if err != nil {
            return fmt.Errorf("batch get: %w", err)
        }

        // Prepare batch for target
        entries := make([]state.KVPair, 0, len(keys))
        for i, key := range keys {
            if values[i] != nil {
                entries = append(entries, state.KVPair{
                    Key:   key,
                    Value: values[i],
                })
            }
        }

        // Batch put to target
        if err := m.target.BatchPut(ctx, entries); err != nil {
            return fmt.Errorf("batch put: %w", err)
        }
        m.progress.Add(int64(len(entries)))

        // Log progress (logger refers to a package-level zerolog logger)
        if m.progress.Load()%10000 == 0 {
            logger.Info().
                Int64("progress", m.progress.Load()).
                Int64("total", m.total.Load()).
                Float64("percent", float64(m.progress.Load())/float64(m.total.Load())*100).
                Msg("migration progress")
        }
    }
    return nil
}
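// countKeys and scanAndDistribute are referenced above but not shown. A sketch
// of scanAndDistribute against the Scan/Iterator contract defined earlier,
// assuming a nil start/end and a limit of 0 mean an unbounded full-range scan:
func (m *Migrator) scanAndDistribute(ctx context.Context, keyChan chan<- [][]byte) error {
    it, err := m.source.Scan(ctx, nil, nil, 0)
    if err != nil {
        return fmt.Errorf("scan source: %w", err)
    }
    defer it.Close()

    batch := make([][]byte, 0, m.batchSize)
    flush := func() error {
        if len(batch) == 0 {
            return nil
        }
        select {
        case keyChan <- batch:
            batch = make([][]byte, 0, m.batchSize)
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    for ; it.Valid(); it.Next() {
        // Copy the key: iterator-owned buffers may be reused on Next()
        key := append([]byte(nil), it.Key()...)
        batch = append(batch, key)
        if len(batch) >= m.batchSize {
            if err := flush(); err != nil {
                return err
            }
        }
    }
    if err := it.Error(); err != nil {
        return fmt.Errorf("iterate source: %w", err)
    }
    return flush()
}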
// Integration with Wire components
// Modify store.go to use pluggable backends
func NewNodeStore(config StoreConfig) (*NodeStore, error) {
    // Create state backend based on configuration
    backend, err := state.CreateBackend(config.StateBackend)
    if err != nil {
        return nil, fmt.Errorf("create state backend: %w", err)
    }

    store := &NodeStore{
        stateBackend: backend,
        // ... other fields
    }

    // Use backend for all state operations
    return store, nil
}

Implementation Steps
- Define Backend Interface (Week 1)
  - Create abstract StateBackend interface
  - Define transaction and iterator interfaces
  - Implement error types and metrics
- RocksDB Implementation (Week 2)
  - Integrate grocksdb library
  - Implement all backend operations
  - Add column family support
  - Configure performance options
- Partitioned Backend (Week 1)
  - Implement partitioning strategy
  - Create hash-based partitioner
  - Handle cross-partition operations
- State Migration Tool (Week 1)
  - Build migration framework
  - Implement parallel migration
  - Add progress tracking
- BadgerDB Adapter (Week 1)
  - Adapt existing BadgerDB to the new interface (see the adapter sketch after this list)
  - Ensure backward compatibility
  - Add migration path
- Integration (Week 1)
  - Update NodeStore to use backends
  - Modify pipeline state management
  - Update configuration
- Testing and Benchmarking (Week 1)
  - Unit tests for all backends
  - Performance benchmarks
  - Migration testing
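As noted in the BadgerDB Adapter step, the existing store would be wrapped to satisfy the new interface. A minimal sketch of the read path, assuming the badger/v4 module path (use whichever version Wire already depends on) and that the adapter maps BadgerDB's sentinel errors onto the backend-level ones:

// state/badger/backend.go (sketch)
package badger

import (
    "context"

    badgerdb "github.com/dgraph-io/badger/v4"

    "github.com/tarungka/wire/internal/state"
)

type BadgerBackend struct {
    db *badgerdb.DB
}

func (b *BadgerBackend) Get(ctx context.Context, key []byte) ([]byte, error) {
    var result []byte
    err := b.db.View(func(txn *badgerdb.Txn) error {
        item, err := txn.Get(key)
        if err != nil {
            return err
        }
        // ValueCopy returns a copy that outlives the transaction
        result, err = item.ValueCopy(nil)
        return err
    })
    if err == badgerdb.ErrKeyNotFound {
        // Translate BadgerDB's sentinel error into the backend-level one
        return nil, state.ErrKeyNotFound
    }
    return result, err
}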
Acceptance Criteria
- RocksDB backend fully implements StateBackend interface
- At least a 2x performance improvement over the BadgerDB baseline for large state (>10GB)
- State can be partitioned across multiple backend instances
- Zero-downtime migration between different backends
- Column families properly organize different state types
- Compaction can be controlled and monitored
- Memory usage remains stable under high load
- Backup and restore operations work correctly
- Metrics exposed for all backend operations
- Documentation covers backend selection and tuning
Testing Requirements
- Unit Tests
  - All backend operations tested
  - Transaction correctness verified
  - Iterator behavior validated
- Integration Tests
  - Backend switching without data loss
  - Concurrent access patterns
  - Failure recovery scenarios
- Performance Tests (see the benchmark sketch after this list)
  - Benchmark vs BadgerDB baseline
  - Large state handling (100GB+)
  - Write/read throughput under load
- Stress Tests
  - Long-running stability tests
  - Memory leak detection
  - Concurrent access patterns
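For the performance tests, a backend-agnostic benchmark keeps the BadgerDB and RocksDB numbers comparable. A sketch, where newTestBackend is a hypothetical helper that builds a backend of the given type in a temporary directory:

// state/backend_bench_test.go (sketch)
package state_test

import (
    "context"
    "fmt"
    "testing"

    "github.com/tarungka/wire/internal/state"
)

func benchmarkPut(b *testing.B, backendType state.BackendType) {
    // newTestBackend (hypothetical) creates and later cleans up a test backend
    backend := newTestBackend(b, backendType)
    defer backend.Close()

    ctx := context.Background()
    value := make([]byte, 1024) // 1 KiB values

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := []byte(fmt.Sprintf("key-%012d", i))
        if err := backend.Put(ctx, key, value); err != nil {
            b.Fatal(err)
        }
    }
    b.SetBytes(int64(len(value)))
}

func BenchmarkPutBadger(b *testing.B)  { benchmarkPut(b, state.BackendTypeBadger) }
func BenchmarkPutRocksDB(b *testing.B) { benchmarkPut(b, state.BackendTypeRocksDB) }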
Documentation Requirements
- Backend selection guide
- Performance tuning guide for each backend
- Migration procedures
- Monitoring and troubleshooting guide
Performance Targets
- Read latency: < 1ms p99
- Write latency: < 5ms p99
- Throughput: > 100k ops/sec per node
- State size: Support up to 1TB per node
- Recovery time: < 5 minutes for 100GB state
Dependencies
- grocksdb (github.com/linxGnu/grocksdb) for RocksDB Go bindings
- RocksDB C++ library must be installed on deployment systems
- Sufficient disk I/O performance for target throughput