Skip to content

Commit 56d863e

Browse files
committed
Add proxied network-impairment tests
Add a suite of integration tests under link/tests/proxied that simulate various TCP/proxy impairments using TcpDisconnectProxy to validate client reconnect and resume behavior. New tests include: - blackhole_during_subscribe.rs: blackhole during subscribe handshake and recovery - event_counter_integrity.rs: verify connect/disconnect event counters across outages - gradual_degradation.rs: latency ramp causing reconnect and recovery - heavy_write_burst_recovery.rs: large write backlog delivered exactly once after outage - latency_during_snapshot.rs: latency during initial snapshot and subsequent live delivery - subscribe_during_reconnect.rs: add subscription while reconnecting and ensure delivery - unsubscribe_during_outage.rs: unsubscribe during outage and ensure it is not re-subscribed Also update link/tests/proxied.rs to register the new modules and adjust rapid_flap.rs to ignore an unused connect_count variable. These tests exercise resume/replay, duplicate avoidance, subscription lifecycle, and event-counting correctness under adverse network conditions.
1 parent e6e5f8a commit 56d863e

9 files changed

Lines changed: 1388 additions & 1 deletion

link/tests/proxied.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,22 @@ mod common;
77

88
#[path = "proxied/ack_before_first_batch.rs"]
99
mod ack_before_first_batch;
10+
#[path = "proxied/blackhole_during_subscribe.rs"]
11+
mod blackhole_during_subscribe;
1012
#[path = "proxied/double_outage.rs"]
1113
mod double_outage;
14+
#[path = "proxied/event_counter_integrity.rs"]
15+
mod event_counter_integrity;
16+
#[path = "proxied/gradual_degradation.rs"]
17+
mod gradual_degradation;
18+
#[path = "proxied/heavy_write_burst_recovery.rs"]
19+
mod heavy_write_burst_recovery;
1220
#[path = "proxied/helpers.rs"]
1321
mod helpers;
1422
#[path = "proxied/large_snapshot_repeated_outages.rs"]
1523
mod large_snapshot_repeated_outages;
24+
#[path = "proxied/latency_during_snapshot.rs"]
25+
mod latency_during_snapshot;
1626
#[path = "proxied/live_updates_resume.rs"]
1727
mod live_updates_resume;
1828
#[path = "proxied/loading_resume_with_live_writes.rs"]
@@ -21,6 +31,8 @@ mod loading_resume_with_live_writes;
2131
mod mixed_stage_recovery;
2232
#[path = "proxied/multi_sub_bounce.rs"]
2333
mod multi_sub_bounce;
34+
#[path = "proxied/rapid_flap.rs"]
35+
mod rapid_flap;
2436
#[path = "proxied/server_down_connecting.rs"]
2537
mod server_down_connecting;
2638
#[path = "proxied/server_down_initial_load.rs"]
@@ -29,7 +41,11 @@ mod server_down_initial_load;
2941
mod socket_drop_resume;
3042
#[path = "proxied/staggered_outages.rs"]
3143
mod staggered_outages;
44+
#[path = "proxied/subscribe_during_reconnect.rs"]
45+
mod subscribe_during_reconnect;
3246
#[path = "proxied/transport_impairments.rs"]
3347
mod transport_impairments;
48+
#[path = "proxied/unsubscribe_during_outage.rs"]
49+
mod unsubscribe_during_outage;
3450
#[path = "proxied/update_delete_resume.rs"]
3551
mod update_delete_resume;
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
use super::helpers::*;
2+
use crate::common::tcp_proxy::TcpDisconnectProxy;
3+
use kalam_link::SubscriptionConfig;
4+
use std::time::Duration;
5+
use tokio::time::{sleep, timeout};
6+
7+
/// Blackhole the proxy right as the client sends its subscribe request.
8+
/// The TCP socket stays open but no data flows. The client should detect a
9+
/// dead link (pong timeout), reconnect, re-subscribe, and ultimately deliver
10+
/// the full snapshot plus any rows inserted during the outage.
11+
#[tokio::test]
12+
async fn test_blackhole_during_subscribe_handshake_recovers() {
13+
let result = timeout(Duration::from_secs(60), async {
14+
let writer = match create_test_client() {
15+
Ok(c) => c,
16+
Err(e) => {
17+
eprintln!("Skipping test (writer client unavailable): {}", e);
18+
return;
19+
},
20+
};
21+
22+
let proxy = TcpDisconnectProxy::start(upstream_server_url()).await;
23+
let (client, _connect_count, _disconnect_count) =
24+
match create_test_client_with_events_for_base_url(proxy.base_url()) {
25+
Ok(v) => v,
26+
Err(e) => {
27+
eprintln!("Skipping test (proxy client unavailable): {}", e);
28+
proxy.shutdown().await;
29+
return;
30+
},
31+
};
32+
33+
let suffix = unique_suffix();
34+
let table = format!("default.blackhole_subscribe_{}", suffix);
35+
ensure_table(&writer, &table).await;
36+
37+
// Seed a few rows so the subscription has data to deliver.
38+
for i in 0..5 {
39+
writer
40+
.execute_query(
41+
&format!(
42+
"INSERT INTO {} (id, value) VALUES ('seed-{}', 'val-{}')",
43+
table, i, i
44+
),
45+
None,
46+
None,
47+
None,
48+
)
49+
.await
50+
.expect("insert seed row");
51+
}
52+
53+
client.connect().await.expect("connect through proxy");
54+
assert!(
55+
proxy.wait_for_active_connections(1, Duration::from_secs(2)).await,
56+
"proxy should see at least one active connection"
57+
);
58+
59+
// Blackhole immediately BEFORE subscribing so the subscribe message
60+
// (and the server's response) never make it across.
61+
proxy.blackhole();
62+
63+
// Subscribe will eventually time out or hang because nothing comes back.
64+
// We wrap it in a timeout so the test can continue.
65+
let sub_result = timeout(
66+
Duration::from_secs(3),
67+
client.subscribe_with_config(SubscriptionConfig::new(
68+
format!("blackhole-sub-{}", suffix),
69+
format!("SELECT id, value FROM {}", table),
70+
)),
71+
)
72+
.await;
73+
74+
// Whether subscribe itself succeeded (buffered locally) or timed out,
75+
// we need to let the client recover.
76+
let mut sub = match sub_result {
77+
Ok(Ok(s)) => s,
78+
Ok(Err(e)) => {
79+
// subscribe failed — restore traffic and try again.
80+
proxy.restore_traffic();
81+
sleep(Duration::from_millis(500)).await;
82+
83+
for _ in 0..80 {
84+
if client.is_connected().await {
85+
break;
86+
}
87+
sleep(Duration::from_millis(100)).await;
88+
}
89+
90+
client
91+
.subscribe_with_config(SubscriptionConfig::new(
92+
format!("blackhole-sub-{}", suffix),
93+
format!("SELECT id, value FROM {}", table),
94+
))
95+
.await
96+
.unwrap_or_else(|_| panic!("subscribe after recovery should succeed: {}", e))
97+
},
98+
Err(_timeout) => {
99+
// subscribe hung — restore traffic so the client can reconnect
100+
// and the subscription can complete via re-subscribe.
101+
proxy.restore_traffic();
102+
103+
for _ in 0..80 {
104+
if client.is_connected().await {
105+
break;
106+
}
107+
sleep(Duration::from_millis(100)).await;
108+
}
109+
110+
client
111+
.subscribe_with_config(SubscriptionConfig::new(
112+
format!("blackhole-sub-{}", suffix),
113+
format!("SELECT id, value FROM {}", table),
114+
))
115+
.await
116+
.expect("subscribe after timeout recovery should succeed")
117+
},
118+
};
119+
120+
// Insert a row after recovery.
121+
writer
122+
.execute_query(
123+
&format!(
124+
"INSERT INTO {} (id, value) VALUES ('post-blackhole', 'live')",
125+
table
126+
),
127+
None,
128+
None,
129+
None,
130+
)
131+
.await
132+
.expect("insert post-blackhole row");
133+
134+
// Collect events — we should see the seed rows AND the post-blackhole row.
135+
let mut seen_ids = Vec::<String>::new();
136+
let mut seq = None;
137+
for _ in 0..60 {
138+
if seen_ids.iter().any(|id| id == "post-blackhole")
139+
&& (0..5).all(|i| seen_ids.iter().any(|id| id == &format!("seed-{}", i)))
140+
{
141+
break;
142+
}
143+
match timeout(Duration::from_millis(2000), sub.next()).await {
144+
Ok(Some(Ok(ev))) => {
145+
collect_ids_and_track_seq(
146+
&ev,
147+
&mut seen_ids,
148+
&mut seq,
149+
None,
150+
"blackhole-subscribe recovery",
151+
);
152+
},
153+
Ok(Some(Err(e))) => {
154+
// Subscription errors are acceptable during recovery.
155+
eprintln!("subscription error (may be transient): {}", e);
156+
},
157+
Ok(None) => break,
158+
Err(_) => {},
159+
}
160+
}
161+
162+
assert!(
163+
(0..5).all(|i| seen_ids.iter().any(|id| id == &format!("seed-{}", i))),
164+
"all seed rows should arrive after blackhole recovery; got: {:?}",
165+
seen_ids
166+
);
167+
assert!(
168+
seen_ids.iter().any(|id| id == "post-blackhole"),
169+
"post-blackhole row should arrive after recovery"
170+
);
171+
172+
sub.close().await.ok();
173+
client.disconnect().await;
174+
proxy.shutdown().await;
175+
})
176+
.await;
177+
178+
assert!(result.is_ok(), "blackhole during subscribe test timed out");
179+
}

0 commit comments

Comments
 (0)