Skip to content

Commit

Permalink
Fixes lock issues
Browse files Browse the repository at this point in the history
  • Loading branch information
codenikel committed Dec 16, 2023
1 parent 74509fc commit 8aaa3ae
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 141 deletions.
2 changes: 1 addition & 1 deletion apps/whiteboard/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Toolbar from './components/Toolbar';

const App = () => {
return (
<RadiantKitProvider width={undefined} height={undefined}>
<RadiantKitProvider client_id={BigInt(4)} width={undefined} height={undefined}>
<div style={{ display: 'flex' }}>
<div style={{ zIndex: 1 }}>
<Toolbar />
Expand Down
107 changes: 57 additions & 50 deletions crates/collaboration/src/collaborator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use radiantkit_core::{RadiantDocumentListener, RadiantDocumentNode, RadiantNode};
use uuid::Uuid;
use y_sync::awareness::Awareness;
use yrs::*;
use y_sync::awareness::{Awareness, UpdateSubscription as AwarenessUpdateSubscription};
use yrs::{*, types::{map::MapEvent, EntryChange}};
use std::sync::{Arc, RwLock, Weak};
use pollster::block_on;

#[cfg(target_arch = "wasm32")]
use crate::wasm_connection::WasmConnection;
Expand All @@ -15,21 +16,52 @@ pub struct Collaborator<N: RadiantNode> {
connection: Arc<RwLock<WasmConnection>>,
#[cfg(not(target_arch = "wasm32"))]
connection: Arc<RwLock<NativeConnection>>,
_sub: Option<UpdateSubscription>,
_awareness_sub: Option<AwarenessUpdateSubscription>,
_root_sub: Subscription<Arc<dyn Fn(&TransactionMut, &MapEvent)>>,
}

impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N> {
pub async fn new(client_id: u64, document: Weak<RwLock<RadiantDocumentNode<N>>>) -> Result<Self, ()> {
let url = "ws://localhost:8000/sync";

let doc = Doc::with_client_id(client_id);
let _map = doc.get_or_insert_map("radiantkit-root");
let mut root = doc.get_or_insert_map("radiantkit-root");
let document_clone = document.clone();
let root_sub = root.observe(move |txn, event| {
log::error!("root event received");
let Some(document) = document_clone.upgrade() else { return };
let Ok(mut document) = document.try_write() else { return };
event.keys(txn).iter().for_each(|(key, change)| {
match change {
EntryChange::Inserted(val) => {
let id = Uuid::parse_str(key).unwrap();
let node: String = val.clone().cast().unwrap();
let mut node: N = serde_json::from_str(&node).unwrap();
node.set_needs_tessellation();
if document.get_node(id).is_none() {
document.add_excluding_listener(node);
}
},
EntryChange::Removed(_val) => {

},
EntryChange::Updated(_old, _new) => {

}
}
});
});

let connection;

let mut awareness = Awareness::new(doc);
let awareness_sub = Some(awareness.on_update(|_a, e| {
log::error!("awareness event {:?}", e);
}));

#[cfg(target_arch = "wasm32")]
{
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
let awareness = Arc::new(RwLock::new(awareness));
match WasmConnection::new(awareness.clone(), url) {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
Expand All @@ -39,65 +71,40 @@ impl<'a, N: 'static + RadiantNode + serde::de::DeserializeOwned> Collaborator<N>
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::sync::RwLock;
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
let awareness = Arc::new(RwLock::new(awareness));
match NativeConnection::new(awareness.clone(), url).await {
Ok(conn) => connection = conn,
Err(_) => return Err(()),
}
}

let mut sub = None;
if let Ok(connection) = connection.write() {
let awareness = connection.awareness();
let awareness = awareness.write();
if let Ok(awareness) = awareness {
let doc = awareness.doc();
let document_clone = document.clone();
sub = Some({
doc.observe_update_v1(move |txn, _e| {
log::info!("receiving update");
if let Some(root) = txn.get_map("radiantkit-root") {
let Some(document) = document_clone.upgrade() else { return };
let Ok(mut document) = document.try_write() else { return };
root.iter(txn).for_each(|(id, val)| {
let id = Uuid::parse_str(id).unwrap();
let node: String = val.cast().unwrap();
let mut node: N = serde_json::from_str(&node).unwrap();
node.set_needs_tessellation();
if document.get_node(id).is_none() {
document.add_excluding_listener(node);
}
});
}
})
.unwrap()
});
}
}

Ok(Self {
_document: document,
connection,
_sub: sub,
_awareness_sub: awareness_sub,
_root_sub: root_sub,
})
}
}

impl<N: RadiantNode> RadiantDocumentListener<N> for Collaborator<N> {
fn on_node_added(&mut self, document: &mut RadiantDocumentNode<N>, id: Uuid) {
let Ok(connection) = self.connection.write() else { return };
let awareness = connection.awareness();
let Ok(awareness) = awareness.write() else { return };

if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let root = doc.get_or_insert_map("radiantkit-root");

let mut txn = doc.transact_mut();
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());
txn.commit();

log::info!("count {}", root.len(&txn));
}
block_on(async {
let Ok(connection) = self.connection.write() else { return };
let awareness = connection.awareness();
#[cfg(not(target_arch = "wasm32"))]
let awareness = awareness.write().await;
#[cfg(target_arch = "wasm32")]
let Ok(awareness) = awareness.write() else { return };
if let Some(node) = document.get_node(id) {
let doc = awareness.doc();
let Ok(mut txn) = doc.try_transact_mut() else { log::error!("Failed to transact"); return };
if let Some(root) = txn.get_map("radiantkit-root") {
root.insert(&mut txn, id.to_string(), serde_json::to_string(node).unwrap());
}
txn.commit();
log::error!("Added node {:?}", id);
}
});
}
}
17 changes: 9 additions & 8 deletions crates/collaboration/src/native_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
use futures_util::SinkExt;
use futures_util::{StreamExt, stream::SplitSink, Sink, Stream, ready};
use futures_util::stream::SplitStream;
use pollster::block_on;
use tokio_tungstenite::{tungstenite, WebSocketStream, MaybeTlsStream};
use tokio::{sync::RwLock, net::TcpStream, sync::RwLockReadGuard};
use tokio::{sync::RwLock, net::TcpStream};
use tungstenite::Message;
use y_sync::awareness::Awareness;
use yrs::UpdateSubscription;
Expand Down Expand Up @@ -84,7 +83,8 @@ impl Stream for TungsteniteStream {
}

pub struct NativeConnection {
connection: Option<Connection<TungsteniteSink, TungsteniteStream>>,
_connection: Option<Connection<TungsteniteSink, TungsteniteStream>>,
awareness: Arc<RwLock<Awareness>>,
_sub: Option<UpdateSubscription>,
}

Expand All @@ -99,7 +99,7 @@ impl NativeConnection {
let a = connection.awareness().write().await;
let doc = a.doc();
doc.observe_update_v1(move |_txn, e| {
log::info!("sending update");
log::error!("sending update");
let update = e.update.to_owned();
if let Some(sink) = sink.upgrade() {
task::spawn(async move {
Expand All @@ -115,13 +115,14 @@ impl NativeConnection {
};

Ok(Arc::new(std::sync::RwLock::new(Self {
connection: Some(connection),
_connection: Some(connection),
awareness,
_sub: Some(sub),
})))
}

pub fn awareness(&self) -> Arc<std::sync::RwLock<RwLockReadGuard<Awareness>>> {
let awareness = block_on(self.connection.as_ref().unwrap().awareness().read());
Arc::new(std::sync::RwLock::new(awareness))
pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
self.awareness.clone()

}
}
3 changes: 3 additions & 0 deletions crates/collaboration/src/wasm_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ pub struct Connection {
}

impl Connection {
#[allow(dead_code)]
pub async fn send(&self, msg: Vec<u8>) -> Result<(), Error> {
let _ = self.ws.send_with_u8_array(&msg);
Ok(())
}

#[allow(dead_code)]
pub async fn cslose(self) -> Result<(), Error> {
let _ = self.ws.close();
Ok(())
Expand All @@ -34,6 +36,7 @@ impl Connection {
Self::with_protocol(awareness, DefaultProtocol, &ws).map_err(|_| ())
}

#[allow(dead_code)]
pub fn awareness(&self) -> &Arc<RwLock<Awareness>> {
&self.awareness
}
Expand Down
2 changes: 1 addition & 1 deletion examples/egui/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RadiantKitAppController {

async fn run(client_id: u64) {
let env = env_logger::Env::default()
.filter_or("MY_LOG_LEVEL", "info")
.filter_or("MY_LOG_LEVEL", "error")
.write_style_or("MY_LOG_STYLE", "always");

env_logger::init_from_env(env);
Expand Down
8 changes: 4 additions & 4 deletions runtime/pkg/radiantkit.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ export interface InitOutput {
readonly __wbindgen_malloc: (a: number, b: number) => number;
readonly __wbindgen_realloc: (a: number, b: number, c: number, d: number) => number;
readonly __wbindgen_export_2: WebAssembly.Table;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h1541be4407e5b54a: (a: number, b: number, c: number) => void;
readonly _dyn_core__ops__function__FnMut_____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h831daedbf394e0b8: (a: number, b: number) => void;
readonly wasm_bindgen__convert__closures__invoke1_mut__h9ed5dc6036647b52: (a: number, b: number, c: number) => void;
readonly wasm_bindgen__convert__closures__invoke0_mut__hc594443bfec64e6f: (a: number, b: number) => void;
readonly _dyn_core__ops__function__FnMut_____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h20324fb4a9880f17: (a: number, b: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h0077550e1d4ebdc0: (a: number, b: number, c: number) => void;
readonly wasm_bindgen__convert__closures__invoke1_mut__hb42503db276de2ca: (a: number, b: number, c: number) => void;
readonly wasm_bindgen__convert__closures__invoke0_mut__h259ebfa3249daa59: (a: number, b: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h5234d6fbfa4fe934: (a: number, b: number, c: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__h7f0e0b13653e5123: (a: number, b: number, c: number) => void;
readonly __wbindgen_free: (a: number, b: number, c: number) => void;
Expand Down
Loading

0 comments on commit 8aaa3ae

Please sign in to comment.