Skip to content

Commit

Permalink
Merge pull request #7 from radiant-labs/codenikel/collaboration
Browse files Browse the repository at this point in the history
Adds collaboration support
  • Loading branch information
codenikel authored Dec 15, 2023
2 parents dbb1d48 + 72d2627 commit 5274658
Show file tree
Hide file tree
Showing 61 changed files with 1,698 additions and 1,273 deletions.
446 changes: 417 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"crates/winit",
"crates/collaboration",
"runtime",
"backend",
"examples/basic",
"examples/egui",
"examples/web/src-tauri"
Expand Down
17 changes: 17 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "radiantkit-backend"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.75"
futures-util = "0.3.29"
log = "0.4.20"
env_logger = "0.10"
tokio = { version = "1.34.0", features = ["rt", "rt-multi-thread", "macros"] }
warp = "0.3.6"
yrs = "0.17.1"
y-sync = { version = "0.4.0", features = ["net"] }
yrs-warp = "0.7.0"
50 changes: 50 additions & 0 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use futures_util::{FutureExt, StreamExt};
use warp::Filter;
use tokio::sync::{RwLock, Mutex};
use yrs_warp::{AwarenessRef, ws::{WarpSink, WarpStream}};
use y_sync::awareness::Awareness;
use y_sync::net::BroadcastGroup;
use yrs::Doc;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let env = env_logger::Env::default()
.filter_or("MY_LOG_LEVEL", "info")
.write_style_or("MY_LOG_STYLE", "always");

env_logger::init_from_env(env);

// We're using a single static document shared among all the peers.
let awareness: AwarenessRef = {
let doc = Doc::with_client_id(1);
{
// pre-initialize code mirror document with some text
let _map = doc.get_or_insert_map("radiantkit-root");
}
Arc::new(RwLock::new(Awareness::new(doc)))
};

let bcast = Arc::new(BroadcastGroup::new(awareness.clone(), 32).await);

let routes = warp::path("sync")
.and(warp::ws())
.and(warp::any().map(move || bcast.clone()))
.map(|ws: warp::ws::Ws, bcast: Arc<BroadcastGroup>| {
ws.on_upgrade(move |websocket| {
log::info!("websocket connection opened");
let (tx, rx) = websocket.split();
let sink = Arc::new(Mutex::new(WarpSink::from(tx)));
let stream = WarpStream::from(rx);
let sub = bcast.subscribe(sink, stream);
sub.completed().then(|result| async move {
match result {
Ok(_) => println!("broadcasting for channel finished successfully"),
Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e),
}
})
})
});

warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
34 changes: 33 additions & 1 deletion crates/collaboration/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "radiankit-collaboration"
name = "radiantkit-collaboration"
version = "0.0.1"
edition = "2021"

Expand All @@ -9,3 +9,35 @@ edition = "2021"
radiantkit-core = { version = "0.0.1", path = "../core" }
serde_json = "1.0.108"
yrs = "0.17.1"
pollster = "0.3"
futures-util = "0.3.29"
log = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.34.0", features = ["full"] }
tokio-tungstenite = "0.20.1"
y-sync = { version = "0.4.0", features = ["net"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
y-sync = "0.4.0"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4.30"
js-sys = "0.3.64"
web-sys = { version = "0.3", features = [
"Window",
"BinaryType",
"Blob",
"ErrorEvent",
"FileReader",
"MessageEvent",
"ProgressEvent",
"WebSocket",
"console"
]}

[dependencies.uuid]
version = "1.6.1"
features = [
"wasm-bindgen",
"serde",
]
69 changes: 69 additions & 0 deletions crates/collaboration/src/collaborator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode};
use uuid::Uuid;
use y_sync::awareness::Awareness;
use yrs::*;
use std::sync::{Arc, RwLock};

#[cfg(target_arch = "wasm32")]
use crate::wasm_connection::WasmConnection;
#[cfg(not(target_arch = "wasm32"))]
use crate::native_connection::NativeConnection;

pub struct Collaborator {
#[cfg(target_arch = "wasm32")]
connection: Arc<RwLock<WasmConnection>>,
#[cfg(not(target_arch = "wasm32"))]
connection: Arc<RwLock<NativeConnection>>,
}

impl Collaborator {
pub async fn new(client_id: u64) -> Result<Self, ()> {
let url = "ws://localhost:8000/sync";

let doc = Doc::with_client_id(client_id);
let _map = doc.get_or_insert_map("radiantkit-root");

let connection;

#[cfg(target_arch = "wasm32")]
{
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
match WasmConnection::new(awareness.clone(), url) {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
}
}

#[cfg(not(target_arch = "wasm32"))]
{
use tokio::sync::RwLock;
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
match NativeConnection::new(awareness.clone(), url).await {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
}
}

Ok(Self {
connection,
})
}
}

impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator {
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: Uuid) {
let Ok(connection) = self.connection.write() else { return };
let awareness = connection.awareness();
let Ok(awareness) = awareness.write() else { return };

if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let root = doc.get_or_insert_map("radiantkit-root");

let mut txn = doc.transact_mut();
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());

log::info!("count {}", root.len(&txn));
}
}
}
27 changes: 4 additions & 23 deletions crates/collaboration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,5 @@
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode};
use yrs::*;
pub mod collaborator;
pub use collaborator::*;

