diff --git a/examples/basic.rs b/examples/basic.rs index dc2bbc5..f56ca72 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -40,6 +40,7 @@ pub async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/examples/bounded_replay.rs b/examples/bounded_replay.rs index 60cee6a..7a210d2 100644 --- a/examples/bounded_replay.rs +++ b/examples/bounded_replay.rs @@ -52,6 +52,7 @@ pub async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/examples/checkpointed.rs b/examples/checkpointed.rs index c44eeb7..37e8e82 100644 --- a/examples/checkpointed.rs +++ b/examples/checkpointed.rs @@ -42,6 +42,7 @@ async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/examples/control_and_stream.rs b/examples/control_and_stream.rs index c0aa5ca..1eb24b3 100644 --- a/examples/control_and_stream.rs +++ b/examples/control_and_stream.rs @@ -100,6 +100,7 @@ pub async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/examples/with_mtls.rs b/examples/with_mtls.rs index 320f662..c33fc0d 100644 --- a/examples/with_mtls.rs +++ b/examples/with_mtls.rs @@ -74,6 +74,7 @@ pub async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/examples/with_tls.rs b/examples/with_tls.rs index 2f53ef5..15540c1 100644 --- a/examples/with_tls.rs +++ b/examples/with_tls.rs @@ -58,6 +58,7 @@ pub async fn main() -> anyhow::Result<()> { status_interval: std::time::Duration::from_secs(1), idle_wakeup_interval: std::time::Duration::from_secs(30), buffer_events: 8192, + binary: false, }; let mut repl = ReplicationClient::connect(cfg).await?; diff --git a/src/client/worker.rs b/src/client/worker.rs index 37bb654..8c0c729 100644 --- a/src/client/worker.rs +++ b/src/client/worker.rs @@ -182,10 +182,15 @@ impl WorkerState { ) -> Result<()> { // Escape single quotes in publication name let publication = self.cfg.publication.replace('\'', "''"); + let binary_opt = if self.cfg.binary { + ", binary 'true'" + } else { + "" + }; let sql = format!( "START_REPLICATION SLOT {} LOGICAL {} \ - (proto_version '1', publication_names '{}', messages 'true')", - self.cfg.slot, self.cfg.start_lsn, publication, + (proto_version '1', publication_names '{}', messages 'true'{})", + self.cfg.slot, self.cfg.start_lsn, publication, binary_opt, ); write_query(stream, &sql).await?; diff --git a/src/config.rs b/src/config.rs index 05e9fc4..2dbb1cb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -313,6 +313,19 @@ pub struct ReplicationConfig { /// /// Default: 8192 events pub buffer_events: usize, + + /// Request column values in PostgreSQL binary wire format. + /// + /// When `true`, the `binary 'true'` option is added to `START_REPLICATION` + /// and `pgoutput` will encode column values using each type's binary + /// send function instead of text output. + /// + /// Leave as `false` (the default) unless you control both ends and have + /// measured a real win: any column whose type lacks a binary send function + /// will cause the walsender to error and close the replication stream. + /// + /// Requires PostgreSQL 14 or newer. + pub binary: bool, } impl Default for ReplicationConfig { @@ -331,6 +344,7 @@ impl Default for ReplicationConfig { status_interval: Duration::from_secs(10), idle_wakeup_interval: Duration::from_secs(10), buffer_events: 8192, + binary: false, } } } @@ -482,6 +496,12 @@ impl ReplicationConfig { self } + /// Request binary-format column values from `pgoutput` (requires PG 14+). + pub fn with_binary(mut self, binary: bool) -> Self { + self.binary = binary; + self + } + /// Returns the connection string for display (password masked). /// /// Useful for logging without exposing credentials.