From 3fba89c2a36c7e432e64cc58ba4c7aef6bd7b413 Mon Sep 17 00:00:00 2001 From: Rodrigo Navarro Date: Thu, 21 May 2026 18:05:36 -0300 Subject: [PATCH] Allow creating replication slots using multiple publications. --- examples/basic.rs | 2 +- examples/bounded_replay.rs | 2 +- examples/checkpointed.rs | 2 +- examples/with_mtls.rs | 2 +- examples/with_tls.rs | 2 +- src/client/worker.rs | 4 +--- src/config.rs | 34 +++++++++++++++++++++++++++++++--- 7 files changed, 37 insertions(+), 11 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index dc2bbc5..ee56832 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -33,7 +33,7 @@ pub async fn main() -> anyhow::Result<()> { database, tls: TlsConfig::disabled(), slot, - publication, + publication: publication.into(), start_lsn, stop_at_lsn: None, diff --git a/examples/bounded_replay.rs b/examples/bounded_replay.rs index 60cee6a..aa4822b 100644 --- a/examples/bounded_replay.rs +++ b/examples/bounded_replay.rs @@ -45,7 +45,7 @@ pub async fn main() -> anyhow::Result<()> { database, tls: TlsConfig::disabled(), slot, - publication, + publication: publication.into(), start_lsn, stop_at_lsn: Some(stop_at_lsn), diff --git a/examples/checkpointed.rs b/examples/checkpointed.rs index c44eeb7..973bafb 100644 --- a/examples/checkpointed.rs +++ b/examples/checkpointed.rs @@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> { database, tls: TlsConfig::disabled(), slot, - publication, + publication: publication.into(), start_lsn, stop_at_lsn: None, status_interval: std::time::Duration::from_secs(1), diff --git a/examples/with_mtls.rs b/examples/with_mtls.rs index 320f662..20c5a68 100644 --- a/examples/with_mtls.rs +++ b/examples/with_mtls.rs @@ -67,7 +67,7 @@ pub async fn main() -> anyhow::Result<()> { }, slot, - publication, + publication: publication.into(), start_lsn, stop_at_lsn: None, diff --git a/examples/with_tls.rs b/examples/with_tls.rs index 2f53ef5..1103369 100644 --- a/examples/with_tls.rs +++ b/examples/with_tls.rs @@ -51,7 +51,7 @@ pub async fn main() -> anyhow::Result<()> { }, slot, - publication, + publication: publication.into(), start_lsn, stop_at_lsn: None, diff --git a/src/client/worker.rs b/src/client/worker.rs index 37bb654..445fff7 100644 --- a/src/client/worker.rs +++ b/src/client/worker.rs @@ -180,12 +180,10 @@ impl WorkerState { &self, stream: &mut S, ) -> Result<()> { - // Escape single quotes in publication name - let publication = self.cfg.publication.replace('\'', "''"); let sql = format!( "START_REPLICATION SLOT {} LOGICAL {} \ (proto_version '1', publication_names '{}', messages 'true')", - self.cfg.slot, self.cfg.start_lsn, publication, + self.cfg.slot, self.cfg.start_lsn, self.cfg.publication, ); write_query(stream, &sql).await?; diff --git a/src/config.rs b/src/config.rs index 05e9fc4..5bbd387 100644 --- a/src/config.rs +++ b/src/config.rs @@ -214,6 +214,34 @@ impl TlsConfig { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Publication(Vec); + +impl From<&str> for Publication { + fn from(value: &str) -> Self { + Self(vec![value.into()]) + } +} + +impl From for Publication { + fn from(value: String) -> Self { + Self(vec![value]) + } +} + +impl> FromIterator for Publication { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().map(|item| item.into()).collect()) + } +} + +impl std::fmt::Display for Publication { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Escape single quotes in publication name + write!(f, "{}", self.0.join(",").replace('\'', "''")) + } +} + /// Configuration for PostgreSQL logical replication connections. /// /// # Example @@ -267,7 +295,7 @@ pub struct ReplicationConfig { /// Name of the publication to subscribe to. /// /// The publication must exist and include the tables you want to replicate. - pub publication: String, + pub publication: Publication, /// LSN position to start replication from. /// @@ -359,7 +387,7 @@ impl ReplicationConfig { password: impl Into, database: impl Into, slot: impl Into, - publication: impl Into, + publication: impl Into, ) -> Self { Self { host: host.into(), @@ -425,7 +453,7 @@ impl ReplicationConfig { password: impl Into, database: impl Into, slot: impl Into, - publication: impl Into, + publication: impl Into, ) -> Self { Self { host: socket_dir.into(),