Skip to content

Commit e19b758

Browse files
committed
feat(proxy): add connection timeout with default 120s and proper error reporting
- Add configurable connection_timeout (default 120s, 0 to disable) to DatabaseConfig - Fix ConnectionTimeout error message to display milliseconds (was showing seconds) - Map ConnectionTimeout to PostgreSQL error code 57P05 instead of generic 58000 - Send ErrorResponse to client on timeout via channel writer - Add pull_policy: never to docker-compose proxy services to ensure local builds - Add connect_timeout to integration test client config - Add unit tests for config defaults, error message format, and error code mapping - Add integration tests for connection isolation under load - Document local build requirement in DEVELOPMENT.md
1 parent e1ff080 commit e19b758

10 files changed

Lines changed: 312 additions & 4 deletions

File tree

DEVELOPMENT.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ mise run proxy:down
321321
Running Proxy in a container cross-compiles a binary for Linux and the current architecture (`amd64`, `arm64`), then copies the binary into the container.
322322
We cross-compile binary outside the container because it's generally faster, due to packages already being cached, and slower network and disk IO in Docker.
323323

324+
> [!IMPORTANT]
325+
> **Proxy must always be built from source for testing.** The `proxy:up` task builds a binary from source (`build:binary`), packages it into a Docker image tagged `cipherstash/proxy:latest` (`build:docker`), then starts it via `docker compose up`. The `tests/docker-compose.yml` file uses `pull_policy: never` on the proxy services to ensure Docker never pulls the released image from Docker Hub. If you see an error like `pull access denied` or `image not found`, run `mise run build` first to build the local image.
326+
324327
### Building
325328

326329
Build a binary and Docker image:
@@ -460,6 +463,8 @@ This project uses `docker compose` to manage containers and networking.
460463

461464
The configuration for those containers is in `tests/docker-compose.yml`.
462465

466+
The proxy services in `tests/docker-compose.yml` use `pull_policy: never` to ensure Docker never pulls the released `cipherstash/proxy:latest` image from Docker Hub. The image must be built locally from source via `mise run proxy:up` (or `mise run build`). This guarantees integration tests always run against the current source code.
467+
463468
The integration tests use the `proxy:up` and `proxy:down` commands documented above to run containers in different configurations.
464469

465470
#### Configuration: configuring PostgreSQL containers in integration tests

