Implement Pluggable State Management Backend with RocksDB Support #110

Problem Description

Wire currently uses BadgerDB as its only state storage backend. While BadgerDB is suitable for some use cases, it has limitations for high-throughput stream processing:

  • Limited performance for large state (>10GB)
  • No native support for column families for organizing different state types
  • Memory overhead for large datasets
  • Limited compaction control
  • No built-in state TTL support

For production deployments, Wire needs a pluggable state backend system that supports multiple storage engines optimized for different use cases.

Proposed Solution

Implement a pluggable state management system with the following features:

  1. Storage Backend Interface: Abstract interface for state storage operations
  2. RocksDB Backend: High-performance implementation using RocksDB
  3. State Partitioning: Support for partitioned state across nodes
  4. State Migration: Tools to migrate between different backends
  5. Monitoring: Metrics and health checks for state backends

Implementation Code Structure

// state/backend.go
package state

import (
    "context"
    "time"
)

type BackendType string

const (
    BackendTypeBadger  BackendType = "badger"
    BackendTypeRocksDB BackendType = "rocksdb"
    BackendTypeMemory  BackendType = "memory"
)

type BackendConfig struct {
    Type            BackendType
    DataDir         string
    CacheSize       int64
    WriteBufferSize int64
    MaxOpenFiles    int
    BlockSize       int
    Compression     CompressionType
    EnableTTL       bool
    StatsTTL        time.Duration
    
    // RocksDB-specific tuning. Note: RocksDBConfig is declared in the rocksdb
    // package below; it would need to live in (or be mirrored into) this
    // package to avoid an import cycle.
    RocksDB         RocksDBConfig
}

type StateBackend interface {
    // Basic operations
    Get(ctx context.Context, key []byte) ([]byte, error)
    Put(ctx context.Context, key, value []byte) error
    Delete(ctx context.Context, key []byte) error
    
    // Batch operations
    BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error)
    BatchPut(ctx context.Context, entries []KVPair) error
    BatchDelete(ctx context.Context, keys [][]byte) error
    
    // Range operations
    Scan(ctx context.Context, start, end []byte, limit int) (Iterator, error)
    
    // Transactions
    NewTransaction(readOnly bool) (Transaction, error)
    
    // Column families (for organizing state)
    CreateColumnFamily(name string, options ColumnFamilyOptions) error
    DropColumnFamily(name string) error
    GetColumnFamily(name string) (ColumnFamily, error)
    
    // Management
    Compact() error
    Backup(ctx context.Context, path string) error
    Restore(ctx context.Context, path string) error
    Stats() (*BackendStats, error)
    Close() error
}

type Transaction interface {
    Get(key []byte) ([]byte, error)
    Put(key, value []byte) error
    Delete(key []byte) error
    Commit() error
    Rollback() error
}

type Iterator interface {
    Valid() bool
    Next()
    Key() []byte
    Value() ([]byte, error)
    Error() error
    Close() error
}
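
The interface above references several supporting types (KVPair, CompressionType, ColumnFamily, ColumnFamilyOptions, BackendStats) and sentinel errors that are not spelled out in this issue. A minimal sketch of what they could look like follows; the field and constant names are assumptions, not a final API.

// state/types.go (sketch; names and fields are assumptions)
package state

import (
    "context"
    "errors"
    "time"
)

// Sentinel errors shared by all backend implementations.
var (
    ErrKeyNotFound   = errors.New("state: key not found")
    ErrBackendClosed = errors.New("state: backend is closed")
)

type CompressionType string

const (
    CompressionNone   CompressionType = "none"
    CompressionSnappy CompressionType = "snappy"
    CompressionZSTD   CompressionType = "zstd"
)

// KVPair is a single key/value entry used by the batch operations.
type KVPair struct {
    Key   []byte
    Value []byte
}

// ColumnFamily scopes operations to one group of related keys
// (e.g. watermarks vs checkpoints).
type ColumnFamily interface {
    Get(ctx context.Context, key []byte) ([]byte, error)
    Put(ctx context.Context, key, value []byte) error
    Delete(ctx context.Context, key []byte) error
}

type ColumnFamilyOptions struct {
    Compression CompressionType
    TTL         time.Duration
}

// BackendStats is a point-in-time snapshot returned by Stats().
type BackendStats struct {
    KeyCount           int64
    DiskUsageBytes     int64
    MemoryUsageBytes   int64
    PendingCompactions int
    LastCompaction     time.Time
}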

// state/rocksdb/backend.go
package rocksdb

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"

    "github.com/linxGnu/grocksdb"
    "github.com/rs/zerolog"

    "github.com/tarungka/wire/internal/state"
)

type RocksDBConfig struct {
    // Performance tuning
    MemtableSize         int64
    TargetFileSizeBase   int64
    MaxBytesForLevelBase int64
    NumLevels            int
    Level0FileNumTrigger int

    // Write options
    DisableWAL        bool
    SyncWrites        bool
    WriteBufferNumber int

    // Read options
    BlockCacheSize   int64
    BloomFilterBits  int
    EnableStatistics bool

    // Compaction
    CompactionStyle      string
    CompactionThreads    int
    BottommostCompaction string
}

type RocksDBBackend struct {
    db *grocksdb.DB
    // txnDB is set only when the backend is opened in transactional mode
    // (grocksdb.OpenTransactionDb); required by NewTransaction below.
    txnDB *grocksdb.TransactionDB

    options      *grocksdb.Options
    readOptions  *grocksdb.ReadOptions
    writeOptions *grocksdb.WriteOptions

    columnFamilies map[string]*grocksdb.ColumnFamilyHandle
    cfOptions      map[string]*grocksdb.Options

    logger  zerolog.Logger
    metrics *BackendMetrics

    closed atomic.Bool
}

func NewRocksDBBackend(config state.BackendConfig) (*RocksDBBackend, error) {
    opts := grocksdb.NewDefaultOptions()
    
    // Configure RocksDB options
    opts.SetCreateIfMissing(true)
    opts.SetMaxOpenFiles(config.MaxOpenFiles)
    opts.SetWriteBufferSize(config.RocksDB.MemtableSize)
    opts.SetMaxWriteBufferNumber(config.RocksDB.WriteBufferNumber)
    opts.SetTargetFileSizeBase(config.RocksDB.TargetFileSizeBase)
    opts.SetMaxBytesForLevelBase(config.RocksDB.MaxBytesForLevelBase)
    opts.SetNumLevels(config.RocksDB.NumLevels)
    
    // Block cache for reads
    bbto := grocksdb.NewDefaultBlockBasedTableOptions()
    bbto.SetBlockCache(grocksdb.NewLRUCache(config.RocksDB.BlockCacheSize))
    bbto.SetBlockSize(config.BlockSize)
    
    // Bloom filter for faster lookups
    if config.RocksDB.BloomFilterBits > 0 {
        bbto.SetFilterPolicy(grocksdb.NewBloomFilter(config.RocksDB.BloomFilterBits))
    }
    
    opts.SetBlockBasedTableFactory(bbto)
    
    // Compression
    compressionTypes := []grocksdb.CompressionType{
        grocksdb.NoCompression,
        grocksdb.SnappyCompression,
        grocksdb.ZSTDCompression,
    }
    opts.SetCompressionPerLevel(compressionTypes)
    
    // Statistics
    if config.RocksDB.EnableStatistics {
        opts.EnableStatistics()
    }
    
    // Column families
    cfNames := []string{"default", "metadata", "watermarks", "checkpoints"}
    cfOpts := make([]*grocksdb.Options, len(cfNames))
    for i := range cfOpts {
        cfOpts[i] = opts
    }
    
    // Open database
    db, cfHandles, err := grocksdb.OpenDbColumnFamilies(
        opts,
        config.DataDir,
        cfNames,
        cfOpts,
    )
    if err != nil {
        return nil, fmt.Errorf("open rocksdb: %w", err)
    }
    
    backend := &RocksDBBackend{
        db:             db,
        options:        opts,
        readOptions:    grocksdb.NewDefaultReadOptions(),
        writeOptions:   grocksdb.NewDefaultWriteOptions(),
        columnFamilies: make(map[string]*grocksdb.ColumnFamilyHandle),
        cfOptions:      make(map[string]*grocksdb.Options),
        logger:         logger.With().Str("backend", "rocksdb").Logger(),
        metrics:        NewBackendMetrics("rocksdb"),
    }
    
    // Map column family handles
    for i, name := range cfNames {
        backend.columnFamilies[name] = cfHandles[i]
        backend.cfOptions[name] = cfOpts[i]
    }
    
    // Configure write options
    backend.writeOptions.SetSync(config.RocksDB.SyncWrites)
    backend.writeOptions.DisableWAL(config.RocksDB.DisableWAL)
    
    // Start background tasks
    go backend.runCompaction()
    go backend.collectStats()
    
    return backend, nil
}
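
// runCompaction and collectStats are started above but not defined in this
// issue. A minimal sketch follows; the intervals, the stop condition (polling
// the closed flag), and the chosen RocksDB property names are assumptions.
func (r *RocksDBBackend) runCompaction() {
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()

    for range ticker.C {
        if r.closed.Load() {
            return
        }
        // Full-range compaction; a production version would likely target
        // specific column families and honor the CompactionStyle/threads config.
        r.db.CompactRange(grocksdb.Range{Start: nil, Limit: nil})
        r.logger.Debug().Msg("periodic compaction completed")
    }
}

func (r *RocksDBBackend) collectStats() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        if r.closed.Load() {
            return
        }
        // Standard RocksDB properties; values are reported as strings.
        keys := r.db.GetProperty("rocksdb.estimate-num-keys")
        memtables := r.db.GetProperty("rocksdb.cur-size-all-mem-tables")
        r.logger.Debug().
            Str("estimate_num_keys", keys).
            Str("memtable_bytes", memtables).
            Msg("rocksdb stats")
    }
}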

func (r *RocksDBBackend) Get(ctx context.Context, key []byte) ([]byte, error) {
    if r.closed.Load() {
        return nil, state.ErrBackendClosed
    }
    
    timer := r.metrics.GetLatency.Start()
    defer timer.ObserveDuration()
    
    value, err := r.db.Get(r.readOptions, key)
    if err != nil {
        r.metrics.Errors.Inc()
        return nil, fmt.Errorf("rocksdb get: %w", err)
    }
    
    if !value.Exists() {
        value.Free()
        return nil, state.ErrKeyNotFound
    }
    
    data := value.Data()
    result := make([]byte, len(data))
    copy(result, data)
    value.Free()
    
    r.metrics.Gets.Inc()
    r.metrics.BytesRead.Add(float64(len(result)))
    
    return result, nil
}

func (r *RocksDBBackend) Put(ctx context.Context, key, value []byte) error {
    if r.closed.Load() {
        return state.ErrBackendClosed
    }
    
    timer := r.metrics.PutLatency.Start()
    defer timer.ObserveDuration()
    
    err := r.db.Put(r.writeOptions, key, value)
    if err != nil {
        r.metrics.Errors.Inc()
        return fmt.Errorf("rocksdb put: %w", err)
    }
    
    r.metrics.Puts.Inc()
    r.metrics.BytesWritten.Add(float64(len(key) + len(value)))
    
    return nil
}
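
// Scan is declared by the StateBackend interface but not shown in this issue.
// A minimal sketch using grocksdb iterators follows; treating a nil end as
// "no upper bound" and limit <= 0 as "unlimited" are assumptions.
func (r *RocksDBBackend) Scan(ctx context.Context, start, end []byte, limit int) (state.Iterator, error) {
    if r.closed.Load() {
        return nil, state.ErrBackendClosed
    }

    ro := grocksdb.NewDefaultReadOptions()
    if end != nil {
        ro.SetIterateUpperBound(end)
    }

    it := r.db.NewIterator(ro)
    if start != nil {
        it.Seek(start)
    } else {
        it.SeekToFirst()
    }

    return &rocksdbIterator{it: it, ro: ro, limit: limit}, nil
}

// rocksdbIterator adapts a grocksdb iterator to the state.Iterator interface.
type rocksdbIterator struct {
    it    *grocksdb.Iterator
    ro    *grocksdb.ReadOptions
    limit int // <= 0 means unlimited
    seen  int
}

func (i *rocksdbIterator) Valid() bool {
    if i.limit > 0 && i.seen >= i.limit {
        return false
    }
    return i.it.Valid()
}

func (i *rocksdbIterator) Next() {
    i.seen++
    i.it.Next()
}

func (i *rocksdbIterator) Key() []byte {
    k := i.it.Key()
    defer k.Free()
    return append([]byte(nil), k.Data()...)
}

func (i *rocksdbIterator) Value() ([]byte, error) {
    v := i.it.Value()
    defer v.Free()
    return append([]byte(nil), v.Data()...), nil
}

func (i *rocksdbIterator) Error() error { return i.it.Err() }

func (i *rocksdbIterator) Close() error {
    i.it.Close()
    i.ro.Destroy()
    return nil
}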

func (r *RocksDBBackend) NewTransaction(readOnly bool) (state.Transaction, error) {
    if r.closed.Load() {
        return nil, state.ErrBackendClosed
    }

    // grocksdb exposes transactions only through a TransactionDB handle, so the
    // backend must have been opened with grocksdb.OpenTransactionDb.
    if r.txnDB == nil {
        return nil, fmt.Errorf("rocksdb: backend was not opened in transactional mode")
    }

    opts := grocksdb.NewDefaultTransactionOptions()
    txn := r.txnDB.TransactionBegin(r.writeOptions, opts, nil)

    return &RocksDBTransaction{
        txn:      txn,
        readOnly: readOnly,
    }, nil
}
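
The RocksDBTransaction value returned above is not defined in this issue. A minimal sketch against grocksdb's transaction API is shown below; the read-only guard and the error messages are assumptions.

// state/rocksdb/transaction.go (sketch)
package rocksdb

import (
    "errors"

    "github.com/linxGnu/grocksdb"
    "github.com/tarungka/wire/internal/state"
)

type RocksDBTransaction struct {
    txn      *grocksdb.Transaction
    readOnly bool
}

func (t *RocksDBTransaction) Get(key []byte) ([]byte, error) {
    ro := grocksdb.NewDefaultReadOptions()
    defer ro.Destroy()

    value, err := t.txn.Get(ro, key)
    if err != nil {
        return nil, err
    }
    defer value.Free()

    if !value.Exists() {
        return nil, state.ErrKeyNotFound
    }
    return append([]byte(nil), value.Data()...), nil
}

func (t *RocksDBTransaction) Put(key, value []byte) error {
    if t.readOnly {
        return errors.New("rocksdb: put on read-only transaction")
    }
    return t.txn.Put(key, value)
}

func (t *RocksDBTransaction) Delete(key []byte) error {
    if t.readOnly {
        return errors.New("rocksdb: delete on read-only transaction")
    }
    return t.txn.Delete(key)
}

// A production version would also Destroy() the transaction handle after
// Commit/Rollback to release the underlying C resources.
func (t *RocksDBTransaction) Commit() error   { return t.txn.Commit() }
func (t *RocksDBTransaction) Rollback() error { return t.txn.Rollback() }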

// State partitioning for scalability
// state/partitioned.go
package state

import (
    "context"
    "fmt"
    "hash/fnv"
    "sync"
)

type PartitionedBackend struct {
    partitions []StateBackend
    numParts   int
    
    partitioner Partitioner
    mu          sync.RWMutex
}

type Partitioner interface {
    GetPartition(key []byte) int
}

type HashPartitioner struct {
    numPartitions int
}

func (h *HashPartitioner) GetPartition(key []byte) int {
    hasher := fnv.New32a()
    hasher.Write(key)
    // Take the modulo in uint32 space so the result is never negative.
    return int(hasher.Sum32() % uint32(h.numPartitions))
}

func NewPartitionedBackend(config BackendConfig, numPartitions int) (*PartitionedBackend, error) {
    partitions := make([]StateBackend, numPartitions)
    
    for i := 0; i < numPartitions; i++ {
        // Create separate data directory for each partition
        partConfig := config
        partConfig.DataDir = fmt.Sprintf("%s/partition-%d", config.DataDir, i)
        
        backend, err := CreateBackend(partConfig)
        if err != nil {
            // Cleanup already created partitions
            for j := 0; j < i; j++ {
                partitions[j].Close()
            }
            return nil, fmt.Errorf("create partition %d: %w", i, err)
        }
        
        partitions[i] = backend
    }
    
    return &PartitionedBackend{
        partitions:  partitions,
        numParts:    numPartitions,
        partitioner: &HashPartitioner{numPartitions: numPartitions},
    }, nil
}

func (p *PartitionedBackend) Get(ctx context.Context, key []byte) ([]byte, error) {
    partition := p.partitioner.GetPartition(key)
    return p.partitions[partition].Get(ctx, key)
}

func (p *PartitionedBackend) Put(ctx context.Context, key, value []byte) error {
    partition := p.partitioner.GetPartition(key)
    return p.partitions[partition].Put(ctx, key, value)
}
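
The remaining single-key and lifecycle operations route the same way. Delete and Close are sketched below; batch and scan operations, which can span partitions, are left to the cross-partition handling called out in the implementation steps.

func (p *PartitionedBackend) Delete(ctx context.Context, key []byte) error {
    partition := p.partitioner.GetPartition(key)
    return p.partitions[partition].Delete(ctx, key)
}

func (p *PartitionedBackend) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    var firstErr error
    for i, backend := range p.partitions {
        if err := backend.Close(); err != nil && firstErr == nil {
            firstErr = fmt.Errorf("close partition %d: %w", i, err)
        }
    }
    return firstErr
}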

// State migration tool
// state/migration/migrator.go
package migration

import (
    "context"
    "fmt"
    "sync/atomic"

    "golang.org/x/sync/errgroup"

    "github.com/tarungka/wire/internal/state"
)

type Migrator struct {
    source      state.StateBackend
    target      state.StateBackend
    
    batchSize   int
    parallelism int
    
    progress    atomic.Int64
    total       atomic.Int64
}

func NewMigrator(source, target state.StateBackend, config MigratorConfig) *Migrator {
    return &Migrator{
        source:      source,
        target:      target,
        batchSize:   config.BatchSize,
        parallelism: config.Parallelism,
    }
}

func (m *Migrator) Migrate(ctx context.Context) error {
    // Count total keys
    if err := m.countKeys(ctx); err != nil {
        return fmt.Errorf("count keys: %w", err)
    }
    
    // Create worker pool
    g, ctx := errgroup.WithContext(ctx)
    keyChan := make(chan [][]byte, m.parallelism)
    
    // Start workers
    for i := 0; i < m.parallelism; i++ {
        g.Go(func() error {
            return m.worker(ctx, keyChan)
        })
    }
    
    // Scan and distribute keys
    g.Go(func() error {
        defer close(keyChan)
        return m.scanAndDistribute(ctx, keyChan)
    })
    
    return g.Wait()
}

func (m *Migrator) worker(ctx context.Context, keyChan <-chan [][]byte) error {
    for keys := range keyChan {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        // Batch get from source
        values, err := m.source.BatchGet(ctx, keys)
        if err != nil {
            return fmt.Errorf("batch get: %w", err)
        }
        
        // Prepare batch for target
        entries := make([]state.KVPair, 0, len(keys))
        for i, key := range keys {
            if values[i] != nil {
                entries = append(entries, state.KVPair{
                    Key:   key,
                    Value: values[i],
                })
            }
        }
        
        // Batch put to target
        if err := m.target.BatchPut(ctx, entries); err != nil {
            return fmt.Errorf("batch put: %w", err)
        }
        
        done := m.progress.Add(int64(len(entries)))
        
        // Log progress whenever a 10k-entry boundary is crossed
        if done/10000 != (done-int64(len(entries)))/10000 {
            logger.Info().
                Int64("progress", done).
                Int64("total", m.total.Load()).
                Float64("percent", float64(done)/float64(m.total.Load())*100).
                Msg("migration progress")
        }
    }
    
    return nil
}
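
countKeys and scanAndDistribute are referenced above but not shown. A sketch of scanAndDistribute follows; it assumes a full-keyspace Scan (nil bounds, no limit) over the source backend and simply batches keys for the workers.

func (m *Migrator) scanAndDistribute(ctx context.Context, keyChan chan<- [][]byte) error {
    it, err := m.source.Scan(ctx, nil, nil, 0)
    if err != nil {
        return fmt.Errorf("scan source: %w", err)
    }
    defer it.Close()

    batch := make([][]byte, 0, m.batchSize)
    for ; it.Valid(); it.Next() {
        // Copy the key: iterator buffers may be reused after Next.
        key := append([]byte(nil), it.Key()...)
        batch = append(batch, key)

        if len(batch) >= m.batchSize {
            select {
            case keyChan <- batch:
            case <-ctx.Done():
                return ctx.Err()
            }
            batch = make([][]byte, 0, m.batchSize)
        }
    }
    if err := it.Error(); err != nil {
        return fmt.Errorf("scan source: %w", err)
    }

    if len(batch) > 0 {
        select {
        case keyChan <- batch:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}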

// Integration with Wire components
// Modify store.go to use pluggable backends
func NewNodeStore(config StoreConfig) (*NodeStore, error) {
    // Create state backend based on configuration
    backend, err := state.CreateBackend(config.StateBackend)
    if err \!= nil {
        return nil, fmt.Errorf("create state backend: %w", err)
    }
    
    store := &NodeStore{
        stateBackend: backend,
        // ... other fields
    }
    
    // Use backend for all state operations
    return store, nil
}
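
CreateBackend is used in several places above but never defined. One way to implement it without an import cycle (the rocksdb package already imports state) is a small registry; the RegisterBackend function and the registration-in-init pattern below are assumptions, not an existing Wire API.

// state/factory.go (sketch)
package state

import "fmt"

// Backend constructors register themselves here (from init functions in their
// own packages), so this package never imports its implementations.
// Registration is expected before any concurrent use.
var registry = map[BackendType]func(BackendConfig) (StateBackend, error){}

func RegisterBackend(t BackendType, factory func(BackendConfig) (StateBackend, error)) {
    registry[t] = factory
}

func CreateBackend(config BackendConfig) (StateBackend, error) {
    factory, ok := registry[config.Type]
    if !ok {
        return nil, fmt.Errorf("unknown or unregistered state backend type: %q", config.Type)
    }
    return factory(config)
}

The rocksdb package would then register its constructor from an init() function, e.g. state.RegisterBackend(state.BackendTypeRocksDB, func(c state.BackendConfig) (state.StateBackend, error) { return NewRocksDBBackend(c) }), keeping the core state package free of backend-specific dependencies.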

Implementation Steps

  1. Define Backend Interface (Week 1)

    • Create abstract StateBackend interface
    • Define transaction and iterator interfaces
    • Implement error types and metrics
  2. RocksDB Implementation (Week 2)

    • Integrate grocksdb library
    • Implement all backend operations
    • Add column family support
    • Configure performance options
  3. Partitioned Backend (Week 1)

    • Implement partitioning strategy
    • Create hash-based partitioner
    • Handle cross-partition operations
  4. State Migration Tool (Week 1)

    • Build migration framework
    • Implement parallel migration
    • Add progress tracking
  5. BadgerDB Adapter (Week 1)

    • Adapt existing BadgerDB to new interface
    • Ensure backward compatibility
    • Add migration path
  6. Integration (Week 1)

    • Update NodeStore to use backends
    • Modify pipeline state management
    • Update configuration (see the example wiring sketch after this list)
  7. Testing and Benchmarking (Week 1)

    • Unit tests for all backends
    • Performance benchmarks
    • Migration testing
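
For the configuration work in step 6, the wiring could look roughly like the sketch below. It assumes RocksDBConfig ends up in the state package (see the note on BackendConfig above) and reuses the StoreConfig/NewNodeStore shape from the integration snippet; the concrete values are illustrative, not tuning recommendations.

// Example wiring (sketch): selecting and tuning the RocksDB backend.
func newRocksDBNodeStore() (*NodeStore, error) {
    cfg := StoreConfig{
        StateBackend: state.BackendConfig{
            Type:         state.BackendTypeRocksDB,
            DataDir:      "/var/lib/wire/state",
            MaxOpenFiles: 1024,
            BlockSize:    16 * 1024, // 16 KiB blocks
            RocksDB: state.RocksDBConfig{
                MemtableSize:      64 << 20,  // 64 MiB write buffer
                WriteBufferNumber: 4,
                BlockCacheSize:    512 << 20, // 512 MiB block cache
                BloomFilterBits:   10,
                EnableStatistics:  true,
            },
        },
    }
    return NewNodeStore(cfg)
}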

Acceptance Criteria

  • RocksDB backend fully implements StateBackend interface
  • Performance improvement of at least 2x for large state (>10GB)
  • State can be partitioned across multiple backend instances
  • Zero-downtime migration between different backends
  • Column families properly organize different state types
  • Compaction can be controlled and monitored
  • Memory usage remains stable under high load
  • Backup and restore operations work correctly
  • Metrics exposed for all backend operations
  • Documentation covers backend selection and tuning

Testing Requirements

  1. Unit Tests

    • All backend operations tested
    • Transaction correctness verified
    • Iterator behavior validated
  2. Integration Tests

    • Backend switching without data loss
    • Concurrent access patterns
    • Failure recovery scenarios
  3. Performance Tests

    • Benchmark vs BadgerDB baseline (see the benchmark sketch after this list)
    • Large state handling (100GB+)
    • Write/read throughput under load
  4. Stress Tests

    • Long-running stability tests
    • Memory leak detection
    • Concurrent access under sustained load
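
For the backend-vs-backend benchmarks in item 3, a Go benchmark along the following lines could be run per backend type. It relies on the CreateBackend factory sketched earlier; default configs and 1 KiB values are arbitrary choices.

// state/backend_bench_test.go (sketch)
package state_test

import (
    "context"
    "fmt"
    "testing"

    "github.com/tarungka/wire/internal/state"
)

func BenchmarkBackendPut(b *testing.B) {
    for _, bt := range []state.BackendType{state.BackendTypeBadger, state.BackendTypeRocksDB} {
        bt := bt // capture range variable for the subtest closure
        b.Run(string(bt), func(b *testing.B) {
            backend, err := state.CreateBackend(state.BackendConfig{
                Type:    bt,
                DataDir: b.TempDir(),
            })
            if err != nil {
                b.Fatal(err)
            }
            defer backend.Close()

            ctx := context.Background()
            value := make([]byte, 1024) // 1 KiB values

            b.ResetTimer()
            for i := 0; i < b.N; i++ {
                key := []byte(fmt.Sprintf("key-%010d", i))
                if err := backend.Put(ctx, key, value); err != nil {
                    b.Fatal(err)
                }
            }
        })
    }
}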

Documentation Requirements

  • Backend selection guide
  • Performance tuning guide for each backend
  • Migration procedures
  • Monitoring and troubleshooting guide

Performance Targets

  • Read latency: < 1ms p99
  • Write latency: < 5ms p99
  • Throughput: > 100k ops/sec per node
  • State size: Support up to 1TB per node
  • Recovery time: < 5 minutes for 100GB state

Dependencies

  • grocksdb for RocksDB bindings
  • RocksDB C++ library must be installed on deployment systems
  • Sufficient disk I/O performance for target throughput

