Samod
samod is a library for building collaborative applications which work offline
and don't require servers (though servers certainly can be useful). This is
achieved by representing data as automerge
documents. samod is wire compatible with the automerge-repo JavaScript library.
What does all that mean?
samod helps you manage automerge "documents", which are hierarchical data
structures composed of maps, lists, text, and primitive values - a little
like JSON. Every change you make to a document is recorded and you can move
back and forth through the history of a document - it's a bit like Git for
JSON. samod takes care of storing changes for you and synchronizing them
with connected peers. The interesting part is that given this very detailed
history which we never discard, we can merge documents with changes which
were made concurrently. This means that we can build applications which
allow multiple users to edit the same document without having to have all
changes go through a server.
How it all works
The library is structured around a Repo, which talks to a Storage
instance and to which you connect other peers using
Repo::connect. Once you have a Repo you can
create documents using Repo::create, or look up existing documents using
Repo::find. In either case you will get back a DocHandle which you can
use to interact with the document.
Typically then, your workflow will look like this:
- Initialize a Repo at application startup, passing it a RuntimeHandle implementation and a Storage implementation
- Whenever you have connections available (maybe you are connecting to a sync server, maybe you are receiving peer-to-peer connections), call Repo::connect to add connections to the repo
- Create DocHandles using Repo::create and look up existing documents using Repo::find
- Modify documents using DocHandle::with_document
Let's walk through each of those steps.
Initializing a Repo
To initialize a Repo you call [Repo::builder()] to obtain a
RepoBuilder which you use to configure the repo before calling [RepoBuilder::load()]
to actually load the repository. For example:
let repo = samod::Repo::builder(tokio::runtime::Handle::current())
    .with_storage(samod::storage::InMemoryStorage::new())
    .load()
    .await;
The first argument to builder is an implementation of RuntimeHandle.
Default implementations are provided for tokio and gio which can be
conveniently used via Repo::build_tokio and Repo::build_gio
respectively. The RuntimeHandle trait is straightforward to implement if
you want to use some other async runtime.
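For example, the tokio convenience constructor (which uses the in-memory storage by default) is the quickest way to get a repo:

```rust
// Inside a tokio runtime: build a repo backed by the current tokio runtime.
// Uses in-memory storage unless you call .with_storage(...) before .load().
let repo = samod::Repo::build_tokio().load().await;
```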
By default samod uses an in-memory storage implementation. This is great
for prototyping but in most cases you do actually want to persist data somewhere.
In this case you'll need an implementation of Storage to pass to
RepoBuilder::with_storage.
It is possible to use Storage and AnnouncePolicy implementations which
do not produce Send futures. In this case you will also need a runtime which
can spawn non-Send futures. See the runtimes section for more
details.
Connecting to peers
Connections are managed via dialers and acceptors (listeners).
- A dialer actively connects to a remote endpoint and automatically reconnects with exponential backoff. Create one with [Repo::dial()], which takes a BackoffConfig and an Arc&lt;dyn Dialer&gt;. Repo::find will wait for any dialers which are in the process of connecting to either establish a connection or start retrying before marking a document as unavailable. This means that as long as you add the Dialer before calling Repo::find, you won't have to coordinate the timing of your dialers and document lookups.
- An acceptor passively accepts inbound connections. Create one with [Repo::make_acceptor()], then feed accepted transports via [AcceptorHandle::accept()].
// In this case we use the built-in ChannelDialer, which is useful for simple tests;
// more useful implementations wrap actual network transports.
let bob: Repo = todo!();
let alice: Repo = todo!();
let url = url::Url::parse("samod-channel://my-channel").unwrap();
let acceptor = bob.make_acceptor(url).unwrap();
let channel_dialer = ChannelDialer::new(acceptor);
let dialer_handle = alice.dial(BackoffConfig::default(), Arc::new(channel_dialer)).unwrap();
// Wait for the first successful connection + handshake
let peer_info = dialer_handle.established().await.unwrap();
Dialer handles
The DialerHandle returned by [Repo::dial()] can be used to:
- Wait for the first successful connection with [DialerHandle::established()]
- Observe lifecycle events (connect, disconnect, retry, failure) with [DialerHandle::events()]
- Check if connected with [DialerHandle::is_connected()]
- Close the dialer with [DialerHandle::close()]
Acceptor handles
The AcceptorHandle returned by [Repo::make_acceptor()] can be used to:
- Accept inbound connections with [AcceptorHandle::accept()]
- Observe lifecycle events with [AcceptorHandle::events()]
- Check connection count with [AcceptorHandle::connection_count()]
- Close the acceptor with [AcceptorHandle::close()]
Managing Documents
Once you have a Repo you can use it to manage DocHandles. A
DocHandle represents an automerge document which the Repo
is managing. "managing" here means a few things:
- Any changes made to the document using DocHandle::with_document will be persisted to storage and synchronized with connected peers (subject to the AnnouncePolicy).
- Any changes received from connected peers will be applied to the document and made visible to the application. You can listen for these changes using DocHandle::changes.
To create a new document you use Repo::create which will return
once the document has been persisted to storage. To look up an existing
document you use Repo::find. This will first look in storage, then
if the document is not found in storage it will request the document
from all connected peers (again subject to the AnnouncePolicy). If
any peer has the document the future returned by Repo::find will
resolve once we have synchronized with at least one remote peer which
has the document.
You can make changes to a document using DocHandle::with_document.
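For example, a minimal sketch of making a change (the "title" key is purely illustrative, and error handling is elided):

```rust
use automerge::transaction::Transactable as _;

// Sketch: modify a document through its handle. Changes made inside
// with_document are persisted to storage and synchronized with peers
// by the Repo.
fn set_title(handle: &samod::DocHandle) {
    handle.with_document(|doc| {
        doc.transact::<_, _, automerge::AutomergeError>(|tx| {
            // "title" is just an example key in this document's schema
            tx.put(automerge::ROOT, "title", "My todo list")?;
            Ok(())
        })
        .unwrap();
    });
}
```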
Announce Policies
By default, samod will announce all the DocHandles it is synchronizing
to all connected peers and will also send requests to any connected peers
when you call Repo::find. This is often not what you want. To customize
this logic you pass an implementation of AnnouncePolicy to
RepoBuilder::with_announce_policy. Note that AnnouncePolicy is implemented
for closures which take a document id and a peer id and return a bool, so you can just pass a closure if you want.
let authorized_peer = samod::PeerId::from("alice");
let repo = samod::Repo::build_tokio()
    .with_announce_policy(move |_doc_id, peer_id| {
        // Only announce documents to alice
        &peer_id == &authorized_peer
    })
    .load()
    .await;
Runtimes
RuntimeHandle is a trait which is intended to abstract over the various
async runtimes available in the Rust ecosystem. The most common runtime is tokio.
tokio is a work-stealing runtime which means that the futures spawned on it
must be Send, so that they can be moved between threads. This means that
RuntimeHandle::spawn requires Send futures. This in turn means that
the futures returned by the Storage and AnnouncePolicy traits are
also Send so that they can be spawned onto the RuntimeHandle.
In many cases though, you may have a runtime which doesn't require Send
futures and you may have storage and announce policy implementations which
cannot produce Send futures. This would often be the case in single
threaded runtimes for example. In these cases you can instead implement
LocalRuntimeHandle, which doesn't require Send futures and then
you implement LocalStorage and LocalAnnouncePolicy traits for
your storage and announce policy implementations. You configure all these
things via the RepoBuilder struct. Once you've configured the storage
and announce policy implementations to use local variants you can then
create a local Repo using RepoBuilder::load_local.
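Put together, a local configuration might look roughly like this. This is a sketch only: MyLocalRuntime and MyLocalStorage are hypothetical implementations of LocalRuntimeHandle and LocalStorage (samod does not provide them), and the exact builder method names for the local variants may differ.

```rust
// Hypothetical types: MyLocalRuntime implements LocalRuntimeHandle and
// MyLocalStorage implements LocalStorage; neither is provided by samod.
let repo = samod::Repo::builder(MyLocalRuntime::new())
    .with_storage(MyLocalStorage::new())
    // load_local spawns non-Send futures on the local runtime
    .load_local()
    .await;
```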
Concurrency
Typically samod will be managing many documents: one for each DocHandle
you retrieve via Repo::create or Repo::find, but also one for each
document about which sync messages are received from remote peers (e.g.
a sync server would have no DocHandles open but would still be running
many document processes). By default document tasks are run on the
async runtime provided to the RepoBuilder, but this can be undesirable.
Document operations can be compute intensive and so responsiveness may
benefit from running them on a separate thread pool. This is the purpose
of the RepoBuilder::with_concurrency method, which allows you to
configure how document operations are processed. If you want to use the
threadpool approach you will need to enable the threadpool feature.
Why not just Automerge?
automerge is a low level library. It provides routines for manipulating
documents in memory and an abstract data sync protocol. It does not actually
hook this up to any kind of network or storage. Most of the work involved
in doing this plumbing is straightforward, but if every application does
it itself, we don't end up with interoperable applications. In particular
we don't end up with fungible sync servers. One of the core goals of this
library is to allow application authors to be agnostic as to where the
user synchronises data by implementing a generic network and storage layer
which all applications can use.
Example
Here's a somewhat fully featured example of using samod to manage a todo list
across two repos (representing two different devices):
use automerge::{ReadDoc, transaction::{Transactable as _}};
use futures::StreamExt as _;
use samod::{BackoffConfig, transport::channel::ChannelDialer};
use std::sync::Arc;
// Create two repos, representing two different devices
let alice = samod::Repo::build_tokio().load().await;
let bob = samod::Repo::build_tokio().load().await;
// Create an initial skeleton for our todo list on alice
let mut initial_doc = automerge::Automerge::new();
initial_doc.transact::<_, _, automerge::AutomergeError>(|tx| {
let _todos = tx.put_object(automerge::ROOT, "todos", automerge::ObjType::List)?;
Ok(())
}).unwrap();
// Now create a `samod::DocHandle` on alice using `Repo::create`
let alice_handle = alice.create(initial_doc).await.unwrap();
// Bob registers an acceptor; alice dials bob via an in-process ChannelDialer
let url = url::Url::parse("channel://alice-to-bob").unwrap();
let acceptor = bob.make_acceptor(url).unwrap();
let channel_dialer = ChannelDialer::new(acceptor);
let dialer_handle = alice.dial(BackoffConfig::default(), Arc::new(channel_dialer)).unwrap();
// Wait for alice to be connected to bob
dialer_handle.established().await.unwrap();
// Bob can now fetch alice's document
let bob_handle = bob.find(alice_handle.document_id().clone()).await.unwrap().unwrap();
// Make a change on bob's side
bob_handle.with_document(|doc| {
doc.transact(|tx| {
let todos = tx.get(automerge::ROOT, "todos").unwrap()
.expect("todos should exist").1;
tx.insert(todos, 0, "Buy milk")?;
Ok::<_, automerge::AutomergeError>(())
}).unwrap();
});
// Wait for the change to be received on alice's side
alice_handle.changes().next().await.unwrap();
// Alice's document now reflects bob's change
alice_handle.with_document(|doc| {
let todos = doc.get(automerge::ROOT, "todos").unwrap()
.expect("todos should exist").1;
let item = doc.get(todos, 0).unwrap().expect("item should exist").0;
let automerge::Value::Scalar(val) = item else {
panic!("item should be a scalar");
};
let automerge::ScalarValue::Str(s) = val.as_ref() else {
panic!("item should be a string");
};
assert_eq!(s, "Buy milk");
Ok::<_, automerge::AutomergeError>(())
}).unwrap();