- Updated to `automerge@0.8.0`
- A bug where locally unavailable documents sent by peers with an announce policy set to false would be marked as unavailable
- `TcpDialer::new`, which takes a `Url` parameter rather than a host and a port or a socket address
- `Repo::dial_tcp()` to simplify construction of `TcpDialer`
- Allow documents syncing over the TCP transport to be up to 8GB in size instead of Tokio's default 8MB frame size
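As a rough sketch, the two construction paths might look like this (a sketch only: everything beyond the `Url` parameter to `TcpDialer::new` — the `BackoffConfig` argument and the exact shape of `Repo::dial_tcp` — is an assumption based on the surrounding notes):

```rust
use std::sync::Arc;
use samod::tokio_io::TcpDialer;
use url::Url;

// Hypothetical usage; exact signatures are assumptions.
let url: Url = "tcp://sync.example.com:8080".parse()?;

// Either construct the dialer explicitly…
let handle = repo.dial(BackoffConfig::default(), Arc::new(TcpDialer::new(url.clone())))?;

// …or use the convenience constructor:
let handle = repo.dial_tcp(url, BackoffConfig::default())?;
```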
- Exposed receiving `ConnectionHandle`s via `accept()`. Users can now subscribe to an `events()` stream directly on the handle, or `await` `handshake_completed()`.
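A rough sketch of using the handle on the accepting side (assumed: the `Transport` setup and an `acceptor` from elsewhere in these notes; only `accept`, `events`, and `handshake_completed` are named by the changelog itself):

```rust
use futures::StreamExt;

// Hypothetical sketch: accept a transport and observe the resulting
// connection handle.
let conn_handle = acceptor.accept(Transport::new(rx_stream, tx_sink))?;

// Wait for the sync protocol handshake to finish…
conn_handle.handshake_completed().await?;

// …or watch the connection's event stream instead:
let mut events = conn_handle.events();
while let Some(event) = events.next().await {
    println!("connection event: {event:?}");
}
```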
The main focus of this release is a new connection management API, which replaces the `Repo::connect` method with separate APIs for making outgoing connections (referred to as a "dialer") and accepting incoming connections (an "acceptor"). The payoff is that we can automatically handle reconnection.

A second, smaller feature is the addition of a `RepoObserver` to help with monitoring running samod processes.
What follows is a quick guide to the new connections API; for more details on breaking changes and other added features, see the "Added" and "Breaking Changes" sections which follow the guide.
```rust
// Before
let conn = repo.connect(rx, tx, ConnDirection::Outgoing).unwrap();
conn.handshake_complete().await.unwrap();

// After
let handle = repo.dial(BackoffConfig::default(), Arc::new(my_dialer)).unwrap();
// or, for WebSocket URLs directly:
let handle = repo.dial_websocket(url, BackoffConfig::default()).unwrap();
```

```rust
// Before
let conn = repo.connect(rx, tx, ConnDirection::Incoming).unwrap();

// After — set up an acceptor once, then hand transports to it as they arrive:
let acceptor = repo.make_acceptor(url).unwrap();

// From arbitrary streams/sinks:
acceptor.accept(Transport::new(rx_stream, tx_sink)).unwrap();
// From an axum WebSocket upgrade handler:
acceptor.accept_axum(socket).unwrap();
// From a raw tungstenite stream:
acceptor.accept_tungstenite(ws_stream).unwrap();
```

```rust
// Dialer side — wait for the first successful connection:
let peer_info = handle.established().await?;

// Dialer side — stream every lifecycle event:
let mut events = handle.events();
while let Some(event) = events.next().await {
    match event {
        DialerEvent::Connected { peer_info } => { /* … */ }
        DialerEvent::Disconnected => { /* … */ }
        DialerEvent::Reconnecting { attempt } => { /* … */ }
        DialerEvent::MaxRetriesReached => { break; }
    }
}

// Acceptor side — react to clients connecting/disconnecting:
let mut events = acceptor.events();
while let Some(event) = events.next().await {
    match event {
        AcceptorEvent::ClientConnected { connection_id, peer_info } => { /* … */ }
        AcceptorEvent::ClientDisconnected { connection_id } => { /* … */ }
    }
}
```

```rust
use samod::{Dialer, Transport};
use url::Url;
use std::pin::Pin;

struct MyDialer { url: Url }

impl Dialer for MyDialer {
    fn url(&self) -> Url { self.url.clone() }

    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Transport, Box<dyn std::error::Error + Send + Sync>>> + Send>> {
        Box::pin(async move {
            // establish your transport here, then wrap it:
            Ok(Transport::new(my_rx_stream, my_tx_sink))
        })
    }
}
```

If you have a custom `RuntimeHandle`, add the new required method:

```rust
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send {
    tokio::time::sleep(duration) // or your runtime's equivalent
}
```

- `Repo::dial` — initiates an outgoing connection using a user-supplied `Dialer` implementation. Returns a `DialerHandle` immediately (non-blocking). The `DialerHandle` can be used to monitor the connection.
- `Repo::dial_websocket` — convenience wrapper around `Repo::dial` for WebSocket connections; takes a `Url` and `BackoffConfig`.
- `Repo::acceptor` — registers a URL as a listening address and returns an `AcceptorHandle` for accepting incoming connections on that URL.
- `Transport` — a type-erased `(Stream, Sink)` pair returned by `Dialer::connect` and accepted by `AcceptorHandle::accept`.
- `RuntimeHandle::sleep` — required new method on the `RuntimeHandle` trait; must return a future that completes after the given `Duration`. This powers back-off delays inside the reconnection logic.
- `samod::NeverAnnounce`, an `AnnouncePolicy` which never announces any documents to peers
- Add the `native-tls` feature to `tungstenite` and `tokio-tungstenite` when the `tungstenite` feature is enabled. This allows using TLS with WebSocket dialers.
- The `samod::tokio_io` module which contains `TcpDialer` for connecting to servers over TCP and `AcceptorHandle::accept_tokio_io` which implements the receiving end
- `samod::RepoObserver` which is notified of events occurring in the `Repo` which may be of interest for monitoring (e.g. for producing throughput statistics on sync message processing)
- A bug where requests which were forwarded across peers who were configured to not announce documents would fail to resolve on the original requestor
- Some interoperability bugs with the JS implementation
- A bug where if a connection failed during establishment the io loop would crash, causing the whole repo to stop working
- `Repo::connect` / `Connection` removed. The old unified `connect` method and the `Connection` / `ConnDirection` types have been removed entirely. Use `Repo::dial` (outgoing) and `Repo::acceptor` + `AcceptorHandle::accept` (incoming) instead.
- `RuntimeHandle::sleep` is now required. Any custom `RuntimeHandle` implementation must add a `sleep(duration: Duration) -> impl Future` method.
- `Repo::connect_tungstenite` / `Repo::accept_axum` / `Repo::accept_tungstenite` moved to `AcceptorHandle`. Call `acceptor.accept_axum(socket)` / `acceptor.accept_tungstenite(ws)` instead of the repo directly.
- `Repo::when_connected` removed. Replace calls like `repo.when_connected(peer_id).await` with `dialer_handle.established().await` (dialer side) or listening on `acceptor.events()` for `AcceptorEvent::ClientConnected` (acceptor side). Connection IDs are now obtained from those futures/events rather than from `when_connected`.
- `load_local` replaces `load` on `LocalPool` repos. `RepoBuilder::load_local()` must be used instead of `RepoBuilder::load()` when building a repo for a `futures::executor::LocalPool` runtime.
- Added `DocumentActor::begin_modification` method for modification patterns which can't be expressed using a callback
- There was a bug where documents larger than 8MB wouldn't sync due to a size limit in the length delimited codec used in `Repo::connect_tokio_io`. This has been fixed by increasing the limit
- `Repo::connected_peers` now returns a stream of changes to the connected peers, as well as the currently connected peers. This makes it possible to track changes to connected peers consistently
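A sketch of consuming that stream (the item type printed here is an assumption; the stream semantics are as described above):

```rust
use futures::StreamExt;

// The stream first yields the currently connected peers, then every
// subsequent change, so consumers never miss a transition.
let mut peers = repo.connected_peers();
while let Some(change) = peers.next().await {
    println!("connected peers changed: {change:?}");
}
```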
- Fixed a deadlock when using `ConcurrencyConfig::Threadpool` and loading more documents than threads in the pool
- impl `PartialEq` and `Eq` for `AutomergeUrl`
- samod: Use `doc_cfg` instead of `doc_auto_cfg` to make the docs.rs build work
This release is a reasonably chunky change to the API of samod. Instead of having `Repo::connect` return a future which needs to be driven, `Repo::connect` now handles the connection on the runtime it was created with and returns a `Connection` object, which can be used to examine the connection.
- `Repo::connect` is now synchronous and returns a `Connection` object
- `DocHandle::{we_have_their_changes, they_have_our_changes}` methods for waiting for sync to complete
- `DocHandle::peers` for listening to changes to the state of peers connected to a given document
- `AutomergeUrl::document_id`
- `Repo::connect` is now synchronous and returns a `Connection` object. This means that where previously you would spawn a future here, you can now just call the method and forget about the connection (unless you want it for some reason)
- `From<u32>` and `Into<u32>` impls for `samod_core::{CommandId, IoTaskId, ConnectionId, DocumentActorId}`
- `Clone` implementations for various `samod_core` types
- `DocumentId::as_bytes`
- `UnixTimestamp::from_millis`
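For example, the ID conversions allow round-tripping through `u32`, which is handy for logging or wire formats (a minimal sketch):

```rust
use samod_core::ConnectionId;

// Round-trip an ID through its u32 representation.
let id = ConnectionId::from(7u32);
let raw: u32 = id.into();
assert_eq!(raw, 7);
```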
- Updated to `automerge` 0.7.1
- `AutomergeUrl` is now `Clone` (thanks to @pkgw)
- Updated to `automerge` 0.7.0
- It is now possible to use `Storage` and `AnnouncePolicy` implementations which can't support `Send` futures. This is enabled by implementing the new `LocalStorage` and `LocalAnnouncePolicy` traits instead of `Storage` and `AnnouncePolicy`, and loading the repo using a `LocalRuntimeHandle` rather than a `RuntimeHandle`.
- Use of a `rayon` threadpool to run document actors is now gated behind the `threadpool` feature flag and the `RepoBuilder::with_concurrency` method.
- `RuntimeHandle` is now much simpler and only requires a `spawn` function
- `StorageKey` no longer implements `FromIterator<String>` or `From<Vec<String>>`, use `StorageKey::from_parts` instead
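Migrating code that built `StorageKey`s from collections is a one-liner; note that the import path and the argument shape accepted by `from_parts` are assumptions here:

```rust
use samod_core::StorageKey;

// Before (no longer compiles):
// let key: StorageKey = vec!["documents".to_string(), "doc-1".to_string()].into();

// After:
let key = StorageKey::from_parts(vec!["documents".to_string(), "doc-1".to_string()]);
```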
- It was possible for the compaction logic to completely delete a document in some cases, fixed in alexjg#19
- Added a `RuntimeHandle` for a `futures::executor::LocalPool`
- Add `Repo::connect_tokio_io` as a convenience for connecting a `tokio::io::Async{ReadWrite}` source as a length delimited stream/sink combination
- Added a bunch of docs
- Rename `samod::Samod` to `samod::Repo` and `samod::SamodBuilder` to `samod::RepoBuilder`
This release is a significant rewrite of the `samod_core` crate to not use async/await syntax internally. It introduces no changes to `samod` but there are breaking changes in `samod_core`:
- `samod_core::ActorResult` is now called `samod_core::DocActorResult` and has an additional `stopped` field
- `Hub::load` no longer takes a `rand::Rng` or `UnixTimestamp` argument
- `SamodLoader::step` takes an additional `rand::Rng` argument
- `SamodLoader::provide_io_result` no longer takes a `UnixTimestamp` argument
- `Hub::handle_event` takes an additional `rand::Rng` argument
- Fix a deadlock
- Make `samod_core::Hub` and `samod_core::SamodLoader` `Send` (#3 by @matheus23)