Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
1 change: 1 addition & 0 deletions examples/bounded_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
1 change: 1 addition & 0 deletions examples/checkpointed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
1 change: 1 addition & 0 deletions examples/control_and_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
1 change: 1 addition & 0 deletions examples/with_mtls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
1 change: 1 addition & 0 deletions examples/with_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub async fn main() -> anyhow::Result<()> {
status_interval: std::time::Duration::from_secs(1),
idle_wakeup_interval: std::time::Duration::from_secs(30),
buffer_events: 8192,
binary: false,
};

let mut repl = ReplicationClient::connect(cfg).await?;
Expand Down
9 changes: 7 additions & 2 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,15 @@ impl WorkerState {
) -> Result<()> {
// Escape single quotes in publication name
let publication = self.cfg.publication.replace('\'', "''");
let binary_opt = if self.cfg.binary {
", binary 'true'"
} else {
""
};
let sql = format!(
"START_REPLICATION SLOT {} LOGICAL {} \
(proto_version '1', publication_names '{}', messages 'true')",
self.cfg.slot, self.cfg.start_lsn, publication,
(proto_version '1', publication_names '{}', messages 'true'{})",
self.cfg.slot, self.cfg.start_lsn, publication, binary_opt,
);
write_query(stream, &sql).await?;

Expand Down
20 changes: 20 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,19 @@ pub struct ReplicationConfig {
///
/// Default: 8192 events
pub buffer_events: usize,

/// Request column values in PostgreSQL binary wire format.
///
/// When `true`, the `binary 'true'` option is added to `START_REPLICATION`
/// and `pgoutput` will encode column values using each type's binary
/// send function instead of text output.
///
/// Leave as `false` (the default) unless you control both ends and have
/// measured a real win: any column whose type lacks a binary send function
/// will cause the walsender to error and close the replication stream.
///
/// Requires PostgreSQL 14 or newer.
pub binary: bool,
}

impl Default for ReplicationConfig {
Expand All @@ -331,6 +344,7 @@ impl Default for ReplicationConfig {
status_interval: Duration::from_secs(10),
idle_wakeup_interval: Duration::from_secs(10),
buffer_events: 8192,
binary: false,
}
}
}
Expand Down Expand Up @@ -482,6 +496,12 @@ impl ReplicationConfig {
self
}

/// Request binary-format column values from `pgoutput` (requires PG 14+).
pub fn with_binary(mut self, binary: bool) -> Self {
self.binary = binary;
self
}

/// Returns the connection string for display (password masked).
///
/// Useful for logging without exposing credentials.
Expand Down