From 2f74d52a3277dd0938d205fd17d1e600f61978e1 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Fri, 21 Mar 2025 13:18:50 +0100 Subject: [PATCH 1/8] doc: Add missing constructed chain up --- aardvark-doc/src/document.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index fd2d76fd..7dde0546 100644 --- a/aardvark-doc/src/document.rs +++ b/aardvark-doc/src/document.rs @@ -102,6 +102,8 @@ mod imp { } fn constructed(&self) { + self.parent_constructed(); + let service = self.service.get().unwrap(); let (network_tx, mut rx) = if let Some(id) = self.id.get() { service.join_document(Hash::from_str(id).expect("Invalid document id")) From 835d4626fddc5ec9407a3ff6a4d01ec139c6f8b8 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Fri, 21 Mar 2025 13:23:31 +0100 Subject: [PATCH 2/8] chore: Add config.rs to gitignore --- .gitignore | 3 ++- aardvark-app/src/config.rs | 5 ----- 2 files changed, 2 insertions(+), 6 deletions(-) delete mode 100644 aardvark-app/src/config.rs diff --git a/.gitignore b/.gitignore index 4f5f6e2d..a9003d09 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target -.flatpak-builder \ No newline at end of file +.flatpak-builder +aardvark-app/src/config.rs diff --git a/aardvark-app/src/config.rs b/aardvark-app/src/config.rs deleted file mode 100644 index d443ba54..00000000 --- a/aardvark-app/src/config.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub static APP_ID: &str = "org.p2panda.aardvark"; -pub static VERSION: &str = "0.1.0"; -pub static GETTEXT_PACKAGE: &str = "aardvark"; -pub static LOCALEDIR: &str = "/app/share/locale"; -pub static PKGDATADIR: &str = "/app/share/aardvark"; \ No newline at end of file From 3cfceb2c51ed097f4e4322c0574ec632192b1bd5 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Sat, 15 Mar 2025 12:37:43 +0100 Subject: [PATCH 3/8] chore: Run cargo fmt --- aardvark-app/src/components/mod.rs | 4 ++-- .../src/components/multiline_entry.rs | 20 +++++++++---------- 
aardvark-app/src/main.rs | 2 +- aardvark-doc/src/document.rs | 4 ++-- aardvark-node/src/network.rs | 4 ++-- aardvark-node/src/operation.rs | 4 ++-- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/aardvark-app/src/components/mod.rs b/aardvark-app/src/components/mod.rs index 28ded4d2..24b45d70 100644 --- a/aardvark-app/src/components/mod.rs +++ b/aardvark-app/src/components/mod.rs @@ -1,5 +1,5 @@ -mod zoom_level_selector; mod multiline_entry; +mod zoom_level_selector; -pub use self::zoom_level_selector::ZoomLevelSelector; pub use self::multiline_entry::MultilineEntry; +pub use self::zoom_level_selector::ZoomLevelSelector; diff --git a/aardvark-app/src/components/multiline_entry.rs b/aardvark-app/src/components/multiline_entry.rs index 1a8d71d0..51e47d2c 100644 --- a/aardvark-app/src/components/multiline_entry.rs +++ b/aardvark-app/src/components/multiline_entry.rs @@ -1,4 +1,4 @@ -use gtk::{glib, subclass::prelude::*, prelude::*, gdk}; +use gtk::{gdk, glib, prelude::*, subclass::prelude::*}; mod imp { use super::*; @@ -18,20 +18,18 @@ mod imp { } impl ObjectImpl for MultilineEntry { - fn constructed (&self) { + fn constructed(&self) { let key_events = gtk::EventControllerKey::new(); key_events.connect_key_pressed(|controller, key, _, modifier| { - if modifier.is_empty() && (key == gdk::Key::Return || key == gdk::Key::KP_Enter) - { - if let Some(widget) = controller.widget() { - widget.activate_default(); - } - glib::Propagation::Stop - } else { - glib::Propagation::Proceed + if modifier.is_empty() && (key == gdk::Key::Return || key == gdk::Key::KP_Enter) { + if let Some(widget) = controller.widget() { + widget.activate_default(); } + glib::Propagation::Stop + } else { + glib::Propagation::Proceed } - ); + }); self.obj().add_controller(key_events); } } diff --git a/aardvark-app/src/main.rs b/aardvark-app/src/main.rs index 5e57f524..aa005edc 100644 --- a/aardvark-app/src/main.rs +++ b/aardvark-app/src/main.rs @@ -29,8 +29,8 @@ use std::path::PathBuf; 
use gettextrs::{bind_textdomain_codeset, bindtextdomain, textdomain}; use gtk::prelude::*; use gtk::{gio, glib}; -use tracing_subscriber::prelude::*; use tracing_subscriber::EnvFilter; +use tracing_subscriber::prelude::*; use self::application::AardvarkApplication; use self::config::{GETTEXT_PACKAGE, LOCALEDIR, PKGDATADIR}; diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index 7dde0546..ca41bde3 100644 --- a/aardvark-doc/src/document.rs +++ b/aardvark-doc/src/document.rs @@ -5,9 +5,9 @@ use std::sync::OnceLock; use aardvark_node::NodeCommand; use anyhow::Result; use glib::prelude::*; -use glib::subclass::prelude::*; use glib::subclass::Signal; -use glib::{clone, Properties}; +use glib::subclass::prelude::*; +use glib::{Properties, clone}; use p2panda_core::Hash; use crate::crdt::{TextCrdt, TextCrdtEvent, TextDelta}; diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index aa681a68..7618346f 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -6,12 +6,12 @@ use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork}; use p2panda_stream::{DecodeExt, IngestExt}; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; +use tokio_stream::wrappers::ReceiverStream; use tracing::error; use crate::document::Document; -use crate::operation::{decode_gossip_message, encode_gossip_operation, AardvarkExtensions}; +use crate::operation::{AardvarkExtensions, decode_gossip_message, encode_gossip_operation}; use crate::store::OperationStore; #[derive(Clone, Debug)] diff --git a/aardvark-node/src/operation.rs b/aardvark-node/src/operation.rs index 1d125057..d388d537 100644 --- a/aardvark-node/src/operation.rs +++ b/aardvark-node/src/operation.rs @@ -1,11 +1,11 @@ use std::hash::Hash as StdHash; use std::time::SystemTime; -use anyhow::{bail, Result}; +use anyhow::{Result, bail}; use p2panda_core::cbor::{decode_cbor, 
encode_cbor}; use p2panda_core::{Body, Extension, Extensions, Header, Operation, PrivateKey, PruneFlag}; use p2panda_store::LogStore; -use p2panda_stream::operation::{ingest_operation, IngestResult}; +use p2panda_stream::operation::{IngestResult, ingest_operation}; use serde::{Deserialize, Serialize}; use crate::document::Document; From ed7d96053197d1951cf900b462c35ce7a8149705 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 18 Mar 2025 12:18:02 +0100 Subject: [PATCH 4/8] doc: Use boxed DocumentId instead of a hex string for the document id This also renames the `aardvark-node::Document` to `aardvark-node::DocumentId`, since we now have the `SubscribableDocument` trait it should return a `DocumentId` not a `Document` that is actually only an id. --- aardvark-app/src/application.rs | 9 ++++--- aardvark-app/src/window.rs | 42 ++++++++++++++++++++++----------- aardvark-doc/src/document.rs | 35 +++++++++++++++++++++------ aardvark-doc/src/service.rs | 12 ++++++---- aardvark-node/src/document.rs | 31 +++++++++++++++++------- aardvark-node/src/lib.rs | 2 +- aardvark-node/src/network.rs | 13 +++++----- aardvark-node/src/node.rs | 31 ++++++++++++------------ aardvark-node/src/operation.rs | 18 +++++++------- aardvark-node/src/store.rs | 14 +++++------ 10 files changed, 130 insertions(+), 77 deletions(-) diff --git a/aardvark-app/src/application.rs b/aardvark-app/src/application.rs index c4855ca7..57e24394 100644 --- a/aardvark-app/src/application.rs +++ b/aardvark-app/src/application.rs @@ -18,7 +18,7 @@ * SPDX-License-Identifier: GPL-3.0-or-later */ -use aardvark_doc::service::Service; +use aardvark_doc::{document::DocumentId, service::Service}; use adw::prelude::*; use adw::subclass::prelude::*; use gettextrs::gettext; @@ -89,11 +89,14 @@ impl AardvarkApplication { .build() } - pub fn window_for_document_id(&self, document_id: &str) -> Option { + pub fn window_for_document_id( + &self, + document_id: &DocumentId, + ) -> Option { self.windows() .into_iter() 
.filter_map(|window| window.downcast::().ok()) - .find(|window| window.document().id() == document_id) + .find(|window| &window.document().id() == document_id) } fn setup_gactions(&self) { diff --git a/aardvark-app/src/window.rs b/aardvark-app/src/window.rs index d6f68b5b..cc87b3c4 100644 --- a/aardvark-app/src/window.rs +++ b/aardvark-app/src/window.rs @@ -19,8 +19,12 @@ */ use std::cell::{Cell, OnceCell, RefCell}; +use std::str::FromStr; -use aardvark_doc::{document::Document, service::Service}; +use aardvark_doc::{ + document::{Document, DocumentId}, + service::Service, +}; use adw::prelude::AdwDialogExt; use adw::subclass::prelude::*; use gettextrs::gettext; @@ -215,15 +219,18 @@ mod imp { self, move |_| { let open_document_buffer = this.open_document_entry.buffer(); - let document_id: String = open_document_buffer - .text( - &open_document_buffer.start_iter(), - &open_document_buffer.end_iter(), - false, - ) - .chars() - .filter(|c| c.is_digit(16)) - .collect(); + let document_id = DocumentId::from_str( + &open_document_buffer + .text( + &open_document_buffer.start_iter(), + &open_document_buffer.end_iter(), + false, + ) + .chars() + .filter(|c| c.is_digit(16)) + .collect::(), + ) + .expect("valid document id"); let app = this .obj() @@ -253,15 +260,21 @@ mod imp { .filter(|c| c.is_digit(16)) .collect(); - this.open_document_button.set_sensitive(input.len() == 64); + let document_id = if input.len() == 64 { + DocumentId::from_str(&input).ok() + } else { + None + }; + this.open_document_button + .set_sensitive(document_id.is_some()); - let existing = if input.len() == 64 { + let existing = if let Some(document_id) = document_id { let app = this .obj() .application() .and_then(|app| app.downcast::().ok()) .expect("Application needs to be a AardvarkApplication"); - app.window_for_document_id(&input) + app.window_for_document_id(&document_id) } else { None }; @@ -359,8 +372,9 @@ mod imp { self.obj().notify("document"); } - fn format_document_id(document_id: &str) 
-> String { + fn format_document_id(document_id: &DocumentId) -> String { document_id + .to_string() .chars() .enumerate() .flat_map(|(i, c)| { diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index ca41bde3..0b98105d 100644 --- a/aardvark-doc/src/document.rs +++ b/aardvark-doc/src/document.rs @@ -1,18 +1,39 @@ use std::cell::{Cell, OnceCell, RefCell}; +use std::fmt; use std::str::FromStr; use std::sync::OnceLock; use aardvark_node::NodeCommand; + +use aardvark_node::document::DocumentId as DocumentIdNode; use anyhow::Result; use glib::prelude::*; use glib::subclass::Signal; use glib::subclass::prelude::*; use glib::{Properties, clone}; -use p2panda_core::Hash; +use p2panda_core::HashError; use crate::crdt::{TextCrdt, TextCrdtEvent, TextDelta}; use crate::service::Service; +#[derive(Clone, Debug, PartialEq, Eq, glib::Boxed)] +#[boxed_type(name = "AardvarkDocumentId", nullable)] +pub struct DocumentId(pub DocumentIdNode); + +impl FromStr for DocumentId { + type Err = HashError; + + fn from_str(value: &str) -> Result { + Ok(DocumentId(DocumentIdNode::from_str(value)?)) + } +} + +impl fmt::Display for DocumentId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + mod imp { use std::rc::Rc; @@ -24,7 +45,7 @@ mod imp { #[property(name = "text", get = Self::text, type = String)] crdt_doc: Rc>>, #[property(get, construct_only, set = Self::set_id)] - id: OnceCell, + id: OnceCell, #[property(get, set)] ready: Cell, #[property(get, construct_only)] @@ -46,7 +67,7 @@ mod imp { .to_string() } - pub fn set_id(&self, id: Option) { + fn set_id(&self, id: Option) { if let Some(id) = id { self.id.set(id).expect("Document id can only be set once"); } @@ -105,11 +126,11 @@ mod imp { self.parent_constructed(); let service = self.service.get().unwrap(); - let (network_tx, mut rx) = if let Some(id) = self.id.get() { - service.join_document(Hash::from_str(id).expect("Invalid document id")) + let (network_tx, mut rx) = 
if let Some(document_id) = self.id.get() { + service.join_document(document_id) } else { let (document_id, network_tx, rx) = service.create_document(); - self.set_id(Some(document_id.to_hex())); + self.set_id(Some(document_id)); (network_tx, rx) }; @@ -200,7 +221,7 @@ glib::wrapper! { pub struct Document(ObjectSubclass); } impl Document { - pub fn new(service: &Service, id: Option<&str>) -> Self { + pub fn new(service: &Service, id: Option<&DocumentId>) -> Self { glib::Object::builder() .property("service", service) .property("id", id) diff --git a/aardvark-doc/src/service.rs b/aardvark-doc/src/service.rs index e771fb27..a10ce136 100644 --- a/aardvark-doc/src/service.rs +++ b/aardvark-doc/src/service.rs @@ -4,6 +4,8 @@ use tracing::info; use aardvark_node::{Node, NodeReceiver, NodeSender}; +use crate::document::DocumentId; + mod imp { use super::*; @@ -43,20 +45,20 @@ impl Service { self.imp().node.shutdown(); } - pub(crate) fn create_document(&self) -> (Hash, NodeSender, NodeReceiver) { + pub(crate) fn create_document(&self) -> (DocumentId, NodeSender, NodeReceiver) { let (document_id, tx, rx) = self .imp() .node .create_document() .expect("to create document"); - info!("created new document: {}", document_id.to_hex()); - (document_id, tx, rx) + info!("created new document: {}", document_id); + (DocumentId(document_id), tx, rx) } - pub(crate) fn join_document(&self, document_id: Hash) -> (NodeSender, NodeReceiver) { + pub(crate) fn join_document(&self, document_id: &DocumentId) -> (NodeSender, NodeReceiver) { self.imp() .node - .join_document(document_id) + .join_document(document_id.0) .expect("to join document") } diff --git a/aardvark-node/src/document.rs b/aardvark-node/src/document.rs index 80af9ef6..2bd36b26 100644 --- a/aardvark-node/src/document.rs +++ b/aardvark-node/src/document.rs @@ -1,36 +1,51 @@ use std::fmt; use std::hash::Hash as StdHash; +use std::str::FromStr; -use p2panda_core::Hash; +use p2panda_core::{Hash, HashError}; use 
p2panda_net::TopicId; use p2panda_sync::TopicQuery; use serde::{Deserialize, Serialize}; #[derive(Copy, Clone, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] -pub struct Document(Hash); +pub struct DocumentId(Hash); -impl TopicQuery for Document {} +impl TopicQuery for DocumentId {} -impl TopicId for Document { +impl TopicId for DocumentId { fn id(&self) -> [u8; 32] { *self.0.as_bytes() } } -impl From for Document { +impl From for DocumentId { fn from(document_id: Hash) -> Self { Self(document_id) } } -impl From<&Document> for Hash { - fn from(value: &Document) -> Self { +impl From for Hash { + fn from(document: DocumentId) -> Self { + document.0 + } +} + +impl From<&DocumentId> for Hash { + fn from(value: &DocumentId) -> Self { value.0 } } -impl fmt::Display for Document { +impl fmt::Display for DocumentId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } + +impl FromStr for DocumentId { + type Err = HashError; + + fn from_str(value: &str) -> Result { + Ok(Hash::from_str(value)?.into()) + } +} diff --git a/aardvark-node/src/lib.rs b/aardvark-node/src/lib.rs index dfa72c5f..961b4231 100644 --- a/aardvark-node/src/lib.rs +++ b/aardvark-node/src/lib.rs @@ -1,4 +1,4 @@ -mod document; +pub mod document; mod network; mod node; mod operation; diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 7618346f..a2c745d5 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -1,3 +1,6 @@ +use crate::document::DocumentId; +use crate::operation::{AardvarkExtensions, decode_gossip_message, encode_gossip_operation}; +use crate::store::OperationStore; use anyhow::Result; use p2panda_core::{Hash, Operation, PrivateKey}; use p2panda_discovery::mdns::LocalDiscovery; @@ -10,21 +13,17 @@ use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use tracing::error; -use crate::document::Document; -use crate::operation::{AardvarkExtensions, decode_gossip_message, 
encode_gossip_operation}; -use crate::store::OperationStore; - #[derive(Clone, Debug)] pub struct Network { operation_store: OperationStore, - network: p2panda_net::Network, + network: p2panda_net::Network, } impl Network { pub async fn spawn( network_id: Hash, private_key: PrivateKey, - sync_config: SyncConfiguration, + sync_config: SyncConfiguration, operation_store: OperationStore, ) -> Result { let network = NetworkBuilder::new(network_id.into()) @@ -54,7 +53,7 @@ impl Network { pub async fn subscribe( &self, - document: Document, + document: DocumentId, ) -> Result<( mpsc::Sender>, mpsc::Receiver>, diff --git a/aardvark-node/src/node.rs b/aardvark-node/src/node.rs index eede64d3..273fd221 100644 --- a/aardvark-node/src/node.rs +++ b/aardvark-node/src/node.rs @@ -10,7 +10,7 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use tracing::warn; -use crate::document::Document; +use crate::document::DocumentId; use crate::network::Network; use crate::operation::{LogType, create_operation, validate_operation}; use crate::store::{DocumentStore, OperationStore}; @@ -88,7 +88,7 @@ impl Node { self.inner.document_store.clone(), self.inner.operation_store.clone(), ); - SyncConfiguration::::new(sync) + SyncConfiguration::::new(sync) }; let operation_store = self.inner.operation_store.clone(); @@ -118,7 +118,7 @@ impl Node { }); } - pub fn create_document(&self) -> Result<(Hash, NodeSender, NodeReceiver)> { + pub fn create_document(&self) -> Result<(DocumentId, NodeSender, NodeReceiver)> { let private_key = self.inner.private_key.get().expect("private key"); let mut operation_store = self.inner.operation_store.clone(); @@ -134,32 +134,31 @@ impl Node { .await })?; - let document: Document = operation + let document_id: DocumentId = operation .header .extension() .expect("document id from our own logs"); - let document_id = (&document).into(); // Add ourselves as an author to the document store. 
self.inner.runtime.block_on(async { self.inner .document_store - .add_author(document, private_key.public_key()) + .add_author(document_id, private_key.public_key()) .await })?; - let (tx, rx) = self.subscribe(document)?; + let (tx, rx) = self.subscribe(document_id)?; Ok((document_id, tx, rx)) } - pub fn join_document(&self, document_id: Hash) -> Result<(NodeSender, NodeReceiver)> { + pub fn join_document(&self, document_id: DocumentId) -> Result<(NodeSender, NodeReceiver)> { let document = document_id.into(); let (tx, rx) = self.subscribe(document)?; Ok((tx, rx)) } - fn subscribe(&self, document: Document) -> Result<(NodeSender, NodeReceiver)> { + fn subscribe(&self, document_id: DocumentId) -> Result<(NodeSender, NodeReceiver)> { let (to_network, mut from_app) = mpsc::channel::(512); let (to_app, from_network) = mpsc::channel(512); @@ -169,7 +168,7 @@ impl Node { self.inner.runtime.block_on(async { self.inner .document_store - .add_author(document, private_key.public_key()) + .add_author(document_id, private_key.public_key()) .await })?; @@ -184,7 +183,7 @@ impl Node { }) .await; - let (document_tx, mut document_rx) = network.subscribe(document).await?; + let (document_tx, mut document_rx) = network.subscribe(document_id).await?; // Process the operations and forward application messages to app layer. This is where // we "materialize" our application state from incoming "application events". @@ -192,7 +191,7 @@ impl Node { let _result: JoinHandle> = tokio::task::spawn(async move { while let Some(operation) = document_rx.recv().await { // Validation for our custom "document" extension. - if let Err(err) = validate_operation(&operation, &document) { + if let Err(err) = validate_operation(&operation, &document_id) { warn!( public_key = %operation.header.public_key, seq_num = %operation.header.seq_num, @@ -203,7 +202,7 @@ impl Node { // When we discover a new author we need to add them to our document store. 
document_store - .add_author(document, operation.header.public_key) + .add_author(document_id, operation.header.public_key) .await?; // Forward the payload up to the app. @@ -227,7 +226,7 @@ impl Node { &mut operation_store, &private_key, LogType::Delta, - Some(document), + Some(document_id), Some(&bytes), false, ) @@ -249,7 +248,7 @@ impl Node { &mut operation_store, &private_key, LogType::Snapshot, - Some(document), + Some(document_id), Some(&snapshot_bytes), true, ) @@ -266,7 +265,7 @@ impl Node { &mut operation_store, &private_key, LogType::Delta, - Some(document), + Some(document_id), Some(&delta_bytes), true, ) diff --git a/aardvark-node/src/operation.rs b/aardvark-node/src/operation.rs index d388d537..aac29a98 100644 --- a/aardvark-node/src/operation.rs +++ b/aardvark-node/src/operation.rs @@ -8,7 +8,7 @@ use p2panda_store::LogStore; use p2panda_stream::operation::{IngestResult, ingest_operation}; use serde::{Deserialize, Serialize}; -use crate::document::Document; +use crate::document::DocumentId; use crate::store::{LogId, OperationStore}; /// Custom extensions for p2panda header. @@ -42,7 +42,7 @@ pub struct AardvarkExtensions { /// Can be `None` if this operation indicates that we are creating a new document. In this case /// we take the hash of the header itself to derive the document id. #[serde(rename = "d")] - pub document: Option, + pub document: Option, } #[derive(Copy, Clone, Default, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] @@ -70,8 +70,8 @@ impl Extension for AardvarkExtensions { } } -impl Extension for AardvarkExtensions { - fn extract(header: &Header) -> Option { +impl Extension for AardvarkExtensions { + fn extract(header: &Header) -> Option { // Check if header mentions an document. 
let document = header .extensions @@ -102,7 +102,7 @@ impl Extension for AardvarkExtensions { impl Extension for AardvarkExtensions { fn extract(header: &Header) -> Option { let log_type: Option = header.extension(); - let document: Option = header.extension(); + let document: Option = header.extension(); if let (Some(log_type), Some(document)) = (log_type, document) { Some(LogId::new(log_type, &document)) @@ -120,7 +120,7 @@ pub async fn create_operation( store: &mut OperationStore, private_key: &PrivateKey, log_type: LogType, - document: Option, + document: Option, body: Option<&[u8]>, prune_flag: bool, ) -> Result> { @@ -165,7 +165,7 @@ pub async fn create_operation( }; header.sign(private_key); - let document: Document = header.extension().expect("document id from our own logs"); + let document: DocumentId = header.extension().expect("document id from our own logs"); let log_id = LogId::new(log_type, &document); let result = ingest_operation( @@ -188,9 +188,9 @@ pub async fn create_operation( /// Custom validation for our own operation headers. 
pub fn validate_operation( operation: &Operation, - expected_document: &Document, + expected_document: &DocumentId, ) -> Result<()> { - let given_document: Option = operation.header.extension(); + let given_document: Option = operation.header.extension(); match given_document { Some(given_document) => { if &given_document != expected_document { diff --git a/aardvark-node/src/store.rs b/aardvark-node/src/store.rs index 1661d906..f784ad31 100644 --- a/aardvark-node/src/store.rs +++ b/aardvark-node/src/store.rs @@ -10,7 +10,7 @@ use p2panda_sync::log_sync::TopicLogMap; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; -use crate::document::Document; +use crate::document::DocumentId; use crate::operation::{AardvarkExtensions, LogType}; #[derive(Clone, Debug)] @@ -20,7 +20,7 @@ pub struct DocumentStore { #[derive(Debug)] struct DocumentStoreInner { - authors: HashMap>, + authors: HashMap>, } impl DocumentStore { @@ -32,7 +32,7 @@ impl DocumentStore { } } - pub async fn add_author(&self, document: Document, public_key: PublicKey) -> Result<()> { + pub async fn add_author(&self, document: DocumentId, public_key: PublicKey) -> Result<()> { let mut store = self.inner.write().await; store .authors @@ -46,17 +46,17 @@ impl DocumentStore { } #[derive(Clone, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] -pub struct LogId(LogType, Document); +pub struct LogId(LogType, DocumentId); impl LogId { - pub fn new(log_type: LogType, document: &Document) -> Self { + pub fn new(log_type: LogType, document: &DocumentId) -> Self { Self(log_type, *document) } } #[async_trait] -impl TopicLogMap for DocumentStore { - async fn get(&self, topic: &Document) -> Option>> { +impl TopicLogMap for DocumentStore { + async fn get(&self, topic: &DocumentId) -> Option>> { let store = &self.inner.read().await; let mut result = HashMap::>::new(); From 9a3a6d424145f1af30ff08550e42c0372fab880b Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Wed, 19 Mar 2025 17:47:14 +0100 Subject: 
[PATCH 5/8] Use trait instead of channel for API between node and doc crate --- aardvark-doc/src/crdt.rs | 1 + aardvark-doc/src/document.rs | 147 +++++++++------- aardvark-doc/src/service.rs | 21 +-- aardvark-node/src/document.rs | 6 +- aardvark-node/src/lib.rs | 3 +- aardvark-node/src/node.rs | 307 ++++++++++++++++------------------ aardvark-node/src/store.rs | 22 ++- 7 files changed, 270 insertions(+), 237 deletions(-) diff --git a/aardvark-doc/src/crdt.rs b/aardvark-doc/src/crdt.rs index 1322cf0f..3bf19410 100644 --- a/aardvark-doc/src/crdt.rs +++ b/aardvark-doc/src/crdt.rs @@ -20,6 +20,7 @@ pub type EventReceiver = async_channel::Receiver; /// Internally this uses a text CRDT implementation by [Loro](https://www.loro.dev/). This /// interface serves merely as a wrapper to bring Loro and it's data into the shape we need, /// without worrying too much about the internal details of Loro. +#[derive(Debug)] pub struct TextCrdt { doc: RefCell, event_rx: EventReceiver, diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index 0b98105d..3e495cb4 100644 --- a/aardvark-doc/src/document.rs +++ b/aardvark-doc/src/document.rs @@ -1,24 +1,22 @@ -use std::cell::{Cell, OnceCell, RefCell}; +use std::cell::{Cell, OnceCell}; use std::fmt; use std::str::FromStr; use std::sync::OnceLock; -use aardvark_node::NodeCommand; - -use aardvark_node::document::DocumentId as DocumentIdNode; +use aardvark_node::document::{DocumentId as DocumentIdNode, SubscribableDocument}; use anyhow::Result; use glib::prelude::*; -use glib::subclass::Signal; -use glib::subclass::prelude::*; +use glib::subclass::{Signal, prelude::*}; use glib::{Properties, clone}; -use p2panda_core::HashError; +use p2panda_core::{HashError, PublicKey}; +use tracing::error; use crate::crdt::{TextCrdt, TextCrdtEvent, TextDelta}; use crate::service::Service; #[derive(Clone, Debug, PartialEq, Eq, glib::Boxed)] #[boxed_type(name = "AardvarkDocumentId", nullable)] -pub struct DocumentId(pub DocumentIdNode); 
+pub struct DocumentId(DocumentIdNode); impl FromStr for DocumentId { type Err = HashError; @@ -35,15 +33,13 @@ impl fmt::Display for DocumentId { } mod imp { - use std::rc::Rc; - use super::*; #[derive(Properties, Default)] #[properties(wrapper_type = super::Document)] pub struct Document { #[property(name = "text", get = Self::text, type = String)] - crdt_doc: Rc>>, + crdt_doc: OnceCell, #[property(get, construct_only, set = Self::set_id)] id: OnceCell, #[property(get, set)] @@ -60,11 +56,7 @@ mod imp { impl Document { pub fn text(&self) -> String { - self.crdt_doc - .borrow() - .as_ref() - .expect("crdt_doc to be set") - .to_string() + self.crdt_doc.get().expect("crdt_doc to be set").to_string() } fn set_id(&self, id: Option) { @@ -74,8 +66,8 @@ mod imp { } pub fn splice_text(&self, index: i32, delete_len: i32, chunk: &str) -> Result<()> { - let mut doc_borrow = self.crdt_doc.borrow_mut(); - let doc = doc_borrow.as_mut().expect("crdt_doc to be set"); + let doc = self.crdt_doc.get().expect("crdt_doc to be set"); + if delete_len == 0 { doc.insert(index as usize, chunk) .expect("update document after text insertion"); @@ -87,22 +79,48 @@ mod imp { Ok(()) } - fn on_remote_message(&self, bytes: Vec) { - let mut doc_borrow = self.crdt_doc.borrow_mut(); - let doc = doc_borrow.as_mut().expect("crdt_doc to be set"); + pub fn on_remote_message(&self, bytes: &[u8]) { + let doc = self.crdt_doc.get().expect("crdt_doc to be set"); + if let Err(err) = doc.apply_encoded_delta(&bytes) { eprintln!("received invalid message: {}", err); } } - fn emit_text_inserted(&self, pos: i32, text: &str) { - self.obj() - .emit_by_name::<()>("text-inserted", &[&pos, &text]); + fn emit_text_inserted(&self, pos: i32, text: String) { + // Emit the signal on the main thread + let obj = self.obj(); + glib::source::idle_add_full( + glib::source::Priority::DEFAULT, + clone!( + #[weak] + obj, + #[upgrade_or] + glib::ControlFlow::Break, + move || { + obj.emit_by_name::<()>("text-inserted", &[&pos, 
&text]); + glib::ControlFlow::Break + } + ), + ); } fn emit_range_deleted(&self, start: i32, end: i32) { - self.obj() - .emit_by_name::<()>("range-deleted", &[&start, &end]); + // Emit the signal on the main thread + let obj = self.obj(); + glib::source::idle_add_full( + glib::source::Priority::DEFAULT, + clone!( + #[weak] + obj, + #[upgrade_or] + glib::ControlFlow::Break, + move || { + obj.emit_by_name::<()>("range-deleted", &[&start, &end]); + glib::ControlFlow::Break + } + ), + ); } } @@ -125,16 +143,17 @@ mod imp { fn constructed(&self) { self.parent_constructed(); - let service = self.service.get().unwrap(); - let (network_tx, mut rx) = if let Some(document_id) = self.id.get() { - service.join_document(document_id) - } else { - let (document_id, network_tx, rx) = service.create_document(); - self.set_id(Some(document_id)); - (network_tx, rx) - }; + if self.id.get().is_none() { + let document_id = self + .obj() + .service() + .node() + .create_document() + .expect("Create document"); + self.set_id(Some(DocumentId(document_id))); + } - let public_key = service.public_key(); + let public_key = self.obj().service().public_key(); let crdt_doc = TextCrdt::new({ // Take first 8 bytes of public key (32 bytes) to determine a unique "peer id" // which is used to keep authors apart inside the text crdt. 
@@ -149,24 +168,25 @@ mod imp { }); let crdt_doc_rx = crdt_doc.subscribe(); + self.crdt_doc.set(crdt_doc).expect("crdt_doc not to be set"); - self.crdt_doc.replace(Some(crdt_doc)); - - glib::spawn_future_local(clone!( - #[weak(rename_to = this)] - self, + let obj = self.obj(); + glib::spawn_future(clone!( + #[weak] + obj, async move { - while let Some(bytes) = rx.recv().await { - this.on_remote_message(bytes); + let document_id = obj.id().0; + let handle = DocumentHandle(obj.downgrade()); + if let Err(error) = obj.service().node().subscribe(document_id, handle).await { + error!("Failed to subscribe to document: {}", error); } } )); - let crdt_doc = self.crdt_doc.clone(); - - glib::spawn_future_local(clone!( - #[weak(rename_to = this)] - self, + let obj = self.obj(); + glib::spawn_future(clone!( + #[weak] + obj, async move { while let Ok(event) = crdt_doc_rx.recv().await { match event { @@ -176,17 +196,17 @@ mod imp { // TODO(adz): We should consider persisting the snapshot every x // times or x seconds, not sure yet what logic makes the most // sense. 
- let snapshot_bytes = crdt_doc - .borrow() - .as_ref() + let snapshot_bytes = obj + .imp() + .crdt_doc + .get() .expect("crdt_doc to be set") .snapshot(); - if network_tx - .send(NodeCommand::DeltaWithSnapshot { - snapshot_bytes, - delta_bytes, - }) + if obj + .service() + .node() + .delta_with_snapshot(obj.id().0, delta_bytes, snapshot_bytes) .await .is_err() { @@ -198,10 +218,10 @@ mod imp { for delta in text_deltas { match delta { TextDelta::Insert { index, chunk } => { - this.emit_text_inserted(index as i32, &chunk); + obj.imp().emit_text_inserted(index as i32, chunk); } TextDelta::Remove { index, len } => { - this.emit_range_deleted( + obj.imp().emit_range_deleted( index as i32, (index + len) as i32, ); @@ -236,3 +256,16 @@ impl Document { self.imp().splice_text(index, end - index, "") } } + +unsafe impl Send for Document {} +unsafe impl Sync for Document {} + +struct DocumentHandle(glib::WeakRef); + +impl SubscribableDocument for DocumentHandle { + fn bytes_received(&self, _author: PublicKey, data: &[u8]) { + if let Some(document) = self.0.upgrade() { + document.imp().on_remote_message(data); + } + } +} diff --git a/aardvark-doc/src/service.rs b/aardvark-doc/src/service.rs index a10ce136..300a853b 100644 --- a/aardvark-doc/src/service.rs +++ b/aardvark-doc/src/service.rs @@ -2,9 +2,7 @@ use glib::subclass::prelude::*; use p2panda_core::{Hash, PrivateKey, PublicKey}; use tracing::info; -use aardvark_node::{Node, NodeReceiver, NodeSender}; - -use crate::document::DocumentId; +use aardvark_node::Node; mod imp { use super::*; @@ -45,21 +43,8 @@ impl Service { self.imp().node.shutdown(); } - pub(crate) fn create_document(&self) -> (DocumentId, NodeSender, NodeReceiver) { - let (document_id, tx, rx) = self - .imp() - .node - .create_document() - .expect("to create document"); - info!("created new document: {}", document_id); - (DocumentId(document_id), tx, rx) - } - - pub(crate) fn join_document(&self, document_id: &DocumentId) -> (NodeSender, NodeReceiver) { - 
self.imp() - .node - .join_document(document_id.0) - .expect("to join document") + pub(crate) fn node(&self) -> &Node { + &self.imp().node } pub(crate) fn public_key(&self) -> PublicKey { diff --git a/aardvark-node/src/document.rs b/aardvark-node/src/document.rs index 2bd36b26..88870a9b 100644 --- a/aardvark-node/src/document.rs +++ b/aardvark-node/src/document.rs @@ -2,7 +2,7 @@ use std::fmt; use std::hash::Hash as StdHash; use std::str::FromStr; -use p2panda_core::{Hash, HashError}; +use p2panda_core::{Hash, HashError, PublicKey}; use p2panda_net::TopicId; use p2panda_sync::TopicQuery; use serde::{Deserialize, Serialize}; @@ -49,3 +49,7 @@ impl FromStr for DocumentId { Ok(Hash::from_str(value)?.into()) } } + +pub trait SubscribableDocument: Sync + Send { + fn bytes_received(&self, author: PublicKey, data: &[u8]); +} diff --git a/aardvark-node/src/lib.rs b/aardvark-node/src/lib.rs index 961b4231..aa2d7dfb 100644 --- a/aardvark-node/src/lib.rs +++ b/aardvark-node/src/lib.rs @@ -4,4 +4,5 @@ mod node; mod operation; mod store; -pub use node::{Node, NodeCommand, NodeReceiver, NodeSender}; +pub use document::SubscribableDocument; +pub use node::Node; diff --git a/aardvark-node/src/node.rs b/aardvark-node/src/node.rs index 273fd221..440b4d34 100644 --- a/aardvark-node/src/node.rs +++ b/aardvark-node/src/node.rs @@ -6,40 +6,14 @@ use p2panda_net::SyncConfiguration; use p2panda_sync::log_sync::LogSyncProtocol; use tokio::runtime::{Builder, Runtime}; use tokio::sync::OnceCell; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; use tracing::warn; -use crate::document::DocumentId; +use crate::document::{DocumentId, SubscribableDocument}; use crate::network::Network; use crate::operation::{LogType, create_operation, validate_operation}; use crate::store::{DocumentStore, OperationStore}; -pub enum NodeCommand { - /// Broadcast a "text delta" on the gossip overlay. 
- /// - /// This should be used to inform all subscribed peers about small changes to the text - /// document (Delta-Based CRDT). - Delta { bytes: Vec }, - - /// Same as [`NodeCommand::Delta`] next to persisting a whole snapshot and pruning. - /// - /// Snapshots contain the whole text document history and are much larger than deltas. This - /// data will only be sent to newly incoming peers via the sync protocol. - /// - /// Since a snapshot contains all data we need to reliably reconcile documents (it is a - /// State-Based CRDT) this command prunes all our logs and removes past snapshot- and delta - /// operations. - DeltaWithSnapshot { - delta_bytes: Vec, - snapshot_bytes: Vec, - }, -} - -pub type NodeSender = mpsc::Sender; - -pub type NodeReceiver = mpsc::Receiver>; - +#[derive(Clone)] pub struct Node { inner: Arc, } @@ -118,7 +92,7 @@ impl Node { }); } - pub fn create_document(&self) -> Result<(DocumentId, NodeSender, NodeReceiver)> { + pub fn create_document(&self) -> Result { let private_key = self.inner.private_key.get().expect("private key"); let mut operation_store = self.inner.operation_store.clone(); @@ -139,150 +113,165 @@ impl Node { .extension() .expect("document id from our own logs"); - // Add ourselves as an author to the document store. 
- self.inner.runtime.block_on(async { - self.inner - .document_store - .add_author(document_id, private_key.public_key()) - .await - })?; - - let (tx, rx) = self.subscribe(document_id)?; - - Ok((document_id, tx, rx)) - } - - pub fn join_document(&self, document_id: DocumentId) -> Result<(NodeSender, NodeReceiver)> { - let document = document_id.into(); - let (tx, rx) = self.subscribe(document)?; - Ok((tx, rx)) + Ok(document_id) } - fn subscribe(&self, document_id: DocumentId) -> Result<(NodeSender, NodeReceiver)> { - let (to_network, mut from_app) = mpsc::channel::(512); - let (to_app, from_network) = mpsc::channel(512); - + pub async fn subscribe( + &self, + document_id: DocumentId, + document: T, + ) -> Result<()> { let private_key = self.inner.private_key.get().expect("private key").clone(); // Add ourselves as an author to the document store. - self.inner.runtime.block_on(async { - self.inner - .document_store - .add_author(document_id, private_key.public_key()) - .await - })?; + self.inner + .document_store + .add_author(document_id, private_key.public_key()) + .await?; + + let inner_clone = self.inner.clone(); + let (document_tx, mut document_rx) = self + .inner + .runtime + .spawn(async move { + let network = inner_clone + .network + // Allow concurrent calls by awaiting network instance as it might be still + // in process of initialisation. + .get_or_init(|| async { + unreachable!("network was initialised in `run` method"); + }) + .await; + network.subscribe(document_id).await + }) + .await + .unwrap()?; + self.inner + .document_store + .set_subscription_for_document(document_id, document_tx) + .await; let inner = self.inner.clone(); - let _result: JoinHandle> = self.inner.runtime.spawn(async move { - let network = inner - .network - // Allow concurrent calls by awaiting network instance as it might be still - // in process of initialisation. 
- .get_or_init(|| async { - unreachable!("network was initialised in `run` method"); - }) - .await; - - let (document_tx, mut document_rx) = network.subscribe(document_id).await?; - + self.inner.runtime.spawn(async move { // Process the operations and forward application messages to app layer. This is where // we "materialize" our application state from incoming "application events". - let document_store = inner.document_store.clone(); - let _result: JoinHandle> = tokio::task::spawn(async move { - while let Some(operation) = document_rx.recv().await { - // Validation for our custom "document" extension. - if let Err(err) = validate_operation(&operation, &document_id) { - warn!( - public_key = %operation.header.public_key, - seq_num = %operation.header.seq_num, - "{err}" - ); - continue; - } - - // When we discover a new author we need to add them to our document store. - document_store - .add_author(document_id, operation.header.public_key) - .await?; - - // Forward the payload up to the app. - if let Some(body) = operation.body { - to_app.send(body.to_bytes()).await?; - } + while let Some(operation) = document_rx.recv().await { + // Validation for our custom "document" extension. + if let Err(err) = validate_operation(&operation, &document_id) { + warn!( + public_key = %operation.header.public_key, + seq_num = %operation.header.seq_num, + "{err}" + ); + continue; } - Ok(()) - }); - - // Task for handling events coming from the application layer. - let mut operation_store = inner.operation_store.clone(); - let _result: JoinHandle> = tokio::task::spawn(async move { - while let Some(command) = from_app.recv().await { - // Create the p2panda operations with application message as payload. - match command { - NodeCommand::Delta { bytes } => { - // Append one operation to our "ephemeral" delta log. 
- let operation = create_operation( - &mut operation_store, - &private_key, - LogType::Delta, - Some(document_id), - Some(&bytes), - false, - ) - .await?; - - // Broadcast operation on gossip overlay. - document_tx.send(operation).await?; - } - NodeCommand::DeltaWithSnapshot { - snapshot_bytes, - delta_bytes, - } => { - // Append an operation to our "snapshot" log and set the prune flag to - // true. This will remove previous snapshots. - // - // Snapshots are not broadcasted on the gossip overlay as they would be - // too large. Peers will sync them up when they join the document. - create_operation( - &mut operation_store, - &private_key, - LogType::Snapshot, - Some(document_id), - Some(&snapshot_bytes), - true, - ) - .await?; - - // Append an operation to our "ephemeral" delta log and set the prune - // flag to true. - // - // This signals removing all previous "delta" operations now. This is - // some sort of garbage collection whenever we snapshot. Snapshots - // already contain all history, there is no need to keep duplicate - // "delta" data around. - let operation = create_operation( - &mut operation_store, - &private_key, - LogType::Delta, - Some(document_id), - Some(&delta_bytes), - true, - ) - .await?; - - // Broadcast operation on gossip overlay. - document_tx.send(operation).await?; - } - }; + // When we discover a new author we need to add them to our document store. + inner + .document_store + .add_author(document_id, operation.header.public_key) + .await + .expect("Unable to add author to DocumentStore"); + + // Forward the payload up to the app. + if let Some(body) = operation.body { + document.bytes_received(operation.header.public_key, &body.to_bytes()); } + } + }); - Ok(()) - }); + Ok(()) + } - Ok(()) - }); + /// Broadcast a "text delta" on the gossip overlay. + /// + /// This should be used to inform all subscribed peers about small changes to the text + /// document (Delta-Based CRDT). 
+ pub async fn delta(&self, document_id: DocumentId, bytes: Vec) -> Result<()> { + let private_key = self.inner.private_key.get().expect("private key"); + + // Append one operation to our "ephemeral" delta log. + let operation = create_operation( + &mut self.inner.operation_store.clone(), + &private_key, + LogType::Delta, + Some(document_id), + Some(&bytes), + false, + ) + .await?; + + let document_tx = self + .inner + .document_store + .subscription_for_document(document_id) + .await + .expect("Not subscribed to document"); + + // Broadcast operation on gossip overlay. + document_tx.send(operation).await?; + + Ok(()) + } + + /// Same as [`Self::Delta`] next to persisting a whole snapshot and pruning. + /// + /// Snapshots contain the whole text document history and are much larger than deltas. This + /// data will only be sent to newly incoming peers via the sync protocol. + /// + /// Since a snapshot contains all data we need to reliably reconcile documents (it is a + /// State-Based CRDT) this command prunes all our logs and removes past snapshot- and delta + /// operations. + pub async fn delta_with_snapshot( + &self, + document_id: DocumentId, + delta_bytes: Vec, + snapshot_bytes: Vec, + ) -> Result<()> { + let private_key = self.inner.private_key.get().expect("private key"); + + // Append an operation to our "snapshot" log and set the prune flag to + // true. This will remove previous snapshots. + // + // Snapshots are not broadcasted on the gossip overlay as they would be + // too large. Peers will sync them up when they join the document. + create_operation( + &mut self.inner.operation_store.clone(), + &private_key, + LogType::Snapshot, + Some(document_id), + Some(&snapshot_bytes), + true, + ) + .await?; + + // Append an operation to our "ephemeral" delta log and set the prune + // flag to true. + // + // This signals removing all previous "delta" operations now. This is + // some sort of garbage collection whenever we snapshot. 
Snapshots + // already contain all history, there is no need to keep duplicate + // "delta" data around. + let operation = create_operation( + &mut self.inner.operation_store.clone(), + &private_key, + LogType::Delta, + Some(document_id.into()), + Some(&delta_bytes), + true, + ) + .await?; + + let document_tx = self + .inner + .document_store + .subscription_for_document(document_id) + .await + .expect("Not subscribed to document"); + + // Broadcast operation on gossip overlay. + document_tx.send(operation).await?; - Ok((to_network, from_network)) + Ok(()) } } diff --git a/aardvark-node/src/store.rs b/aardvark-node/src/store.rs index f784ad31..df867eed 100644 --- a/aardvark-node/src/store.rs +++ b/aardvark-node/src/store.rs @@ -1,10 +1,11 @@ use std::collections::{HashMap, HashSet}; use std::hash::Hash as StdHash; use std::sync::Arc; +use tokio::sync::mpsc; use anyhow::Result; use async_trait::async_trait; -use p2panda_core::PublicKey; +use p2panda_core::{Operation, PublicKey}; use p2panda_store::MemoryStore; use p2panda_sync::log_sync::TopicLogMap; use serde::{Deserialize, Serialize}; @@ -21,6 +22,7 @@ pub struct DocumentStore { #[derive(Debug)] struct DocumentStoreInner { authors: HashMap>, + document_tx: HashMap>>, } impl DocumentStore { @@ -28,10 +30,28 @@ impl DocumentStore { Self { inner: Arc::new(RwLock::new(DocumentStoreInner { authors: HashMap::new(), + document_tx: HashMap::new(), })), } } + pub async fn set_subscription_for_document( + &self, + document_id: DocumentId, + tx: mpsc::Sender>, + ) { + let mut store = self.inner.write().await; + store.document_tx.insert(document_id, tx); + } + + pub async fn subscription_for_document( + &self, + document_id: DocumentId, + ) -> Option>> { + let store = self.inner.read().await; + store.document_tx.get(&document_id).cloned() + } + pub async fn add_author(&self, document: DocumentId, public_key: PublicKey) -> Result<()> { let mut store = self.inner.write().await; store From 
550c4e136ba7f80a58921d7f1210826bc4454dd8 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Wed, 19 Mar 2025 16:59:56 +0100 Subject: [PATCH 6/8] doc: Remove loro abstraction Creating and subscribing to a loro doc is quite straightforward. I think having an abstraction between the `Document` and loro makes the code unnecessarily more complicated. Also if we decide to run the CRDT in a distinct thread we can use the `Document` object directly since it's Send+Sync. --- aardvark-doc/src/crdt.rs | 377 ----------------------------------- aardvark-doc/src/document.rs | 207 ++++++++++++------- aardvark-doc/src/lib.rs | 1 - 3 files changed, 134 insertions(+), 451 deletions(-) delete mode 100644 aardvark-doc/src/crdt.rs diff --git a/aardvark-doc/src/crdt.rs b/aardvark-doc/src/crdt.rs deleted file mode 100644 index 3bf19410..00000000 --- a/aardvark-doc/src/crdt.rs +++ /dev/null @@ -1,377 +0,0 @@ -use std::cell::RefCell; -use std::fmt; -use std::sync::Arc; - -use loro::event::{Diff, DiffEvent}; -use loro::{EventTriggerKind, ExportMode, LoroDoc, Subscription}; -use thiserror::Error; - -/// Identifier of container where we handle the text CRDT in a Loro document. -/// -/// Loro documents can contain multiple different CRDT types in one document. We can address these -/// with identifiers. -const TEXT_CONTAINER_ID: &str = "document"; -pub type EventReceiver = async_channel::Receiver; -/// Manages a Conflict-free Replicated Data Type (CRDTs) to resolve parallel edits by multiple -/// authors on the same text document. -/// -/// Internally this uses a text CRDT implementation by [Loro](https://www.loro.dev/). This -/// interface serves merely as a wrapper to bring Loro and it's data into the shape we need, -/// without worrying too much about the internal details of Loro. 
-#[derive(Debug)] -pub struct TextCrdt { - doc: RefCell, - event_rx: EventReceiver, - #[allow(dead_code)] - subscription: Subscription, - #[allow(dead_code)] - subscription_local: Subscription, -} - -impl TextCrdt { - /// Returns new instance managing a text CRDT. Use this when creating a new document. - /// - /// The peer id represents the identity of the author applying local changes (that's - /// essentially us), it needs be strictly unique. - pub fn new(peer_id: u64) -> Self { - let doc = LoroDoc::new(); - doc.set_record_timestamp(false); - doc.set_peer_id(peer_id) - .expect("set peer id for new document"); - - let text = doc.get_text(TEXT_CONTAINER_ID); - - // NOTE(adz): We're introducing a non-tokio channel implementation here as using a tokio - // channel would cause a panic in this setup. - // - // Tokio (rightly) informs us about using a `send_blocking` inside the same thread where - // the async runtime operates, thus potentially blocking it. - // - // This is not optimal but seems to work for now, later we might want to look into running - // the whole CRDT logic in a separate thread. - let (event_tx, event_rx) = async_channel::bounded::(64); - - let subscription = { - let event_tx = event_tx.clone(); - doc.subscribe( - &text.id(), - Arc::new(move |loro_event| { - let triggered_by = loro_event.triggered_by; - let deltas = { - let loro_deltas = extract_text_deltas(loro_event); - absolute_deltas(loro_deltas) - }; - let event = TextCrdtEvent::from_deltas(triggered_by, deltas); - let _ = event_tx.send_blocking(event); - }), - ) - }; - - let subscription_local = doc.subscribe_local_update(Box::new(move |bytes| { - let _ = event_tx.send_blocking(TextCrdtEvent::LocalEncoded(bytes.to_owned())); - true - })); - - Self { - doc: RefCell::new(doc), - event_rx, - subscription, - subscription_local, - } - } - - /// Returns text CRDT instance from a snapshot. 
- /// - /// Use this when restoring an existing, local document (for example when it was stored on your - /// file system) or when receiving a full snapshot from another peer after joining an existing - /// document. - /// - /// The peer id represents the identity of the author applying local changes (that's - /// essentially us), it needs be strictly unique. - #[allow(dead_code)] - pub fn from_bytes(peer_id: u64, bytes: &[u8]) -> Result { - let crdt = Self::new(peer_id); - { - let inner = crdt.doc.borrow_mut(); - inner - .import_with(bytes, "snapshot") - .map_err(|err| TextCrdtError::Imported(err))?; - } - Ok(crdt) - } - - /// Subscribe to changes to the document. - /// - /// This should be used as the "source of truth" for all text operations (local and remote text - /// inserts and removals), affecting all "higher layer" state (text buffer). - /// - /// ## Local Changes - /// - /// ```text - /// -> User types something - /// -> Text CRDT "insert" or "removal" called - /// -> Commit & create "local" delta event - /// -> Delta Event received via subscription - /// -> Apply delta to text buffer - /// ``` - /// - /// ## Remote Changes - /// - /// ```text - /// -> Received deltas from remote peer (via networking layer) - /// -> Apply encoded delta to Text CRDT - /// -> Commit & create "remote" delta event - /// -> Delta Event received via subscription - /// -> Apply delta to text buffer - /// ``` - pub fn subscribe(&self) -> EventReceiver { - self.event_rx.clone() - } - - /// Inserts text at the given unicode position. - /// - /// This text change gets directly committed, causing a local "delta event" which should be - /// used to update "higher layer" state, like the text buffer. Read - /// [subscribe](#method.subscribe) for receiving and handling these events. 
- pub fn insert(&self, index: usize, chunk: &str) -> Result<(), TextCrdtError> { - let doc = self.doc.borrow_mut(); - let text = doc.get_text(TEXT_CONTAINER_ID); - text.insert(index, chunk) - .map_err(|err| TextCrdtError::Local(err))?; - doc.commit(); - Ok(()) - } - - /// Removes range of text at the given unicode position with unicode length. - /// - /// This text change gets directly committed, causing a local "delta event" which should be - /// used to update "higher layer" state, like the text buffer. Read - /// [subscribe](#method.subscribe) for receiving and handling these events. - pub fn remove(&self, index: usize, len: usize) -> Result<(), TextCrdtError> { - let doc = self.doc.borrow_mut(); - let text = doc.get_text(TEXT_CONTAINER_ID); - text.delete(index, len) - .map_err(|err| TextCrdtError::Local(err))?; - doc.commit(); - Ok(()) - } - - /// Applies encoded text deltas received from a remote peer. - /// - /// Deltas are encoded according to the Loro specification. - pub fn apply_encoded_delta(&self, bytes: &[u8]) -> Result<(), TextCrdtError> { - let doc = self.doc.borrow_mut(); - doc.import_with(bytes, "delta") - .map_err(|err| TextCrdtError::Imported(err))?; - Ok(()) - } - - /// Exports encoded snapshot of current Text CRDT state. - /// - /// This can be used to persist the current state of the text CRDT on the file system or during - /// initial sync when a remote peer joins our document. See [from_bytes](#method.from_bytes) - /// for the reverse method. - /// - /// Snapshots are encoded according to the Loro specification. - #[allow(dead_code)] - pub fn snapshot(&self) -> Vec { - let doc = self.doc.borrow(); - doc.export(ExportMode::Snapshot) - .expect("encoded crdt snapshot") - } - - /// Applies local text changes. 
- #[cfg(test)] - fn apply_delta(&self, delta: TextDelta) -> Result<(), TextCrdtError> { - match delta { - TextDelta::Insert { index, chunk } => { - self.insert(index, &chunk)?; - } - TextDelta::Remove { index, len } => { - self.remove(index, len)?; - } - } - Ok(()) - } -} - -impl fmt::Display for TextCrdt { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let doc = self.doc.borrow(); - let text = doc.get_text(TEXT_CONTAINER_ID); - write!(f, "{}", text.to_string()) - } -} - -#[derive(Clone, Debug)] -pub enum TextDelta { - Insert { index: usize, chunk: String }, - Remove { index: usize, len: usize }, -} - -/// Events to notify other parts of the application about text changes. -#[derive(Debug)] -pub enum TextCrdtEvent { - /// We've locally inserted or removed text. - /// - /// Use this to apply changes to your local text buffer, etc. - Local(Vec), - - /// Same as `Local` but in encoded form, including additional information like a vector clock, - /// so we can send that delta over the wire to other peers. - /// - /// Use this to send "small" text changes directly to other peers, for example via gossip - /// broadcast. - LocalEncoded(Vec), - - /// Remote peer inserted or removed text. - /// - /// If a snapshot was received (for example during initial sync), this event might contain - /// multiple deltas. - /// - /// Use this to apply remote changes to your local text buffer. - Remote(Vec), -} - -impl TextCrdtEvent { - fn from_deltas(triggered_by: EventTriggerKind, deltas: Vec) -> Self { - match triggered_by { - EventTriggerKind::Local => Self::Local(deltas), - EventTriggerKind::Import => Self::Remote(deltas), - EventTriggerKind::Checkout => unimplemented!("document checkouts are not supported"), - } - } -} - -/// Loro supports all sorts of CRDTs (Lists, Maps, Counters, etc.), this method extracts only the -/// deltas related to collaborative text editing of our known text container. 
-fn extract_text_deltas(diff_event: DiffEvent<'_>) -> Vec> { - diff_event - .events - .into_iter() - .filter_map(|event| { - if event.is_unknown { - return None; - } - - if let Diff::Text(loro_deltas) = event.diff { - Some(loro_deltas) - } else { - None - } - }) - .collect() -} - -/// Converts relative text deltas to absolute ones. -/// -/// Loro's text deltas are represented as QuillJS "Deltas" which encode text inserts and removals -/// relative to position 0 in the document. -/// -/// For our purposes we need absolute positions, as our text buffer implementation requires the -/// exact position for every text insertion and removal. -/// -/// Read more: https://quilljs.com/docs/delta/ -fn absolute_deltas(loro_deltas: Vec>) -> Vec { - let mut deltas = Vec::new(); - - for commit in loro_deltas { - let mut index = 0; - for loro_delta in commit { - let delta = match loro_delta { - loro::TextDelta::Retain { retain, .. } => { - index += retain; - continue; - } - loro::TextDelta::Insert { insert, .. 
} => { - let len = insert.len(); - let result = TextDelta::Insert { - index, - chunk: insert, - }; - index += len; - result - } - loro::TextDelta::Delete { delete } => TextDelta::Remove { index, len: delete }, - }; - deltas.push(delta); - } - } - - deltas -} - -#[derive(Debug, Error)] -pub enum TextCrdtError { - #[error("could not apply local text change: {0}")] - Local(loro::LoroError), - - #[error("could not apply imported text change: {0}")] - Imported(loro::LoroError), -} - -#[cfg(test)] -mod tests { - use super::{TextCrdt, TextCrdtEvent}; - - #[test] - fn from_snapshot() { - let doc_1 = TextCrdt::new(1); - - doc_1.insert(0, "Hello,").unwrap(); - doc_1.insert(6, " World!").unwrap(); - doc_1.remove(7, 1).unwrap(); - doc_1.insert(7, "W").unwrap(); - - let doc_2 = TextCrdt::from_bytes(2, &doc_1.snapshot()).unwrap(); - - assert_eq!(doc_1.to_string(), "Hello, World!"); - assert_eq!(doc_1.to_string(), doc_2.to_string()); - } - - #[tokio::test] - async fn from_encoded_deltas() { - let doc_1 = TextCrdt::new(1); - let rx_1 = doc_1.subscribe(); - - doc_1.insert(0, "Hello,").unwrap(); - doc_1.insert(6, " World!").unwrap(); - doc_1.remove(7, 1).unwrap(); - doc_1.insert(7, "W").unwrap(); - - let doc_2 = TextCrdt::new(2); - - for _ in 0..8 { - if let TextCrdtEvent::LocalEncoded(bytes) = rx_1.recv().await.unwrap() { - doc_2.apply_encoded_delta(&bytes).unwrap(); - } - } - - assert_eq!(doc_1.to_string(), "Hello, World!"); - assert_eq!(doc_1.to_string(), doc_2.to_string()); - } - - #[tokio::test] - async fn from_deltas() { - let doc_1 = TextCrdt::new(1); - let rx_1 = doc_1.subscribe(); - - doc_1.insert(0, "Hello").unwrap(); - doc_1.remove(1, 4).unwrap(); - doc_1.insert(1, "uhu!").unwrap(); - - assert_eq!(doc_1.to_string(), "Huhu!"); - - let doc_2 = TextCrdt::new(2); - - for _ in 0..6 { - if let TextCrdtEvent::Local(deltas) = rx_1.recv().await.unwrap() { - doc_2.apply_delta(deltas.get(0).unwrap().clone()).unwrap(); - } - } - - assert_eq!(doc_1.to_string(), doc_2.to_string()); - } 
-} diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index 3e495cb4..c967b4e3 100644 --- a/aardvark-doc/src/document.rs +++ b/aardvark-doc/src/document.rs @@ -1,6 +1,7 @@ use std::cell::{Cell, OnceCell}; use std::fmt; use std::str::FromStr; +use std::sync::Arc; use std::sync::OnceLock; use aardvark_node::document::{DocumentId as DocumentIdNode, SubscribableDocument}; @@ -8,10 +9,10 @@ use anyhow::Result; use glib::prelude::*; use glib::subclass::{Signal, prelude::*}; use glib::{Properties, clone}; +use loro::{ExportMode, LoroDoc, event::Diff}; use p2panda_core::{HashError, PublicKey}; use tracing::error; -use crate::crdt::{TextCrdt, TextCrdtEvent, TextDelta}; use crate::service::Service; #[derive(Clone, Debug, PartialEq, Eq, glib::Boxed)] @@ -33,13 +34,18 @@ impl fmt::Display for DocumentId { } mod imp { + /// Identifier of container where we handle the text CRDT in a Loro document. + /// + /// Loro documents can contain multiple different CRDT types in one document. 
+ const TEXT_CONTAINER_ID: &str = "document"; + use super::*; #[derive(Properties, Default)] #[properties(wrapper_type = super::Document)] pub struct Document { #[property(name = "text", get = Self::text, type = String)] - crdt_doc: OnceCell, + crdt_doc: OnceCell, #[property(get, construct_only, set = Self::set_id)] id: OnceCell, #[property(get, set)] @@ -56,7 +62,11 @@ mod imp { impl Document { pub fn text(&self) -> String { - self.crdt_doc.get().expect("crdt_doc to be set").to_string() + self.crdt_doc + .get() + .expect("crdt_doc to be set") + .get_text(TEXT_CONTAINER_ID) + .to_string() } fn set_id(&self, id: Option) { @@ -67,22 +77,26 @@ mod imp { pub fn splice_text(&self, index: i32, delete_len: i32, chunk: &str) -> Result<()> { let doc = self.crdt_doc.get().expect("crdt_doc to be set"); + let text = doc.get_text(TEXT_CONTAINER_ID); if delete_len == 0 { - doc.insert(index as usize, chunk) + text.insert(index as usize, chunk) .expect("update document after text insertion"); } else { - doc.remove(index as usize, delete_len as usize) + text.delete(index as usize, delete_len as usize) .expect("update document after text removal"); } + doc.commit(); + Ok(()) } + /// Apply changes to the CRDT from a message received from another peer pub fn on_remote_message(&self, bytes: &[u8]) { let doc = self.crdt_doc.get().expect("crdt_doc to be set"); - if let Err(err) = doc.apply_encoded_delta(&bytes) { + if let Err(err) = doc.import_with(bytes, "delta") { eprintln!("received invalid message: {}", err); } } @@ -122,6 +136,119 @@ mod imp { ), ); } + + fn setup_loro_document(&self) { + let public_key = self.obj().service().public_key(); + let obj = self.obj(); + let doc = LoroDoc::new(); + // The peer id represents the identity of the author applying local changes (that's + // essentially us), it needs be strictly unique. 
+ doc.set_peer_id({ + // Take first 8 bytes of public key (32 bytes) to determine a unique "peer id" + // which is used to keep authors apart inside the text crdt. + // + // TODO(adz): This is strictly speaking not collision-resistant but we're limited + // here by the 8 bytes / 64 bit from the u64 `PeerId` type from Loro. In practice + // this should not really be a problem, but it would be nice if the Loro API would + // change some day. + let mut buf = [0u8; 8]; + buf[..8].copy_from_slice(&public_key.as_bytes()[..8]); + u64::from_be_bytes(buf) + }) + .expect("set peer id for new document"); + + let text = doc.get_text(TEXT_CONTAINER_ID); + doc.subscribe( + &text.id(), + Arc::new(clone!( + #[weak] + obj, + move |loro_event| { + let text_deltas = loro_event.events.into_iter().filter_map(|event| { + if event.is_unknown { + return None; + } + + // Loro supports all sorts of CRDTs (Lists, Maps, Counters, etc.), + // extract only text deltas. + if let Diff::Text(loro_deltas) = event.diff { + Some(loro_deltas) + } else { + None + } + }); + + // Loro's text deltas are represented as QuillJS "Deltas" + // See: https://quilljs.com/docs/delta/ + for commit in text_deltas { + let mut index = 0; + for delta in commit { + match delta { + loro::TextDelta::Retain { retain, .. } => { + index += retain; + } + loro::TextDelta::Insert { insert, .. 
} => { + let len = insert.len(); + obj.imp().emit_text_inserted(index as i32, insert); + index += len; + } + loro::TextDelta::Delete { delete } => { + obj.imp().emit_range_deleted( + index as i32, + (index + delete) as i32, + ); + } + } + } + } + } + )), + ) + .detach(); + + doc.subscribe_local_update(Box::new(clone!( + #[weak] + obj, + #[upgrade_or] + false, + move |delta_bytes| { + let delta_bytes = delta_bytes.to_vec(); + // Move a strong reference to the Document into the spawn, + // to ensure changes are always propagated to the network + glib::spawn_future(async move { + // Broadcast a "text delta" to all peers and persist the snapshot. + // + // TODO(adz): We should consider persisting the snapshot every x + // times or x seconds, not sure yet what logic makes the most + // sense. + let snapshot_bytes = obj + .imp() + .crdt_doc + .get() + .expect("crdt_doc to be set") + .export(ExportMode::Snapshot) + .expect("encoded crdt snapshot"); + + if let Err(error) = obj + .service() + .node() + .delta_with_snapshot(obj.id().0, delta_bytes, snapshot_bytes) + .await + { + error!( + "Failed to send snapshot of document to the network: {}", + error + ); + } + }); + + true + } + ))) + .detach(); + + self.crdt_doc.set(doc).unwrap(); + } } #[glib::derived_properties] @@ -153,22 +280,7 @@ mod imp { self.set_id(Some(DocumentId(document_id))); } - let public_key = self.obj().service().public_key(); - let crdt_doc = TextCrdt::new({ - // Take first 8 bytes of public key (32 bytes) to determine a unique "peer id" - // which is used to keep authors apart inside the text crdt. - // - // TODO(adz): This is strictly speaking not collision-resistant but we're limited - // here by the 8 bytes / 64 bit from the u64 `PeerId` type from Loro. In practice - // this should not really be a problem, but it would be nice if the Loro API would - // change some day. 
- let mut buf = [0u8; 8]; - buf[..8].copy_from_slice(&public_key.as_bytes()[..8]); - u64::from_be_bytes(buf) - }); - - let crdt_doc_rx = crdt_doc.subscribe(); - self.crdt_doc.set(crdt_doc).expect("crdt_doc not to be set"); + self.setup_loro_document(); let obj = self.obj(); glib::spawn_future(clone!( @@ -182,57 +294,6 @@ mod imp { } } )); - - let obj = self.obj(); - glib::spawn_future(clone!( - #[weak] - obj, - async move { - while let Ok(event) = crdt_doc_rx.recv().await { - match event { - TextCrdtEvent::LocalEncoded(delta_bytes) => { - // Broadcast a "text delta" to all peers and persist the snapshot. - // - // TODO(adz): We should consider persisting the snapshot every x - // times or x seconds, not sure yet what logic makes the most - // sense. - let snapshot_bytes = obj - .imp() - .crdt_doc - .get() - .expect("crdt_doc to be set") - .snapshot(); - - if obj - .service() - .node() - .delta_with_snapshot(obj.id().0, delta_bytes, snapshot_bytes) - .await - .is_err() - { - break; - } - } - TextCrdtEvent::Local(text_deltas) - | TextCrdtEvent::Remote(text_deltas) => { - for delta in text_deltas { - match delta { - TextDelta::Insert { index, chunk } => { - obj.imp().emit_text_inserted(index as i32, chunk); - } - TextDelta::Remove { index, len } => { - obj.imp().emit_range_deleted( - index as i32, - (index + len) as i32, - ); - } - } - } - } - } - } - } - )); } } } diff --git a/aardvark-doc/src/lib.rs b/aardvark-doc/src/lib.rs index 59e7d306..139b17f9 100644 --- a/aardvark-doc/src/lib.rs +++ b/aardvark-doc/src/lib.rs @@ -1,3 +1,2 @@ -mod crdt; pub mod document; pub mod service; From 4dbda565d50bf6ee34cbefaa75502c5060630310 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Thu, 20 Mar 2025 18:51:18 +0100 Subject: [PATCH 7/8] doc: Fix missing notify::text --- aardvark-doc/src/document.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/aardvark-doc/src/document.rs b/aardvark-doc/src/document.rs index c967b4e3..49da2f33 100644 --- a/aardvark-doc/src/document.rs +++ 
b/aardvark-doc/src/document.rs @@ -201,6 +201,7 @@ mod imp { } } } + obj.notify_text(); } )), ) From 23b2391d90cbb194f6266c800fe8a4929341c4d8 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Thu, 20 Mar 2025 18:58:15 +0100 Subject: [PATCH 8/8] doc: Add basic tests for document syncing --- aardvark-doc/src/lib.rs | 128 ++++++++++++++++++++++++++++++++++++++ aardvark-node/src/node.rs | 2 +- 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/aardvark-doc/src/lib.rs b/aardvark-doc/src/lib.rs index 139b17f9..405cc979 100644 --- a/aardvark-doc/src/lib.rs +++ b/aardvark-doc/src/lib.rs @@ -1,2 +1,130 @@ pub mod document; pub mod service; + +#[cfg(test)] +mod tests { + use crate::document::Document; + use crate::service::Service; + use glib::object::ObjectExt; + + #[test] + fn create_document() { + let context = glib::MainContext::default(); + let service = Service::new(); + let test_string = "Hello World"; + service.startup(); + let document = Document::new(&service, None); + context.iteration(false); + assert!(document.insert_text(0, test_string).is_ok()); + assert_eq!(document.text(), test_string); + } + + #[test] + fn basic_sync() { + let main_loop = glib::MainLoop::new(None, false); + let test_string = "Hello World"; + let service = Service::new(); + service.startup(); + + let document = Document::new(&service, None); + let id = document.id(); + + let service2 = Service::new(); + service2.startup(); + let document2 = Document::new(&service2, Some(&id)); + + assert_eq!(document.id(), document2.id()); + main_loop.context().spawn(async move { + assert!(document.insert_text(0, test_string).is_ok()); + assert_eq!(document.text(), test_string); + }); + + let main_loop_clone = main_loop.clone(); + document2.connect_notify(Some("text"), move |_, _| { + main_loop_clone.quit(); + }); + + main_loop.run(); + service.shutdown(); + service2.shutdown(); + + assert_eq!(document2.text(), test_string); + } + + #[test] + fn sync_multiple_changes() { + let main_loop = 
glib::MainLoop::new(None, false); + let expected_string = "Hello, World!"; + let service = Service::new(); + service.startup(); + + let document = Document::new(&service, None); + let id = document.id(); + + let service2 = Service::new(); + service2.startup(); + let document2 = Document::new(&service2, Some(&id)); + + assert_eq!(document.id(), document2.id()); + main_loop.context().spawn(async move { + assert!(document.insert_text(0, "Hello,").is_ok()); + assert!(document.insert_text(6, " World!").is_ok()); + assert!(document.delete_range(7, 8).is_ok()); + assert!(document.insert_text(7, "W").is_ok()); + assert_eq!(document.text(), expected_string); + }); + + let main_loop_clone = main_loop.clone(); + document2.connect_notify(Some("text"), move |document2, _| { + if document2.text() == expected_string { + main_loop_clone.quit(); + } + }); + + main_loop.run(); + + service.shutdown(); + service2.shutdown(); + } + + #[test] + fn sync_longer_text() { + let main_loop = glib::MainLoop::new(None, false); + let test_string = "Et aut omnis eos corporis ut. Qui est blanditiis blanditiis. + Sit quia nam maxime accusantium ut voluptatem. Fuga consequuntur animi et et est. + Unde voluptas consequatur mollitia id odit optio harum sint. 
Fugit quo aut et laborum aut cupiditate."; + let expected_string = format!( + "{}{}{}{}", + test_string, test_string, test_string, test_string + ); + let service = Service::new(); + service.startup(); + + let document = Document::new(&service, None); + let id = document.id(); + + let service2 = Service::new(); + service2.startup(); + let document2 = Document::new(&service2, Some(&id)); + + assert_eq!(document.id(), document2.id()); + main_loop.context().spawn(async move { + assert!(document.insert_text(0, test_string).is_ok()); + assert!(document.insert_text(0, test_string).is_ok()); + assert!(document.insert_text(0, test_string).is_ok()); + assert!(document.insert_text(0, test_string).is_ok()); + }); + + let main_loop_clone = main_loop.clone(); + document2.connect_notify(Some("text"), move |document2, _| { + if document2.text() == expected_string { + main_loop_clone.quit(); + } + }); + + main_loop.run(); + + service.shutdown(); + service2.shutdown(); + } +} diff --git a/aardvark-node/src/node.rs b/aardvark-node/src/node.rs index 440b4d34..83098355 100644 --- a/aardvark-node/src/node.rs +++ b/aardvark-node/src/node.rs @@ -68,7 +68,7 @@ impl Node { let operation_store = self.inner.operation_store.clone(); let inner = self.inner.clone(); - self.inner.runtime.spawn(async move { + self.inner.runtime.block_on(async move { inner .private_key .set(private_key.clone())