Skip to content

Commit 806eb59

Browse files
committed
fix(proxy): address code review feedback for connection timeout
- Pass actual timeout duration in ErrorResponse message to client - Document rationale for using 57P05 over 08006 error code - Document yield_now() as best-effort flush with known limitation - Document advisory lock test race condition timing margin - Replace magic number 12345 with named ADVISORY_LOCK_ID constant
1 parent e19b758 commit 806eb59

4 files changed

Lines changed: 47 additions & 10 deletions

File tree

packages/cipherstash-proxy-integration/src/connection_resilience.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ mod tests {
1414
use tokio::task::JoinSet;
1515
use tokio::time::{timeout, Duration};
1616

17+
/// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be
18+
/// unique across concurrently running test suites against the same database.
19+
const ADVISORY_LOCK_ID: i64 = 99_001;
20+
1721
/// A slow query on one connection does not block other connections through the proxy.
1822
#[tokio::test]
1923
async fn slow_query_does_not_block_other_connections() {
@@ -121,31 +125,40 @@ mod tests {
121125
}
122126

123127
/// An advisory-lock-blocked connection through the proxy does not block other proxy connections.
128+
///
129+
/// Note: Connection B notifies readiness before `pg_advisory_lock` reaches PostgreSQL.
130+
/// The 500ms sleep provides a generous margin for the lock attempt to reach PG, but is
131+
/// not strictly guaranteed. In practice this has not caused flakiness.
124132
#[tokio::test]
125133
async fn advisory_lock_blocked_connection_does_not_block_proxy() {
134+
let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})");
135+
let unlock_query = format!("SELECT pg_advisory_unlock({ADVISORY_LOCK_ID})");
136+
126137
let result = timeout(Duration::from_secs(30), async {
127138
// Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference)
128139
let client_a = connect_with_tls(PG_PORT).await;
129140
client_a
130-
.simple_query("SELECT pg_advisory_lock(12345)")
141+
.simple_query(&lock_query)
131142
.await
132143
.unwrap();
133144

134145
let a_ready = Arc::new(Notify::new());
135146
let a_ready_tx = a_ready.clone();
147+
let b_lock_query = lock_query.clone();
148+
let b_unlock_query = unlock_query.clone();
136149

137150
// Connection B: through proxy, attempt to acquire the same lock (will block)
138151
let b_handle = tokio::spawn(async move {
139152
let client_b = connect_with_tls(PROXY).await;
140153
a_ready_tx.notify_one();
141154
// This will block until A releases the lock
142155
client_b
143-
.simple_query("SELECT pg_advisory_lock(12345)")
156+
.simple_query(&b_lock_query)
144157
.await
145158
.unwrap();
146159
// Release after acquiring
147160
client_b
148-
.simple_query("SELECT pg_advisory_unlock(12345)")
161+
.simple_query(&b_unlock_query)
149162
.await
150163
.unwrap();
151164
});
@@ -168,7 +181,7 @@ mod tests {
168181

169182
// Release the lock so B can complete
170183
client_a
171-
.simple_query("SELECT pg_advisory_unlock(12345)")
184+
.simple_query(&unlock_query)
172185
.await
173186
.unwrap();
174187

packages/cipherstash-proxy/src/postgresql/error_handler.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ pub trait PostgreSqlErrorHandler {
5353
Error::Encrypt(EncryptError::UnknownKeysetIdentifier { .. }) => {
5454
ErrorResponse::system_error(err.to_string())
5555
}
56-
Error::ConnectionTimeout { .. } => ErrorResponse::connection_timeout(),
56+
Error::ConnectionTimeout { .. } => {
57+
ErrorResponse::connection_timeout(err.to_string())
58+
}
5759
_ => ErrorResponse::system_error(err.to_string()),
5860
}
5961
}
@@ -102,6 +104,14 @@ mod tests {
102104
.map(|f| f.value.as_str())
103105
}
104106

107+
fn error_message(response: &ErrorResponse) -> Option<&str> {
108+
response
109+
.fields
110+
.iter()
111+
.find(|f| f.code == ErrorResponseCode::Message)
112+
.map(|f| f.value.as_str())
113+
}
114+
105115
#[test]
106116
fn connection_timeout_maps_to_57p05() {
107117
let handler = TestHandler;
@@ -110,6 +120,10 @@ mod tests {
110120
};
111121
let response = handler.error_to_response(err);
112122
assert_eq!(error_code(&response), Some(CODE_IDLE_SESSION_TIMEOUT));
123+
assert_eq!(
124+
error_message(&response),
125+
Some("Connection timed out after 5000 ms")
126+
);
113127
}
114128

115129
#[test]

packages/cipherstash-proxy/src/postgresql/handler.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,12 +261,15 @@ pub async fn handler(client_stream: AsyncStream, context: Context<ZeroKms>) -> R
261261

262262
let result = tokio::try_join!(client_to_server, server_to_client);
263263

264-
if let Err(Error::ConnectionTimeout { .. }) = &result {
265-
let error_response = ErrorResponse::connection_timeout();
264+
if let Err(ref err @ Error::ConnectionTimeout { .. }) = &result {
265+
let error_response = ErrorResponse::connection_timeout(err.to_string());
266266
if let Ok(bytes) = BytesMut::try_from(error_response) {
267267
let _ = timeout_sender.send(bytes);
268268
}
269-
// Brief yield to allow ChannelWriter to flush
269+
// Best-effort yield to allow ChannelWriter to flush the error response
270+
// before the connection tears down. Not guaranteed — if the runtime doesn't
271+
// schedule the writer task before teardown, the client may see a connection
272+
// reset instead of the ErrorResponse.
270273
tokio::task::yield_now().await;
271274
}
272275

packages/cipherstash-proxy/src/postgresql/messages/error_response.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ pub enum ErrorResponseCode {
6060
}
6161

6262
impl ErrorResponse {
63-
pub fn connection_timeout() -> Self {
63+
/// Create a FATAL error response for connection timeout.
64+
///
65+
/// Uses PostgreSQL error code 57P05 (idle_session_timeout). While this code
66+
/// is technically for idle session timeouts, it is the closest match for a
67+
/// proxy-enforced connection timeout. The alternative 08006 (connection_failure)
68+
/// implies a network-level failure, which is misleading — the proxy is
69+
/// deliberately terminating a connection that exceeded its time limit.
70+
pub fn connection_timeout(message: String) -> Self {
6471
Self {
6572
fields: vec![
6673
Field {
@@ -77,7 +84,7 @@ impl ErrorResponse {
7784
},
7885
Field {
7986
code: ErrorResponseCode::Message,
80-
value: "Connection timeout".to_string(),
87+
value: message,
8188
},
8289
],
8390
}

0 commit comments

Comments
 (0)