RaftKV: A Distributed Key-Value Store with Raft Consensus

A distributed key-value store built from scratch using the Raft consensus algorithm. This system provides strong consistency guarantees, fault tolerance, and high availability through a replicated state machine (RSM) architecture.

Overview

RaftKV is a distributed key-value store that implements the Raft consensus algorithm to ensure strong consistency across a cluster of nodes. The system provides three core operations: Put, Get, and Delete, all of which are replicated through the Raft log to guarantee linearizability.

Key Characteristics

  • Strong Consistency: All operations are linearizable through Raft log replication
  • Fault Tolerance: Can tolerate up to floor((N-1)/2) node failures in an N-node cluster (for example, 2 failures in a 5-node cluster)
  • Durability: Raft log entries are persisted to disk using Badger DB
  • Leader Election: Automatic leader election and failover
  • Stateless Client: Client library with automatic leader discovery and retry logic
  • Production Ready: Comprehensive error handling, logging, and monitoring
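
The fault-tolerance bound above follows from majority arithmetic. The sketch below is illustrative only; the function names `quorum` and `maxFailures` are assumptions and are not taken from the repository.

```go
package main

import "fmt"

// quorum returns the smallest number of nodes that forms a majority of n.
func quorum(n int) int { return n/2 + 1 }

// maxFailures returns how many nodes may fail while a majority survives.
// Integer division rounds down, so this is floor((n-1)/2).
func maxFailures(n int) int { return (n - 1) / 2 }

func main() {
	for _, n := range []int{3, 4, 5} {
		fmt.Printf("nodes=%d quorum=%d tolerated failures=%d\n", n, quorum(n), maxFailures(n))
	}
}
```

Note that a 4-node cluster tolerates no more failures than a 3-node cluster, which is why odd cluster sizes are the usual choice.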

Architecture

The system follows a layered architecture:

┌─────────────────────────────────────────────────────────┐
│                    Client Layer                          │
│  (gRPC Client with Leader Discovery & Retry Logic)      │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                  gRPC Service Layer                      │
│         (KV Service: Put/Get/Delete Handlers)           │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│            Replicated State Machine (RSM)                │
│  (Proposal Queue, State Management, Result Routing)     │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                    Raft Layer                            │
│  (Leader Election, Log Replication, Persistence)         │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│              Transport Layer (gRPC)                      │
│         (AppendEntries, RequestVote RPCs)               │
└─────────────────────────────────────────────────────────┘

Request Flow

  1. Client Request: Client sends gRPC request (Put/Get/Delete) to any server
  2. gRPC Handler: Server handler generates unique command ID and enqueues to RSM
  3. RSM Proposal: RSM checks leadership, encodes operation to protobuf, and proposes to Raft
  4. Raft Replication: Leader appends entry to log and broadcasts AppendEntries to followers
  5. Follower Processing: Followers validate log consistency, append the entries, and respond to the leader
  6. Commit: Leader advances the commit index once a majority has replicated the entry
  7. Apply: Committed entries delivered to RSM via apply channel
  8. State Update: RSM applies operation to in-memory state map
  9. Result Routing: RSM routes result back to waiting gRPC handler via command ID
  10. Client Response: Handler returns result to client
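
Steps 2 and 9 of the flow above amount to a rendezvous keyed by command ID. The following is a minimal sketch of that idea, not the repository's code: the `router` and `Result` types are hypothetical, and a mutex guards the map here for simplicity, whereas the RSM described later in this README owns its state from a single goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Result is a hypothetical outcome of applying one command.
type Result struct {
	Value []byte
	Err   error
}

// router maps command IDs to the channel of the handler waiting on them.
type router struct {
	mu      sync.Mutex
	pending map[string]chan Result
}

func newRouter() *router { return &router{pending: make(map[string]chan Result)} }

// register is called by a handler before it proposes a command.
func (r *router) register(id string) chan Result {
	ch := make(chan Result, 1) // buffered so deliver never blocks
	r.mu.Lock()
	r.pending[id] = ch
	r.mu.Unlock()
	return ch
}

// deliver hands a result to the waiting handler and removes the entry.
// It reports whether a handler was still registered for id.
func (r *router) deliver(id string, res Result) bool {
	r.mu.Lock()
	ch, ok := r.pending[id]
	delete(r.pending, id)
	r.mu.Unlock()
	if ok {
		ch <- res
	}
	return ok
}

// wait blocks until the result arrives or the timeout expires.
func (r *router) wait(id string, ch chan Result, timeout time.Duration) (Result, error) {
	select {
	case res := <-ch:
		return res, nil
	case <-time.After(timeout):
		r.mu.Lock()
		delete(r.pending, id) // clean up so the map does not grow
		r.mu.Unlock()
		return Result{}, errors.New("request timed out")
	}
}

func main() {
	r := newRouter()
	ch := r.register("cmd-1")
	go r.deliver("cmd-1", Result{Value: []byte("v")})
	res, err := r.wait("cmd-1", ch, time.Second)
	fmt.Println(string(res.Value), err)
}
```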

Features

Core Features

  • Three Operations: Put, Get, Delete with full Raft replication
  • Linearizable Reads: All reads go through Raft log for consistency
  • Automatic Leader Election: Nodes automatically elect a leader when needed
  • Log Replication: Leader replicates entries to all followers with consistency checks
  • Persistent Log: Raft log persisted to Badger DB for durability
  • State Recovery: Nodes recover state from disk on restart

Client Features

  • Leader Discovery: Automatically discovers and caches the current leader
  • Retry Logic: Exponential backoff with jitter for failed requests
  • Size Validation: Enforces 1.5 MiB limit on keys and values
  • Stateless Design: No persistent client state, works with any server configuration
  • CLI Tool: Command-line interface for testing and operations

Reliability Features

  • Fault Tolerance: Handles node failures and network partitions
  • Consistency Guarantees: Log Matching Property ensures log consistency
  • Durability: All committed entries persisted before acknowledgment
  • Error Handling: Comprehensive error handling with leader information propagation

System Components

1. Raft Consensus Module (raft/)

The core Raft implementation providing:

  • Leader Election: Randomized election timeouts (150-300ms) prevent split votes
  • Log Replication: Batched AppendEntries RPCs (up to 256 entries)
  • State Persistence: Metadata and log entries saved to Badger DB
  • State Machine: Manages term, votedFor, log entries, commit index, and match indices

Key Files:

  • raft.go: Core Raft state machine and logic
  • message.go: RPC message handling (AppendEntries, RequestVote)
  • service.go: gRPC service implementation
  • network.go: Transport layer for peer communication

2. Replicated State Machine (rsm/)

Coordinates between client requests and Raft commits:

  • Single Goroutine Design: Ensures sequential processing and eliminates race conditions
  • Proposal Queue: Serializes client requests for backpressure handling
  • Result Routing: Correlates proposals with results using command IDs
  • State Management: In-memory key-value map with thread-safe access

Key Files:

  • rsm.go: Main RSM implementation
  • types.go: Operation and result types

3. KV Service (server/)

gRPC service layer exposing the key-value store API:

  • Request Handling: Generates unique command IDs for each operation
  • Admission Control: Semaphore-based admission control (5120 concurrent requests)
  • Error Propagation: Returns leader information on non-leader errors
  • Timeout Handling: 1-second timeout per request
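
Semaphore-based admission control is commonly built on a buffered channel in Go. The sketch below shows one way to do it; the `admission` type is hypothetical, and rejecting excess requests (rather than queueing them) is an assumption, since this README does not say which the server does.

```go
package main

import (
	"errors"
	"fmt"
)

var errOverloaded = errors.New("server overloaded")

// admission is a counting semaphore built on a buffered channel.
type admission struct{ slots chan struct{} }

func newAdmission(limit int) *admission {
	return &admission{slots: make(chan struct{}, limit)}
}

// tryAcquire claims a slot without blocking; false means the limit is reached.
func (a *admission) tryAcquire() bool {
	select {
	case a.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot previously claimed by tryAcquire.
func (a *admission) release() { <-a.slots }

// handle runs fn if a slot is free and rejects the request otherwise.
func (a *admission) handle(fn func() error) error {
	if !a.tryAcquire() {
		return errOverloaded
	}
	defer a.release()
	return fn()
}

func main() {
	adm := newAdmission(5120) // limit taken from the list above
	err := adm.handle(func() error { return nil })
	fmt.Println("handled, err =", err)
}
```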

Key Files:

  • kv_service.go: gRPC service implementation
  • errors.go: Error definitions and handling

4. Client Library (kv_client/)

Go client library with leader discovery and retry logic:

  • Connection Pooling: Maintains gRPC connections to all servers
  • Leader Caching: Caches leader index and term for efficient routing
  • Exponential Backoff: Binary exponential backoff with jitter (100ms base, 5s max)
  • Random Server Selection: When no cached leader exists

Key Files:

  • pkg/client/client.go: Core client implementation
  • cmd/cli/main.go: CLI tool

5. Benchmark Tool (benchmark/)

Performance benchmarking and testing tool:

  • Workload Generation: Write-heavy, read-heavy, mixed, and custom workloads
  • Metrics Collection: Latency, throughput, error rates
  • CSV Reporting: Time-series and summary metrics export
  • Warmup Phase: Optional warmup to stabilize performance

Key Files:

  • main.go: Benchmark runner
  • workload.go: Workload generation logic
  • stats.go: Metrics collection
  • reporter.go: Report generation

Getting Started

Prerequisites

  • Go 1.25.2 or later
  • Docker and Docker Compose (for cluster deployment)
  • Protocol Buffers compiler (protoc)
  • Go plugins for protobuf and gRPC

Installation

  1. Clone the repository:

    git clone <repository-url>
    cd sharded-kv
  2. Install dependencies:

    go mod download
  3. Generate protobuf code:

    export PATH=$PATH:$(go env GOPATH)/bin
    protoc --go_out=. --go-grpc_out=. proto/kvstore/v1/kvstore.proto
    protoc --go_out=. --go-grpc_out=. proto/raft/v1/raft.proto
    protoc --go_out=. proto/rsm/v1/rsm.proto
  4. Build the server:

    go build -o main ./cmd/main.go
  5. Build the client CLI:

    cd kv_client
    go build -o cli ./cmd/cli/

Running the Cluster

The easiest way to run a cluster is using Docker Compose:

docker compose up --build --watch

This starts a 5-node cluster with:

  • Nodes: raft1 through raft5
  • Network: 172.28.0.0/16 subnet
  • Ports: 50051 for gRPC
  • Auto-rebuild on code changes

To stop/start individual nodes:

docker compose stop raft1
docker compose start raft1

Usage

Using the CLI Tool

# Put a key-value pair
./kv_client/cli --servers=172.28.0.11:50051,172.28.0.12:50051 put mykey myvalue

# Get a value
./kv_client/cli --servers=172.28.0.11:50051,172.28.0.12:50051 get mykey

# Delete a key
./kv_client/cli --servers=172.28.0.11:50051,172.28.0.12:50051 delete mykey

Using the Client Library

package main

import (
    "fmt"
    "raftkv-client/pkg/client"
)

func main() {
    addresses := []string{
        "172.28.0.11:50051",
        "172.28.0.12:50051",
        "172.28.0.13:50051",
    }

    kvClient := client.NewClient(addresses)
    defer kvClient.Close()

    // Put operation
    err := kvClient.Put([]byte("key"), []byte("value"))
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    // Get operation
    value, err := kvClient.Get([]byte("key"))
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    if value == nil {
        fmt.Println("Key not found")
    } else {
        fmt.Printf("Value: %s\n", string(value))
    }

    // Delete operation
    err = kvClient.Delete([]byte("key"))
    if err != nil {
        fmt.Printf("Error: %v\n", err)
    }
}

Configuration

Configuration is managed through TOML files. See sample_config/dev.toml:

host = 0
address = [
    "172.28.0.11:50051",
    "172.28.0.12:50051",
    "172.28.0.13:50051",
    "172.28.0.14:50051",
    "172.28.0.15:50051"
]
debug_level = "info"  # Options: debug, info, warning, error, fatal

Raft Parameters

Currently hardcoded (configurable in future versions):

  • Election Timeout: 150-300ms (randomized)
  • Heartbeat Interval: 10-20ms
  • gRPC Reconnection: 10-50ms (exponential backoff with jitter)
  • Max Batch Size: 256 entries per AppendEntries RPC
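
As an illustration of the randomized election timeout listed above, a timeout could be drawn as follows. The constant and function names are assumptions, and the half-open range [150ms, 300ms) is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	electionTimeoutMin = 150 * time.Millisecond
	electionTimeoutMax = 300 * time.Millisecond
)

// randomElectionTimeout returns a duration in [150ms, 300ms).
// Each node draws its own value, so nodes rarely time out together.
func randomElectionTimeout() time.Duration {
	span := int64(electionTimeoutMax - electionTimeoutMin)
	return electionTimeoutMin + time.Duration(rand.Int63n(span))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(randomElectionTimeout())
	}
}
```

With heartbeats every 10-20ms, a follower sees several heartbeats within even the shortest timeout, so a healthy leader is unlikely to be challenged.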

Badger DB Configuration

Configured in cmd/main.go:

  • Num Memtables: 8
  • Num Level Zero Tables: 8
  • Num Compactors: 4
  • Compression: None (for performance)
  • Sync Writes: Enabled (for durability)

Performance Benchmarking

The benchmark tool provides comprehensive performance testing:

Running Benchmarks

cd benchmark
go build -o benchmark .

Example Benchmark Commands

# Write-heavy workload (300 seconds, 1 concurrent client)
./benchmark --duration 300s --concurrency 1 --workload write-heavy

# Read-heavy workload
./benchmark --duration 300s --concurrency 1 --workload read-heavy

# Mixed workload
./benchmark --duration 300s --concurrency 1 --workload mixed

# Custom workload with specified ratios
./benchmark --duration 300s --concurrency 1 --workload custom \
    --put-ratio 0.5 --get-ratio 0.3 --delete-ratio 0.2

# With warmup phase
./benchmark --duration 300s --concurrency 1 --workload write-heavy --warmup

Benchmark Output

The tool generates:

  • Console Summary: Throughput, latency percentiles, error rates
  • CSV Summary: benchmark_summary.csv with aggregate metrics
  • Time-Series CSV: benchmark_timeseries.csv with per-second metrics

Metrics Collected

  • Throughput: Operations per second (total and per operation type)
  • Latency: P50, P95, P99, P99.9 percentiles
  • Error Rates: Per operation type
  • Time-Series Data: Per-second metrics for visualization

Testing

Unit Tests

# Test Raft implementation
go test ./raft/ -v

# Test RSM
go test ./rsm/ -v

# Test client library
go test ./kv_client/pkg/client/ -v

Integration Tests

# Run client integration tests (requires running cluster)
KV_INTEGRATION=1 \
KV_SERVERS=172.28.0.11:50051,172.28.0.12:50051,172.28.0.13:50051 \
go test ./kv_client/pkg/client/ -v -run Live

Manual Testing

  1. Start the cluster:

    docker compose up --build
  2. Test basic operations:

    ./kv_client/cli --servers=172.28.0.11:50051 put testkey testvalue
    ./kv_client/cli --servers=172.28.0.11:50051 get testkey
  3. Test fault tolerance:

    # Stop a node (raft1 here; it may or may not be the current leader)
    docker compose stop raft1
    
    # Operations should continue (new leader elected)
    ./kv_client/cli --servers=172.28.0.12:50051 get testkey
    
    # Restart the node
    docker compose start raft1

Technical Details

Raft Implementation Details

Leader Election

  • Election Timeout: Randomized between 150-300ms to prevent split votes
  • Vote Granting: Followers grant votes based on:
    • No vote cast in current term OR candidate is the same as previously voted
    • Candidate's log is at least as up-to-date as the voter's (the election restriction, which underpins Leader Completeness)
  • Majority Rule: Candidate needs majority of votes (> N/2) to become leader
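
The vote-granting rules above can be written as a small pure function. This is a sketch following the Raft paper; the struct and field names are hypothetical and do not come from `raft/message.go`.

```go
package main

import "fmt"

// voteRequest mirrors the RequestVote arguments from the Raft paper.
type voteRequest struct {
	Term, CandidateID, LastLogIndex, LastLogTerm int
}

// voterState is the follower-side state consulted when voting.
type voterState struct {
	CurrentTerm  int
	VotedFor     int // -1 means no vote cast in CurrentTerm
	LastLogIndex int
	LastLogTerm  int
}

// logUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's: a higher last term wins, and equal
// terms are decided by the longer log.
func logUpToDate(s voterState, r voteRequest) bool {
	if r.LastLogTerm != s.LastLogTerm {
		return r.LastLogTerm > s.LastLogTerm
	}
	return r.LastLogIndex >= s.LastLogIndex
}

// grantVote decides on a request and updates s when a newer term is
// seen or a vote is granted.
func grantVote(s *voterState, r voteRequest) bool {
	if r.Term < s.CurrentTerm {
		return false // stale candidate
	}
	if r.Term > s.CurrentTerm {
		s.CurrentTerm = r.Term
		s.VotedFor = -1 // a new term resets the vote
	}
	if s.VotedFor != -1 && s.VotedFor != r.CandidateID {
		return false // already voted for someone else this term
	}
	if !logUpToDate(*s, r) {
		return false
	}
	s.VotedFor = r.CandidateID
	return true
}

func main() {
	s := &voterState{CurrentTerm: 2, VotedFor: -1, LastLogIndex: 5, LastLogTerm: 2}
	req := voteRequest{Term: 3, CandidateID: 1, LastLogIndex: 5, LastLogTerm: 2}
	fmt.Println("granted:", grantVote(s, req))
}
```

In a real node the updated term and vote would be persisted before the reply is sent, as the Persistence section describes.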

Log Replication

  • AppendEntries RPC: Leader sends batches of up to 256 entries
  • Consistency Check: Followers validate previous log index and term match
  • Log Matching Property: Ensures logs are consistent across all nodes
  • Commit Advancement: Leader commits entries when majority replicates
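  • Term Safety: The leader commits by counting replicas only for entries from its current term; entries from earlier terms become committed indirectly once a current-term entry commits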
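
The commit rule can be sketched as a function over the leader's match indices. This is an illustration following the Raft paper, not the repository's code; `advanceCommit` and its parameters are hypothetical names, and `matchIndex` is assumed to include the leader's own last log index.

```go
package main

import (
	"fmt"
	"sort"
)

// advanceCommit returns the new commit index for a leader.
// matchIndex holds the highest replicated index per node, leader included.
// termAt returns the term of the log entry at a given index.
func advanceCommit(commitIndex, currentTerm int, matchIndex []int, termAt func(int) int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	// In descending order, the element at position len/2 is the highest
	// index that a majority of nodes have replicated.
	candidate := sorted[len(sorted)/2]
	// Terms never decrease along the log, so if the candidate entry is
	// from an older term, every lower index is as well.
	if candidate > commitIndex && termAt(candidate) == currentTerm {
		return candidate
	}
	return commitIndex
}

func main() {
	terms := []int{0, 1, 1, 1, 1, 2, 2, 2} // terms[i] is the term of index i; index 0 is unused
	termAt := func(i int) int { return terms[i] }
	match := []int{7, 7, 5, 4, 3}
	fmt.Println("commit index:", advanceCommit(3, 2, match, termAt))
}
```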

Persistence

  • Metadata: Term, votedFor, lastLogIdx persisted on every change
  • Log Entries: Persisted with prefix-based keys (log-00000000000000000001)
  • Recovery: On startup, nodes load metadata and reconstruct log from entries
  • Atomic Writes: Badger transactions ensure atomic persistence
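
Zero-padded keys matter because Badger iterates keys in byte order, so fixed-width padding makes that order match numeric log order. The helpers below are a sketch; `logKey` and `parseLogKey` are hypothetical names, and the 20-digit width is inferred from the example key above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const logPrefix = "log-"

// logKey builds the storage key for a log index, zero-padded to 20
// digits (the width of the largest uint64) so keys sort numerically.
func logKey(index uint64) string {
	return fmt.Sprintf("%s%020d", logPrefix, index)
}

// parseLogKey recovers the log index from a key produced by logKey.
func parseLogKey(key string) (uint64, error) {
	if !strings.HasPrefix(key, logPrefix) {
		return 0, fmt.Errorf("not a log key: %q", key)
	}
	return strconv.ParseUint(strings.TrimPrefix(key, logPrefix), 10, 64)
}

func main() {
	fmt.Println(logKey(1))
	fmt.Println(logKey(9) < logKey(10)) // byte order agrees with numeric order
	idx, err := parseLogKey(logKey(42))
	fmt.Println(idx, err)
}
```

Without padding, "log-10" would sort before "log-9" and a prefix scan on recovery would return entries out of order.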

RSM Implementation Details

Single Goroutine Design

The RSM uses a single goroutine that selects on:

  • Proposal Queue: Client requests waiting to be proposed
  • Apply Channel: Committed entries from Raft ready to be applied

This ensures:

  • Sequential processing eliminates race conditions
  • No locks needed for state access
  • Predictable ordering of operations
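
A stripped-down version of such a loop might look like the following. It is a sketch only: the `proposal` and `applied` types, the `propose` callback, and the shutdown-by-closing-channels convention are assumptions made for the example.

```go
package main

import "fmt"

// proposal is a hypothetical client request waiting to be handed to Raft.
type proposal struct{ Key, Value string }

// applied is a hypothetical committed entry delivered by Raft.
type applied struct{ Key, Value string }

// run is the only goroutine that touches state, so no mutex is needed.
// It exits once both channels are closed and then hands back the state.
func run(proposals <-chan proposal, applyCh <-chan applied,
	propose func(proposal), done chan<- map[string]string) {
	state := make(map[string]string)
	for proposals != nil || applyCh != nil {
		select {
		case p, ok := <-proposals:
			if !ok {
				proposals = nil // a nil channel is never selected again
				continue
			}
			propose(p) // hand the request to Raft
		case a, ok := <-applyCh:
			if !ok {
				applyCh = nil
				continue
			}
			state[a.Key] = a.Value // apply the committed entry
		}
	}
	done <- state
}

func main() {
	proposals := make(chan proposal)
	applyCh := make(chan applied)
	done := make(chan map[string]string)
	proposed := 0
	go run(proposals, applyCh, func(proposal) { proposed++ }, done)

	proposals <- proposal{Key: "a", Value: "1"}
	applyCh <- applied{Key: "a", Value: "1"}
	close(proposals)
	close(applyCh)
	state := <-done
	fmt.Println(state["a"], proposed)
}
```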

Command ID Correlation

Each client operation gets a unique command ID:

  1. Generated by gRPC handler (8 random bytes, hex encoded)
  2. Included in protobuf log entry
  3. Used to route results back to waiting handler
  4. Cleaned up after result delivery
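
Generating an ID of the shape described in step 1 takes only a few lines. The sketch below uses `crypto/rand`; whether the repository uses that package or `math/rand` is not stated here, and `newCommandID` is a hypothetical name.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newCommandID returns 8 random bytes encoded as 16 hex characters.
func newCommandID() (string, error) {
	var buf [8]byte
	if _, err := rand.Read(buf[:]); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf[:]), nil
}

func main() {
	id, err := newCommandID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("command id:", id)
}
```

With 64 random bits, collisions among in-flight commands are extremely unlikely, which is all the correlation step needs.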

State Management

  • In-Memory Map: map[string][]byte for key-value storage
  • Thread Safety: Single goroutine owns all state mutations
  • Copy on Read: Values copied before returning to prevent aliasing
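
Copy on read matters because a Go slice shares its backing array with whoever holds it. A minimal sketch, with a hypothetical `store` type standing in for the RSM state:

```go
package main

import "fmt"

// store is a hypothetical stand-in for the RSM's in-memory state.
type store struct{ data map[string][]byte }

// get returns a copy of the stored value, or nil if the key is absent.
// The copy prevents callers from mutating the state through the slice.
func (s *store) get(key string) []byte {
	v, ok := s.data[key]
	if !ok {
		return nil
	}
	out := make([]byte, len(v))
	copy(out, v)
	return out
}

func main() {
	s := &store{data: map[string][]byte{"k": []byte("abc")}}
	v := s.get("k")
	v[0] = 'X' // changes only the caller's copy
	fmt.Println(string(v), string(s.data["k"]))
}
```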

Client Implementation Details

Leader Discovery

  • Caching: Client caches leader index and term
  • Invalidation: Cache invalidated on:
    • Higher term received
    • Timeout (after 1 second)
  • Fallback: Random server selection when no cached leader
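
One possible reading of these caching rules is sketched below. The `leaderCache` type is hypothetical, and two details are assumptions: hints carrying an older term are ignored, and "timeout" is interpreted as the cache entry's age exceeding one second.

```go
package main

import (
	"fmt"
	"time"
)

// leaderCache remembers the last known leader for a limited time.
type leaderCache struct {
	index   int // position in the server list; -1 when unknown
	term    uint64
	updated time.Time
	ttl     time.Duration
}

func newLeaderCache(ttl time.Duration) *leaderCache {
	return &leaderCache{index: -1, ttl: ttl}
}

// observe records a leader hint; hints from an older term are ignored.
func (c *leaderCache) observe(index int, term uint64, now time.Time) {
	if c.index != -1 && term < c.term {
		return
	}
	c.index, c.term, c.updated = index, term, now
}

// leader returns the cached index, or false if nothing is cached or the
// entry has expired. The caller then falls back to a random server.
func (c *leaderCache) leader(now time.Time) (int, bool) {
	if c.index == -1 || now.Sub(c.updated) > c.ttl {
		c.index = -1
		return -1, false
	}
	return c.index, true
}

func main() {
	c := newLeaderCache(time.Second)
	now := time.Now()
	c.observe(2, 5, now)
	idx, ok := c.leader(now.Add(500 * time.Millisecond))
	fmt.Println(idx, ok)
	idx, ok = c.leader(now.Add(2 * time.Second))
	fmt.Println(idx, ok)
}
```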

Retry Logic

  • Per-Server Retries: 3 attempts per server before trying next
  • Exponential Backoff: Binary exponential backoff with jitter
    • Base delay: 100ms
    • Max delay: 5 seconds
    • Jitter: ±25% random variation
  • No Backoff: When a non-leader response names the current leader, the client retries against that leader immediately, without backoff
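
The backoff parameters above translate into a short delay function. This is a sketch under stated assumptions: `backoff` is a hypothetical name, attempts are counted from zero, and jitter is applied after the cap, so a delay can exceed 5 seconds by up to 25%.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseDelay = 100 * time.Millisecond
	maxDelay  = 5 * time.Second
	jitter    = 0.25 // ±25% random variation
)

// backoff returns the delay before retry number attempt (0-based).
// The delay doubles per attempt, is capped at maxDelay, and is then
// scaled by a random factor in [0.75, 1.25).
func backoff(attempt int) time.Duration {
	d := baseDelay
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	factor := 1 - jitter + rand.Float64()*2*jitter
	return time.Duration(float64(d) * factor)
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt, backoff(attempt))
	}
}
```

Jitter spreads out retries from clients that failed at the same moment, so they do not all hit the new leader at once.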

Error Handling

  • Size Validation: Keys and values limited to 1.5 MiB
  • gRPC Errors: Properly handled with retry logic
  • Timeout Handling: 1-second timeout per request
  • Leader Information: Non-leader errors include leader index

Network and Transport

gRPC Configuration

  • Protocol: gRPC over HTTP/2
  • Credentials: Insecure (for development)
  • Timeouts: 50ms for Raft RPCs, 1s for KV operations
  • Connection Pooling: Client maintains connections to all servers

Message Types

  • RequestVote: Used during leader election
  • AppendEntries: Used for log replication and heartbeats
  • KV Operations: Put, Get, Delete requests/responses

Project Structure

sharded-kv/
├── cmd/
│   └── main.go                 # Server entry point
├── raft/                       # Raft consensus implementation
│   ├── raft.go                 # Core Raft state machine
│   ├── message.go              # RPC message handling
│   ├── service.go              # gRPC service implementation
│   ├── network.go              # Transport layer
│   └── utility.go              # Helper functions
├── rsm/                        # Replicated State Machine
│   ├── rsm.go                  # RSM implementation
│   └── types.go                # Operation and result types
├── server/                     # gRPC service layer
│   ├── kv_service.go           # KV service implementation
│   └── errors.go               # Error definitions
├── kv_client/                  # Client library
│   ├── pkg/client/             # Client implementation
│   │   ├── client.go
│   │   └── client_test.go
│   └── cmd/cli/                # CLI tool
│       └── main.go
├── benchmark/                  # Benchmark tool
│   ├── main.go                 # Benchmark runner
│   ├── workload.go             # Workload generation
│   ├── stats.go                # Metrics collection
│   └── reporter.go             # Report generation
├── proto/                      # Protocol buffer definitions
│   ├── kvstore/v1/             # KV service protos
│   ├── raft/v1/                # Raft RPC protos
│   └── rsm/v1/                 # RSM log entry protos
├── config/                     # Configuration handling
│   └── config.go
├── sample_config/              # Example configurations
│   └── dev.toml
├── docker/                     # Docker configuration
│   └── Dockerfile
├── docker-compose.yml          # Cluster orchestration
├── go.mod                      # Go module definition
└── README.md                   # This file

License

See LICENSE file for details.

Acknowledgments

This implementation follows the Raft consensus algorithm as described in "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout.
