-
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from radiant-labs/codenikel/collaboration
Adds collaboration support
- Loading branch information
Showing
61 changed files
with
1,698 additions
and
1,273 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
[package] | ||
name = "radiantkit-backend" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
anyhow = "1.0.75" | ||
futures-util = "0.3.29" | ||
log = "0.4.20" | ||
env_logger = "0.10" | ||
tokio = { version = "1.34.0", features = ["rt", "rt-multi-thread", "macros"] } | ||
warp = "0.3.6" | ||
yrs = "0.17.1" | ||
y-sync = { version = "0.4.0", features = ["net"] } | ||
yrs-warp = "0.7.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
use futures_util::{FutureExt, StreamExt}; | ||
use warp::Filter; | ||
use tokio::sync::{RwLock, Mutex}; | ||
use yrs_warp::{AwarenessRef, ws::{WarpSink, WarpStream}}; | ||
use y_sync::awareness::Awareness; | ||
use y_sync::net::BroadcastGroup; | ||
use yrs::Doc; | ||
use std::sync::Arc; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let env = env_logger::Env::default() | ||
.filter_or("MY_LOG_LEVEL", "info") | ||
.write_style_or("MY_LOG_STYLE", "always"); | ||
|
||
env_logger::init_from_env(env); | ||
|
||
// We're using a single static document shared among all the peers. | ||
let awareness: AwarenessRef = { | ||
let doc = Doc::with_client_id(1); | ||
{ | ||
// pre-initialize code mirror document with some text | ||
let _map = doc.get_or_insert_map("radiantkit-root"); | ||
} | ||
Arc::new(RwLock::new(Awareness::new(doc))) | ||
}; | ||
|
||
let bcast = Arc::new(BroadcastGroup::new(awareness.clone(), 32).await); | ||
|
||
let routes = warp::path("sync") | ||
.and(warp::ws()) | ||
.and(warp::any().map(move || bcast.clone())) | ||
.map(|ws: warp::ws::Ws, bcast: Arc<BroadcastGroup>| { | ||
ws.on_upgrade(move |websocket| { | ||
log::info!("websocket connection opened"); | ||
let (tx, rx) = websocket.split(); | ||
let sink = Arc::new(Mutex::new(WarpSink::from(tx))); | ||
let stream = WarpStream::from(rx); | ||
let sub = bcast.subscribe(sink, stream); | ||
sub.completed().then(|result| async move { | ||
match result { | ||
Ok(_) => println!("broadcasting for channel finished successfully"), | ||
Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e), | ||
} | ||
}) | ||
}) | ||
}); | ||
|
||
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode}; | ||
use uuid::Uuid; | ||
use y_sync::awareness::Awareness; | ||
use yrs::*; | ||
use std::sync::{Arc, RwLock}; | ||
|
||
#[cfg(target_arch = "wasm32")] | ||
use crate::wasm_connection::WasmConnection; | ||
#[cfg(not(target_arch = "wasm32"))] | ||
use crate::native_connection::NativeConnection; | ||
|
||
pub struct Collaborator { | ||
#[cfg(target_arch = "wasm32")] | ||
connection: Arc<RwLock<WasmConnection>>, | ||
#[cfg(not(target_arch = "wasm32"))] | ||
connection: Arc<RwLock<NativeConnection>>, | ||
} | ||
|
||
impl Collaborator { | ||
pub async fn new(client_id: u64) -> Result<Self, ()> { | ||
let url = "ws://localhost:8000/sync"; | ||
|
||
let doc = Doc::with_client_id(client_id); | ||
let _map = doc.get_or_insert_map("radiantkit-root"); | ||
|
||
let connection; | ||
|
||
#[cfg(target_arch = "wasm32")] | ||
{ | ||
let awareness = Arc::new(RwLock::new(Awareness::new(doc))); | ||
match WasmConnection::new(awareness.clone(), url) { | ||
Ok(conn) => connection = conn, | ||
Err(_) => return Err(()), | ||
} | ||
} | ||
|
||
#[cfg(not(target_arch = "wasm32"))] | ||
{ | ||
use tokio::sync::RwLock; | ||
let awareness = Arc::new(RwLock::new(Awareness::new(doc))); | ||
match NativeConnection::new(awareness.clone(), url).await { | ||
Ok(conn) => connection = conn, | ||
Err(_) => return Err(()), | ||
} | ||
} | ||
|
||
Ok(Self { | ||
connection, | ||
}) | ||
} | ||
} | ||
|
||
impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator { | ||
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: Uuid) { | ||
let Ok(connection) = self.connection.write() else { return }; | ||
let awareness = connection.awareness(); | ||
let Ok(awareness) = awareness.write() else { return }; | ||
|
||
if let Some(node) = document.get_node(id) { | ||
let doc = awareness.doc(); | ||
let root = doc.get_or_insert_map("radiantkit-root"); | ||
|
||
let mut txn = doc.transact_mut(); | ||
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap()); | ||
|
||
log::info!("count {}", root.len(&txn)); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,5 @@ | ||
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode}; | ||
use yrs::*; | ||
pub mod collaborator; | ||
pub use collaborator::*; | ||
|
||
pub struct Collaborator { | ||
doc: Doc, | ||
} | ||
|
||
impl Collaborator { | ||
pub fn new() -> Self { | ||
Self { | ||
doc: Doc::new(), | ||
} | ||
} | ||
} | ||
|
||
impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator { | ||
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: u64) { | ||
let root = self.doc.get_or_insert_text("radiantkit-root"); | ||
let mut txn = self.doc.transact_mut(); | ||
if let Some(node) = document.get_node(id) { | ||
root.push(&mut txn, &serde_json::to_string(node).unwrap()); | ||
} | ||
} | ||
} | ||
mod wasm_connection; | ||
mod native_connection; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
#![cfg(not(target_arch = "wasm32"))] | ||
|
||
use futures_util::SinkExt; | ||
use futures_util::{StreamExt, stream::SplitSink, Sink, Stream, ready}; | ||
use futures_util::stream::SplitStream; | ||
use pollster::block_on; | ||
use tokio_tungstenite::{tungstenite, WebSocketStream, MaybeTlsStream}; | ||
use tokio::{sync::RwLock, net::TcpStream, sync::RwLockReadGuard}; | ||
use tungstenite::Message; | ||
use y_sync::awareness::Awareness; | ||
use yrs::UpdateSubscription; | ||
use yrs::updates::encoder::Encode; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{Context, Poll}; | ||
use y_sync::sync::Error; | ||
use y_sync::net::Connection; | ||
use tokio::task; | ||
|
||
struct TungsteniteSink(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>); | ||
|
||
impl Sink<Vec<u8>> for TungsteniteSink { | ||
type Error = Error; | ||
|
||
fn poll_ready( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
let sink = unsafe { Pin::new_unchecked(&mut self.0) }; | ||
let result = ready!(sink.poll_ready(cx)); | ||
match result { | ||
Ok(_) => Poll::Ready(Ok(())), | ||
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))), | ||
} | ||
} | ||
|
||
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> { | ||
let sink = unsafe { Pin::new_unchecked(&mut self.0) }; | ||
let result = sink.start_send(Message::binary(item)); | ||
match result { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::Other(Box::new(e))), | ||
} | ||
} | ||
|
||
fn poll_flush( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
let sink = unsafe { Pin::new_unchecked(&mut self.0) }; | ||
let result = ready!(sink.poll_flush(cx)); | ||
match result { | ||
Ok(_) => Poll::Ready(Ok(())), | ||
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))), | ||
} | ||
} | ||
|
||
fn poll_close( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
let sink = unsafe { Pin::new_unchecked(&mut self.0) }; | ||
let result = ready!(sink.poll_close(cx)); | ||
match result { | ||
Ok(_) => Poll::Ready(Ok(())), | ||
Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))), | ||
} | ||
} | ||
} | ||
|
||
struct TungsteniteStream(SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>); | ||
impl Stream for TungsteniteStream { | ||
type Item = Result<Vec<u8>, Error>; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let stream = unsafe { Pin::new_unchecked(&mut self.0) }; | ||
let result = ready!(stream.poll_next(cx)); | ||
match result { | ||
None => Poll::Ready(None), | ||
Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_data()))), | ||
Some(Err(e)) => Poll::Ready(Some(Err(Error::Other(Box::new(e))))), | ||
} | ||
} | ||
} | ||
|
||
pub struct NativeConnection { | ||
connection: Option<Connection<TungsteniteSink, TungsteniteStream>>, | ||
_sub: Option<UpdateSubscription>, | ||
} | ||
|
||
impl NativeConnection { | ||
pub async fn new(awareness: Arc<RwLock<Awareness>>, url: &str) -> Result<Arc<std::sync::RwLock<Self>>, ()> { | ||
let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(url).await else { return Err(()) }; | ||
let (sink, stream) = ws_stream.split(); | ||
let connection = Connection::new(awareness.clone(), TungsteniteSink(sink), TungsteniteStream(stream)); | ||
|
||
let sub = { | ||
let sink = connection.sink(); | ||
let a = connection.awareness().write().await; | ||
let doc = a.doc(); | ||
doc.observe_update_v1(move |_txn, e| { | ||
log::info!("sending update"); | ||
let update = e.update.to_owned(); | ||
if let Some(sink) = sink.upgrade() { | ||
task::spawn(async move { | ||
let msg = | ||
y_sync::sync::Message::Sync(y_sync::sync::SyncMessage::Update(update)) | ||
.encode_v1(); | ||
let mut sink = sink.lock().await; | ||
sink.send(msg).await.unwrap(); | ||
}); | ||
} | ||
}) | ||
.unwrap() | ||
}; | ||
|
||
Ok(Arc::new(std::sync::RwLock::new(Self { | ||
connection: Some(connection), | ||
_sub: Some(sub), | ||
}))) | ||
} | ||
|
||
pub fn awareness(&self) -> Arc<std::sync::RwLock<RwLockReadGuard<Awareness>>> { | ||
let awareness = block_on(self.connection.as_ref().unwrap().awareness().read()); | ||
Arc::new(std::sync::RwLock::new(awareness)) | ||
} | ||
} |
Oops, something went wrong.