A JVM implementation of the Raft distributed consensus algorithm written in Scala. CKite is a consensus library with an easy-to-use API, intended for distributed applications that need consensus.
It is designed to be agnostic of both the mechanism used to exchange messages between members (RPC) and the medium used to store the log (Storage). CKite has a modular architecture with pluggable RPC and Storage implementations. Custom RPCs and Storages can be easily implemented and configured to be used by CKite.
CKite covers all the major topics of Raft including leader election, log replication, log compaction and cluster membership changes. It currently has two implemented modules:
- ckite-finagle: Finagle based RPC module
- ckite-mapdb: MapDB based Storage module
Check out the latest release, 0.2.1, by following the instructions below to start playing with it.
- Leader Election
- Log Replication
- Cluster Membership Changes
- Log Compaction
- Twitter Finagle integration
- MapDB integration
- ckite-core: The core of the library. It implements the Raft consensus protocol. It can be configured with RPCs and Storages.
- ckite-finagle: Twitter Finagle based RPC implementation. It uses a Thrift protocol to exchange Raft messages between members.
- ckite-mapdb: MapDB based storage implementation. MapDB provides concurrent Maps, Sets and Queues backed by disk storage or off-heap memory. It is a fast and easy to use embedded Java database engine.
Coming soon: ckite-chronicle, ckite-akka.
The latest release, 0.2.1, is available on Maven Central. Add the following sbt dependencies to your project settings:
libraryDependencies += "io.ckite" %% "ckite-core" % "0.2.1"
libraryDependencies += "io.ckite" %% "ckite-finagle" % "0.2.1"
libraryDependencies += "io.ckite" %% "ckite-mapdb" % "0.2.1"
Add the following maven dependency to your pom.xml:
<dependency>
<groupId>io.ckite</groupId>
<artifactId>ckite-core</artifactId>
<version>0.2.1</version>
</dependency>
Example (See KVStore)
//KVStore is an in-memory distributed Map supporting Put and Get operations
class KVStore extends StateMachine {
private var map = Map[String, String]()
private var lastIndex: Long = 0
//called when consensus has been reached for a WriteCommand
//the index associated with the write is provided so you can implement your own persistence semantics
//see getLastAppliedIndex
def applyWrite = {
case (index, Put(key: String, value: String)) => {
map += (key -> value) //compiles for the default immutable Map held in a var (it has no put method)
lastIndex = index
value
}
}
//called when a read command has been received
def applyRead = {
case Get(key) => map.get(key)
}
//CKite needs to know the last applied write on log replay to
//provide exactly-once semantics
//If no persistence is needed then state machines can just return zero
def getLastAppliedIndex: Long = lastIndex
//called during Log replay on startup and upon installSnapshot requests
def restoreSnapshot(byteBuffer: ByteBuffer) = {
map = Serializer.deserialize[Map[String, String]](byteBuffer.array())
}
//called when Log compaction is required
def takeSnapshot(): ByteBuffer = ByteBuffer.wrap(Serializer.serialize(map))
}
//WriteCommands are replicated under Raft rules
case class Put(key: String, value: String) extends WriteCommand[String]
//ReadCommands are not replicated but forwarded to the Leader
case class Get(key: String) extends ReadCommand[Option[String]]
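The role of the last applied index during log replay can be illustrated with a small, self-contained sketch. It does not use CKite's API: `LogEntry`, `CountingStore` and `Replay` are hypothetical names, and the skipping rule shown (re-apply only entries whose index is greater than the last applied index) is an assumption about how replay could achieve exactly-once application, not a description of CKite's internals.

```scala
// Hypothetical types for illustration only; none of these come from CKite.
final case class LogEntry(index: Long, key: String, value: String)

// A toy state machine that remembers the index of the last write it applied.
final class CountingStore(initialIndex: Long = 0L) {
  private var data = Map.empty[String, String]
  private var lastIndex = initialIndex
  private var applied = 0

  def lastAppliedIndex: Long = lastIndex
  def appliedCount: Int = applied
  def get(key: String): Option[String] = data.get(key)

  def applyWrite(entry: LogEntry): Unit = {
    data += (entry.key -> entry.value)
    lastIndex = entry.index
    applied += 1
  }
}

object Replay {
  // Re-applies only the entries the state machine has not applied yet.
  def replay(log: Seq[LogEntry], store: CountingStore): Unit = {
    val from = store.lastAppliedIndex // captured once, before any entry is applied
    log.filter(_.index > from).foreach(store.applyWrite)
  }
}
```

Replaying the same log twice against the same store applies each entry once. A purely in-memory state machine that reports zero would simply have the whole log re-applied to its empty state on startup.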
val ckite = CKiteBuilder().listenAddress("node1:9091").rpc(FinagleThriftRpc) //Finagle based transport
.stateMachine(new KVStore()) //KVStore is an implementation of the StateMachine trait
.bootstrap(true) //bootstraps a new cluster. only needed the first time, for the very first node
.build
val ckite = CKiteBuilder().listenAddress("localhost:9091").rpc(FinagleThriftRpc)
.members(Seq("localhost:9092","localhost:9093")) //optional seeds to join the cluster
.minElectionTimeout(1000).maxElectionTimeout(1500) //optional
.heartbeatsPeriod(250) //optional. interval between heartbeats sent while this node is the Leader
.dataDir("/home/ckite/data") //dataDir for persistent state (log, terms, snapshots, etc...)
.stateMachine(new KVStore()) //KVStore is an implementation of the StateMachine trait
.sync(false) //disables log sync to disk
.flushSize(10) //max batch size when flushing log to disk
.build
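Raft randomizes election timeouts so that members rarely time out simultaneously and split the vote, which is presumably why the builder takes both a minimum and a maximum. The sketch below shows one way to draw a timeout from such a range. `ElectionTimeout` is a hypothetical helper, and the uniform draw and the millisecond unit are assumptions, not CKite's actual implementation.

```scala
import scala.util.Random

// Hypothetical helper for illustration; not part of CKite.
object ElectionTimeout {
  // Picks a timeout in [minMillis, maxMillis], inclusive on both ends.
  def next(minMillis: Int, maxMillis: Int, random: Random = new Random()): Int = {
    require(minMillis > 0 && minMillis <= maxMillis, "expected 0 < min <= max")
    minMillis + random.nextInt(maxMillis - minMillis + 1)
  }
}
```

With the values used above, `ElectionTimeout.next(1000, 1500)` yields a value between 1000 and 1500. Keeping the heartbeat period (250 in the example) well below the minimum election timeout lets followers hear from a healthy Leader several times before they would start an election.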
ckite.start()
//this Put command is forwarded to the Leader and applied under Raft rules
val writeFuture:Future[String] = ckite.write(Put("key1","value1"))
//consistent read commands are forwarded to the Leader
val readFuture:Future[Option[String]] = ckite.read(Get("key1"))
//like write commands, cluster membership changes are forwarded to the Leader
ckite.addMember("someHost:9094")
//like write commands, cluster membership changes are forwarded to the Leader
ckite.removeMember("someHost:9094")
//alternatively you can read from the local state machine, accepting possibly stale values
val value = ckite.readLocal(Get("key1"))
//if necessary waits for elections to end
ckite.isLeader()
ckite.stop()
To start a new cluster, run the very first node with the bootstrap parameter turned on. This creates an initial configuration containing just that node. Subsequent nodes join the cluster by pointing to existing members. You can bootstrap the first node using the builder, by overriding ckite.bootstrap in your application.conf, or by starting your application with the system property -Dckite.bootstrap=true. See KVStore for more details.
val ckite = CKiteBuilder().listenAddress("node1:9091").rpc(FinagleThriftRpc)
.dataDir("/home/ckite/data") //dataDir for persistent state (log, terms, snapshots, etc...)
.stateMachine(new KVStore()) //KVStore is an implementation of the StateMachine trait
.bootstrap(true) //bootstraps a new cluster. only needed the first time, for the very first node
.build
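The ways of enabling bootstrap described above can be pictured as a lookup with fallbacks. The following sketch resolves the flag from an explicit value first and then from the `ckite.bootstrap` system property, defaulting to false. `BootstrapFlag` is a hypothetical helper, the precedence order is an assumption, and application.conf handling is left out; CKite's own configuration loading may behave differently.

```scala
// Hypothetical helper for illustration; not part of CKite.
object BootstrapFlag {
  // An explicit value (for example one set through a builder) wins; otherwise the
  // "ckite.bootstrap" property is consulted; otherwise bootstrap stays off.
  def resolve(explicit: Option[Boolean],
              props: collection.Map[String, String] = sys.props): Boolean =
    explicit
      .orElse(props.get("ckite.bootstrap").map(_.trim.toLowerCase == "true"))
      .getOrElse(false)
}
```

Starting the JVM with -Dckite.bootstrap=true makes `BootstrapFlag.resolve(None)` return true, while an explicit `Some(false)` still takes priority in this sketch.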
- Built in Scala 2.11.7 and JDK 8.
- Twitter Finagle.
- Thrift.
- Twitter Scrooge.
- MapDB
- Kryo
- Chronicle (to be implemented)
Feel free to contribute to CKite! Any kind of help is very welcome. We are happy to receive pull requests and issues, discuss implementation details, analyze the Raft algorithm, and do whatever makes CKite a better library. Check out the issues. You can start from there!
To generate the necessary IntelliJ IDEA config files, first run the following command and then open the project as usual:
sbt gen-idea
To generate the necessary Eclipse config files, first run the following command and then open the project as usual:
sbt eclipse