pub struct Collaborator {
doc: Doc,
}

impl Collaborator {
pub fn new() -> Self {
Self {
doc: Doc::new(),
}
}
}

impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator {
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: u64) {
let root = self.doc.get_or_insert_text("radiantkit-root");
let mut txn = self.doc.transact_mut();
if let Some(node) = document.get_node(id) {
root.push(&mut txn, &serde_json::to_string(node).unwrap());
}
}
}
mod wasm_connection;
mod native_connection;
127 changes: 127 additions & 0 deletions crates/collaboration/src/native_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#![cfg(not(target_arch = "wasm32"))]

use futures_util::SinkExt;
use futures_util::{StreamExt, stream::SplitSink, Sink, Stream, ready};
use futures_util::stream::SplitStream;
use pollster::block_on;
use tokio_tungstenite::{tungstenite, WebSocketStream, MaybeTlsStream};
use tokio::{sync::RwLock, net::TcpStream, sync::RwLockReadGuard};
use tungstenite::Message;
use y_sync::awareness::Awareness;
use yrs::UpdateSubscription;
use yrs::updates::encoder::Encode;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use y_sync::sync::Error;
use y_sync::net::Connection;
use tokio::task;

struct TungsteniteSink(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>);

impl Sink<Vec<u8>> for TungsteniteSink {
type Error = Error;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_ready(cx));
match result {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
}
}

fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = sink.start_send(Message::binary(item));
match result {
Ok(_) => Ok(()),
Err(e) => Err(Error::Other(Box::new(e))),
}
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_flush(cx));
match result {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
}
}

fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let sink = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(sink.poll_close(cx));
match result {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
}
}
}

struct TungsteniteStream(SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>);
impl Stream for TungsteniteStream {
type Item = Result<Vec<u8>, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let stream = unsafe { Pin::new_unchecked(&mut self.0) };
let result = ready!(stream.poll_next(cx));
match result {
None => Poll::Ready(None),
Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_data()))),
Some(Err(e)) => Poll::Ready(Some(Err(Error::Other(Box::new(e))))),
}
}
}

pub struct NativeConnection {
connection: Option<Connection<TungsteniteSink, TungsteniteStream>>,
_sub: Option<UpdateSubscription>,
}

impl NativeConnection {
pub async fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<std::sync::RwLock<Self>>, ()> {
let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(url).await else { return Err(()) };
let (sink, stream) = ws_stream.split();
let connection = Connection::new(awareness.clone(), TungsteniteSink(sink), TungsteniteStream(stream));

let sub = {
let sink = connection.sink();
let a = connection.awareness().write().await;
let doc = a.doc();
doc.observe_update_v1(move |_txn, e| {
log::info!("sending update");
let update = e.update.to_owned();
if let Some(sink) = sink.upgrade() {
task::spawn(async move {
let msg =
y_sync::sync::Message::Sync(y_sync::sync::SyncMessage::Update(update))
.encode_v1();
let mut sink = sink.lock().await;
sink.send(msg).await.unwrap();
});
}
})
.unwrap()
};

Ok(Arc::new(std::sync::RwLock::new(Self {
connection: Some(connection),
_sub: Some(sub),
})))
}

pub fn awareness(&self) -> Arc<std::sync::RwLock<RwLockReadGuard<Awareness>>> {
let awareness = block_on(self.connection.as_ref().unwrap().awareness().read());
Arc::new(std::sync::RwLock::new(awareness))
}
}
Loading

0 comments on commit 5274658

Please sign in to comment.