Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn main() -> anyhow::Result<()> {
database,
tls: TlsConfig::disabled(),
slot,
publication,
publication: publication.into(),
start_lsn,
stop_at_lsn: None,

Expand Down
2 changes: 1 addition & 1 deletion examples/bounded_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn main() -> anyhow::Result<()> {
database,
tls: TlsConfig::disabled(),
slot,
publication,
publication: publication.into(),
start_lsn,
stop_at_lsn: Some(stop_at_lsn),

Expand Down
2 changes: 1 addition & 1 deletion examples/checkpointed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
database,
tls: TlsConfig::disabled(),
slot,
publication,
publication: publication.into(),
start_lsn,
stop_at_lsn: None,
status_interval: std::time::Duration::from_secs(1),
Expand Down
2 changes: 1 addition & 1 deletion examples/with_mtls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn main() -> anyhow::Result<()> {
},

slot,
publication,
publication: publication.into(),
start_lsn,
stop_at_lsn: None,

Expand Down
2 changes: 1 addition & 1 deletion examples/with_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn main() -> anyhow::Result<()> {
},

slot,
publication,
publication: publication.into(),
start_lsn,
stop_at_lsn: None,

Expand Down
4 changes: 1 addition & 3 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ impl WorkerState {
&self,
stream: &mut S,
) -> Result<()> {
// Escape single quotes in publication name
let publication = self.cfg.publication.replace('\'', "''");
let sql = format!(
"START_REPLICATION SLOT {} LOGICAL {} \
(proto_version '1', publication_names '{}', messages 'true')",
self.cfg.slot, self.cfg.start_lsn, publication,
self.cfg.slot, self.cfg.start_lsn, self.cfg.publication,
);
write_query(stream, &sql).await?;

Expand Down
34 changes: 31 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,34 @@ impl TlsConfig {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publication(Vec<String>);

impl From<&str> for Publication {
fn from(value: &str) -> Self {
Self(vec![value.into()])
}
}

impl From<String> for Publication {
fn from(value: String) -> Self {
Self(vec![value])
}
}

impl<A: Into<String>> FromIterator<A> for Publication {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
Self(iter.into_iter().map(|item| item.into()).collect())
}
}

impl std::fmt::Display for Publication {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Escape single quotes in publication name
write!(f, "{}", self.0.join(",").replace('\'', "''"))
}
}

/// Configuration for PostgreSQL logical replication connections.
///
/// # Example
Expand Down Expand Up @@ -267,7 +295,7 @@ pub struct ReplicationConfig {
/// Name of the publication to subscribe to.
///
/// The publication must exist and include the tables you want to replicate.
pub publication: String,
pub publication: Publication,

/// LSN position to start replication from.
///
Expand Down Expand Up @@ -359,7 +387,7 @@ impl ReplicationConfig {
password: impl Into<String>,
database: impl Into<String>,
slot: impl Into<String>,
publication: impl Into<String>,
publication: impl Into<Publication>,
) -> Self {
Self {
host: host.into(),
Expand Down Expand Up @@ -425,7 +453,7 @@ impl ReplicationConfig {
password: impl Into<String>,
database: impl Into<String>,
slot: impl Into<String>,
publication: impl Into<String>,
publication: impl Into<Publication>,
) -> Self {
Self {
host: socket_dir.into(),
Expand Down