mise.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,9 @@ cp -v {{config_root}}/target/{{ target }}/release/cipherstash-proxy {{config_roo
663663

664664
[tasks."build:docker"]
665665
depends = ["eql:download"]
666+
# Tags the image as cipherstash/proxy:latest locally.
667+
# tests/docker-compose.yml uses pull_policy: never to ensure this local image
668+
# is always used instead of the released image on Docker Hub.
666669
description = "Build a Docker image for cipherstash-proxy"
667670
run = """
668671
{% set default_platform = "linux/" ~ arch() | replace(from="x86_64", to="amd64") %}

packages/cipherstash-proxy-integration/src/common.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ pub fn connection_config(port: u16) -> tokio_postgres::Config {
128128
.port(port)
129129
.user(&username)
130130
.password(&password)
131-
.dbname(&name);
131+
.dbname(&name)
132+
.connect_timeout(std::time::Duration::from_secs(10));
132133

133134
db_config
134135
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/// Tests that validate proxy connection isolation under load.
2+
///
3+
/// These tests verify that:
4+
/// - Slow queries on one connection don't block other connections
5+
/// - The proxy accepts new connections after client disconnect
6+
/// - Concurrent connections under load remain responsive
7+
/// - Blocked backend connections don't affect other proxy connections
8+
#[cfg(test)]
9+
mod tests {
10+
use crate::common::{connect_with_tls, PROXY, PG_PORT};
11+
use std::sync::Arc;
12+
use std::time::Instant;
13+
use tokio::sync::Notify;
14+
use tokio::task::JoinSet;
15+
use tokio::time::{timeout, Duration};
16+
17+
/// A slow query on one connection does not block other connections through the proxy.
18+
#[tokio::test]
19+
async fn slow_query_does_not_block_other_connections() {
20+
let result = timeout(Duration::from_secs(30), async {
21+
let client_a = connect_with_tls(PROXY).await;
22+
let client_b = connect_with_tls(PROXY).await;
23+
24+
// Connection A: run a slow query
25+
let a_handle = tokio::spawn(async move {
26+
client_a
27+
.simple_query("SELECT pg_sleep(5)")
28+
.await
29+
.unwrap();
30+
});
31+
32+
// Brief pause to ensure A's query is in flight
33+
tokio::time::sleep(Duration::from_millis(200)).await;
34+
35+
// Connection B: run a fast query, should complete promptly
36+
let start = Instant::now();
37+
let rows = client_b.simple_query("SELECT 1").await.unwrap();
38+
let elapsed = start.elapsed();
39+
40+
assert!(!rows.is_empty(), "Expected result from SELECT 1");
41+
assert!(
42+
elapsed < Duration::from_secs(2),
43+
"Fast query took {elapsed:?}, expected < 2s — proxy may be blocking"
44+
);
45+
46+
a_handle.await.unwrap();
47+
})
48+
.await;
49+
50+
result.expect("Test timed out after 30s");
51+
}
52+
53+
/// Proxy accepts new connections after a client disconnects.
54+
#[tokio::test]
55+
async fn proxy_accepts_new_connections_after_client_disconnect() {
56+
let result = timeout(Duration::from_secs(10), async {
57+
// First connection: query, then drop
58+
{
59+
let client = connect_with_tls(PROXY).await;
60+
let rows = client.simple_query("SELECT 1").await.unwrap();
61+
assert!(!rows.is_empty());
62+
}
63+
// Client dropped here
64+
65+
// Brief pause
66+
tokio::time::sleep(Duration::from_millis(100)).await;
67+
68+
// Second connection: should work fine
69+
let client = connect_with_tls(PROXY).await;
70+
let rows = client.simple_query("SELECT 1").await.unwrap();
71+
assert!(!rows.is_empty());
72+
})
73+
.await;
74+
75+
result.expect("Test timed out after 10s");
76+
}
77+
78+
/// Concurrent slow and fast connections: fast queries complete promptly under slow load.
79+
#[tokio::test]
80+
async fn concurrent_connections_under_slow_load() {
81+
let result = timeout(Duration::from_secs(30), async {
82+
let mut join_set = JoinSet::new();
83+
84+
// 5 slow connections
85+
for _ in 0..5 {
86+
join_set.spawn(async {
87+
let client = connect_with_tls(PROXY).await;
88+
client
89+
.simple_query("SELECT pg_sleep(3)")
90+
.await
91+
.unwrap();
92+
});
93+
}
94+
95+
// Brief pause to let slow queries start
96+
tokio::time::sleep(Duration::from_millis(300)).await;
97+
98+
// 5 fast connections, each should complete promptly
99+
for _ in 0..5 {
100+
join_set.spawn(async {
101+
let start = Instant::now();
102+
let client = connect_with_tls(PROXY).await;
103+
let rows = client.simple_query("SELECT 1").await.unwrap();
104+
let elapsed = start.elapsed();
105+
106+
assert!(!rows.is_empty());
107+
assert!(
108+
elapsed < Duration::from_secs(5),
109+
"Fast query took {elapsed:?} under slow load, expected < 5s"
110+
);
111+
});
112+
}
113+
114+
while let Some(result) = join_set.join_next().await {
115+
result.unwrap();
116+
}
117+
})
118+
.await;
119+
120+
result.expect("Test timed out after 30s");
121+
}
122+
123+
/// An advisory-lock-blocked connection through the proxy does not block other proxy connections.
124+
#[tokio::test]
125+
async fn advisory_lock_blocked_connection_does_not_block_proxy() {
126+
let result = timeout(Duration::from_secs(30), async {
127+
// Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference)
128+
let client_a = connect_with_tls(PG_PORT).await;
129+
client_a
130+
.simple_query("SELECT pg_advisory_lock(12345)")
131+
.await
132+
.unwrap();
133+
134+
let a_ready = Arc::new(Notify::new());
135+
let a_ready_tx = a_ready.clone();
136+
137+
// Connection B: through proxy, attempt to acquire the same lock (will block)
138+
let b_handle = tokio::spawn(async move {
139+
let client_b = connect_with_tls(PROXY).await;
140+
a_ready_tx.notify_one();
141+
// This will block until A releases the lock
142+
client_b
143+
.simple_query("SELECT pg_advisory_lock(12345)")
144+
.await
145+
.unwrap();
146+
// Release after acquiring
147+
client_b
148+
.simple_query("SELECT pg_advisory_unlock(12345)")
149+
.await
150+
.unwrap();
151+
});
152+
153+
// Wait for B to be connected and attempting the lock
154+
a_ready.notified().await;
155+
tokio::time::sleep(Duration::from_millis(500)).await;
156+
157+
// Connection C: through proxy, should complete immediately despite B being blocked
158+
let start = Instant::now();
159+
let client_c = connect_with_tls(PROXY).await;
160+
let rows = client_c.simple_query("SELECT 1").await.unwrap();
161+
let elapsed = start.elapsed();
162+
163+
assert!(!rows.is_empty());
164+
assert!(
165+
elapsed < Duration::from_secs(2),
166+
"Connection C took {elapsed:?}, expected < 2s — blocked connection may be affecting proxy"
167+
);
168+
169+
// Release the lock so B can complete
170+
client_a
171+
.simple_query("SELECT pg_advisory_unlock(12345)")
172+
.await
173+
.unwrap();
174+
175+
b_handle.await.unwrap();
176+
})
177+
.await;
178+
179+
result.expect("Test timed out after 30s");
180+
}
181+
}

packages/cipherstash-proxy-integration/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod common;
2+
mod connection_resilience;
23
mod decrypt;
34
mod diagnostics;
45
mod disable_mapping;

packages/cipherstash-proxy/src/config/database.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,14 @@ impl DatabaseConfig {
7575
self.password.to_owned().risky_unwrap()
7676
}
7777

78+
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 120_000;
79+
7880
pub fn connection_timeout(&self) -> Option<Duration> {
79-
self.connection_timeout.map(Duration::from_millis)
81+
match self.connection_timeout {
82+
Some(0) => None,
83+
Some(ms) => Some(Duration::from_millis(ms)),
84+
None => Some(Duration::from_millis(Self::DEFAULT_CONNECTION_TIMEOUT_MS)),
85+
}
8086
}
8187

8288
pub fn server_name(&self) -> Result<ServerName<'_>, Error> {
@@ -104,6 +110,37 @@ impl DatabaseConfig {
104110
}
105111
}
106112

113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
117+
#[test]
118+
fn connection_timeout_defaults_to_120_seconds() {
119+
let config = DatabaseConfig::for_testing();
120+
assert_eq!(
121+
config.connection_timeout(),
122+
Some(Duration::from_secs(120))
123+
);
124+
}
125+
126+
#[test]
127+
fn connection_timeout_zero_disables_timeout() {
128+
let mut config = DatabaseConfig::for_testing();
129+
config.connection_timeout = Some(0);
130+
assert_eq!(config.connection_timeout(), None);
131+
}
132+
133+
#[test]
134+
fn connection_timeout_custom_value_in_millis() {
135+
let mut config = DatabaseConfig::for_testing();
136+
config.connection_timeout = Some(5000);
137+
assert_eq!(
138+
config.connection_timeout(),
139+
Some(Duration::from_millis(5000))
140+
);
141+
}
142+
}
143+
107144
///
108145
/// Password is NEVER EVER displayed
109146
///

packages/cipherstash-proxy/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub enum Error {
2727
#[error("Connection closed by client")]
2828
ConnectionClosed,
2929

30-
#[error("Connection timed out after {} ms", duration.as_secs())]
30+
#[error("Connection timed out after {} ms", duration.as_millis())]
3131
ConnectionTimeout { duration: Duration },
3232

3333
#[error("Error creating connection")]
@@ -522,4 +522,12 @@ mod tests {
522522

523523
assert_eq!(format!("Statement encountered an internal error. This may be a bug in the statement mapping module of CipherStash Proxy. Please visit {ERROR_DOC_BASE_URL}#mapping-internal-error for more information."), message);
524524
}
525+
526+
#[test]
527+
fn connection_timeout_message_shows_millis() {
528+
let error = Error::ConnectionTimeout {
529+
duration: Duration::from_millis(5000),
530+
};
531+
assert_eq!(error.to_string(), "Connection timed out after 5000 ms");
532+
}
525533
}

packages/cipherstash-proxy/src/postgresql/error_handler.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub trait PostgreSqlErrorHandler {
5353
Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => {
5454
ErrorResponse::system_error(err.to_string())
5555
}
56+
Error::ConnectionTimeout { .. } => ErrorResponse::connection_timeout(),
5657
_ => ErrorResponse::system_error(err.to_string()),
5758
}
5859
}
@@ -67,3 +68,55 @@ pub trait PostgreSqlErrorHandler {
6768
/// * `error_response` - The ErrorResponse to send to the client
6869
fn send_error_response(&mut self, err: Error) -> Result<(), Error>;
6970
}
71+
72+
#[cfg(test)]
73+
mod tests {
74+
use super::*;
75+
use crate::postgresql::messages::error_response::{
76+
ErrorResponseCode, CODE_IDLE_SESSION_TIMEOUT, CODE_SYSTEM_ERROR,
77+
};
78+
use std::time::Duration;
79+
80+
/// Minimal implementation of PostgreSqlErrorHandler for testing the default method.
81+
struct TestHandler;
82+
83+
impl PostgreSqlErrorHandler for TestHandler {
84+
fn client_sender(&mut self) -> &mut Sender {
85+
unimplemented!("not needed for error_to_response tests")
86+
}
87+
88+
fn client_id(&self) -> i32 {
89+
0
90+
}
91+
92+
fn send_error_response(&mut self, _err: Error) -> Result<(), Error> {
93+
unimplemented!("not needed for error_to_response tests")
94+
}
95+
}
96+
97+
fn error_code(response: &ErrorResponse) -> Option<&str> {
98+
response
99+
.fields
100+
.iter()
101+
.find(|f| f.code == ErrorResponseCode::Code)
102+
.map(|f| f.value.as_str())
103+
}
104+
105+
#[test]
106+
fn connection_timeout_maps_to_57p05() {
107+
let handler = TestHandler;
108+
let err = Error::ConnectionTimeout {
109+
duration: Duration::from_millis(5000),
110+
};
111+
let response = handler.error_to_response(err);
112+
assert_eq!(error_code(&response), Some(CODE_IDLE_SESSION_TIMEOUT));
113+
}
114+
115+
#[test]
116+
fn unknown_error_maps_to_system_error() {
117+
let handler = TestHandler;
118+
let err = Error::Unknown;
119+
let response = handler.error_to_response(err);
120+
assert_eq!(error_code(&response), Some(CODE_SYSTEM_ERROR));
121+
}
122+
}

packages/cipherstash-proxy/src/postgresql/handler.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ pub async fn handler(client_stream: AsyncStream, context: Context<ZeroKms>) -> R
232232
}
233233
}
234234

235+
let timeout_sender = channel_writer.sender();
235236
tokio::spawn(channel_writer.receive());
236237

237238
let client_to_server = async {
@@ -258,7 +259,18 @@ pub async fn handler(client_stream: AsyncStream, context: Context<ZeroKms>) -> R
258259
Ok::<(), Error>(())
259260
};
260261

261-
tokio::try_join!(client_to_server, server_to_client)?;
262+
let result = tokio::try_join!(client_to_server, server_to_client);
263+
264+
if let Err(Error::ConnectionTimeout { .. }) = &result {
265+
let error_response = ErrorResponse::connection_timeout();
266+
if let Ok(bytes) = BytesMut::try_from(error_response) {
267+
let _ = timeout_sender.send(bytes);
268+
}
269+
// Brief yield to allow ChannelWriter to flush
270+
tokio::task::yield_now().await;
271+
}
272+
273+
result?;
262274

263275
Ok(())
264276
}

0 commit comments

Comments
 (0)