Skip to content

Commit 575bcd9

Browse files
committed
feat(wind-core): add generic Dispatcher with Router and OutboundAction traits
1 parent 2ad24e2 commit 575bcd9

27 files changed

Lines changed: 569 additions & 216 deletions

.gitattributes

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[*]
2+
end_of_line = lf
3+
insert_final_newline = true

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/wind-core/src/dispatcher.rs

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
//! Generic dispatcher: sits between inbound and outbound.
2+
//!
3+
//! The dispatcher receives every inbound connection from an [`InboundCallback`]
4+
//! implementation, evaluates routing rules via a user-supplied [`Router`], and
5+
//! hands the connection off to the matching [`OutboundAction`] handler.
6+
//!
7+
//! # Design
8+
//!
9+
//! * [`Router`] – an **async** trait that inspects the destination and returns a
10+
//! [`RouteAction`]. Implementations live in the application crate (e.g.
11+
//! `tuic-server`) where ACL rules and outbound configs are known.
12+
//! * [`OutboundAction`] – an **object-safe** trait representing a concrete
13+
//! outbound handler (direct, socks5, …). Handlers are keyed by name string.
14+
//! * [`Dispatcher`] – wraps a router and a map of named handlers, and
15+
//! implements [`InboundCallback`] so it can be passed directly to
16+
//! `inbound.listen()`.
17+
18+
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
19+
20+
use crate::{InboundCallback, tcp::AbstractTcpStream, types::TargetAddr, udp::UdpStream};
21+
22+
// ============================================================================
23+
// Public types
24+
// ============================================================================
25+
26+
/// Boxed future alias used throughout this module.
27+
///
28+
/// Both `Send` and `Sync` are required so the future satisfies the
29+
/// `FutResult` alias used by `InboundCallback`.
30+
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
31+
32+
/// Decision returned by a [`Router`].
33+
#[derive(Debug, Clone)]
34+
pub enum RouteAction {
35+
/// Reject the connection (drop it with an optional reason).
36+
Reject(String),
37+
/// Forward to the named outbound handler.
38+
///
39+
/// The name must match a key previously registered via
40+
/// [`Dispatcher::add_handler`] (or `"default"` which is always tried as a
41+
/// fallback).
42+
Forward(String),
43+
}
44+
45+
// ============================================================================
46+
// Router trait
47+
// ============================================================================
48+
49+
/// Determines which outbound handler should serve a connection.
50+
///
51+
/// Implementations are free to perform DNS resolution, consult ACL tables, or
52+
/// apply any other policy. The trait is object-safe; all methods take
53+
/// `&self` and return a pinned boxed future.
54+
pub trait Router: Send + Sync + 'static {
55+
/// Classify a TCP or UDP connection.
56+
///
57+
/// * `target` – the destination address as reported by the inbound.
58+
/// * `is_tcp` – `true` for TCP streams, `false` for UDP streams.
59+
fn route<'a>(&'a self, target: &'a TargetAddr, is_tcp: bool) -> BoxFuture<'a, eyre::Result<RouteAction>>;
60+
}
61+
62+
// ============================================================================
63+
// OutboundAction trait
64+
// ============================================================================
65+
66+
/// Object-safe outbound handler.
67+
///
68+
/// Each concrete outbound strategy (direct connect, SOCKS5 proxy, …)
69+
/// implements this trait. The stream types are erased via trait objects so
70+
/// handlers can be stored in a `HashMap`.
71+
pub trait OutboundAction: Send + Sync + 'static {
72+
/// Handle an inbound TCP stream.
73+
///
74+
/// The stream is boxed and `'static` so it can be stored or sent across
75+
/// tasks. All concrete `AbstractTcpStream` implementations (owned
76+
/// `TcpStream`, `Socks5Stream<TcpStream>`, …) satisfy this bound.
77+
fn handle_tcp<'a>(
78+
&'a self,
79+
target: TargetAddr,
80+
stream: Box<dyn AbstractTcpStream + 'static>,
81+
) -> BoxFuture<'a, eyre::Result<()>>;
82+
83+
/// Handle an inbound UDP session.
84+
fn handle_udp<'a>(&'a self, stream: UdpStream) -> BoxFuture<'a, eyre::Result<()>>;
85+
}
86+
87+
// ============================================================================
88+
// Dispatcher
89+
// ============================================================================
90+
91+
/// Routes inbound connections to named outbound handlers.
92+
///
93+
/// # Construction
94+
///
95+
/// ```ignore
96+
/// let mut dispatcher = Dispatcher::new(my_router);
97+
/// dispatcher.add_handler("default", Arc::new(DirectOutbound::new()));
98+
/// dispatcher.add_handler("via_socks5", Arc::new(Socks5Outbound::new("127.0.0.1:1080")));
99+
/// ```
100+
///
101+
/// Then pass `dispatcher` (or `dispatcher.clone()`) to `inbound.listen()`.
102+
pub struct Dispatcher<R: Router> {
103+
router: Arc<R>,
104+
handlers: Arc<HashMap<String, Arc<dyn OutboundAction>>>,
105+
}
106+
107+
impl<R: Router> Dispatcher<R> {
108+
/// Create a new dispatcher with the given router and no handlers yet.
109+
pub fn new(router: R) -> Self {
110+
Self {
111+
router: Arc::new(router),
112+
handlers: Arc::new(HashMap::new()),
113+
}
114+
}
115+
116+
/// Register a named outbound handler.
117+
///
118+
/// Call this before passing the dispatcher to an inbound. The name
119+
/// `"default"` is used as the fallback when the router returns a name that
120+
/// is not otherwise registered.
121+
pub fn add_handler(&mut self, name: impl Into<String>, handler: Arc<dyn OutboundAction>) {
122+
Arc::make_mut(&mut self.handlers).insert(name.into(), handler);
123+
}
124+
125+
/// Look up a handler by name, falling back to `"default"` if the exact
126+
/// name is not registered.
127+
fn resolve_handler(&self, name: &str) -> Option<Arc<dyn OutboundAction>> {
128+
self.handlers.get(name).or_else(|| self.handlers.get("default")).cloned()
129+
}
130+
}
131+
132+
impl<R: Router> Clone for Dispatcher<R> {
133+
fn clone(&self) -> Self {
134+
Self {
135+
router: self.router.clone(),
136+
handlers: self.handlers.clone(),
137+
}
138+
}
139+
}
140+
141+
// ============================================================================
142+
// InboundCallback implementation
143+
// ============================================================================
144+
145+
impl<R: Router> InboundCallback for Dispatcher<R> {
146+
async fn handle_tcpstream(&self, target_addr: TargetAddr, stream: impl AbstractTcpStream + 'static) -> eyre::Result<()> {
147+
let action = self.router.route(&target_addr, true).await?;
148+
149+
match action {
150+
RouteAction::Reject(reason) => {
151+
tracing::debug!("[dispatcher] TCP {} → reject: {}", target_addr, reason);
152+
return Err(eyre::eyre!("connection rejected: {}", reason));
153+
}
154+
RouteAction::Forward(name) => {
155+
tracing::debug!("[dispatcher] TCP {} → outbound '{}'", target_addr, name);
156+
157+
let handler = self
158+
.resolve_handler(&name)
159+
.ok_or_else(|| eyre::eyre!("no outbound handler registered for '{}' (and no 'default')", name))?;
160+
161+
// Erase the concrete stream type into a Box<dyn AbstractTcpStream>.
162+
handler.handle_tcp(target_addr, Box::new(stream)).await
163+
}
164+
}
165+
}
166+
167+
async fn handle_udpstream(&self, udp_stream: UdpStream) -> eyre::Result<()> {
168+
// For UDP we peek at the target from the stream's perspective.
169+
// Because UdpStream is a channel pair, we cannot peek without consuming
170+
// a packet. Instead we wrap the stream in a small shim that intercepts
171+
// the first packet, classifies it, then replays it into the handler.
172+
//
173+
// For simplicity we classify using a sentinel "unknown" address when
174+
// no target is readily available from the stream struct itself. Real
175+
// per-packet routing happens inside the OutboundAction handler.
176+
//
177+
// If your Router needs per-packet classification, implement it inside
178+
// your OutboundAction::handle_udp instead.
179+
//
180+
// Here we use a dummy TargetAddr for initial routing (e.g. the handler
181+
// selection stage). This works well for most use cases where all UDP is
182+
// routed to the same outbound, or the OutboundAction handles per-packet
183+
// routing internally.
184+
//
185+
// For finer-grained control, use a custom Router that inspects a known
186+
// session target recorded elsewhere (e.g. from the TUIC header).
187+
let sentinel = TargetAddr::IPv4(std::net::Ipv4Addr::UNSPECIFIED, 0);
188+
let action = self.router.route(&sentinel, false).await?;
189+
190+
match action {
191+
RouteAction::Reject(reason) => {
192+
tracing::debug!("[dispatcher] UDP session → reject: {}", reason);
193+
Err(eyre::eyre!("UDP session rejected: {}", reason))
194+
}
195+
RouteAction::Forward(name) => {
196+
tracing::debug!("[dispatcher] UDP session → outbound '{}'", name);
197+
198+
let handler = self
199+
.resolve_handler(&name)
200+
.ok_or_else(|| eyre::eyre!("no outbound handler registered for '{}' (and no 'default')", name))?;
201+
202+
handler.handle_udp(udp_stream).await
203+
}
204+
}
205+
}
206+
}

crates/wind-core/src/inbound.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ pub trait AbstractInbound {
88
}
99

1010
pub trait InboundCallback: Send + Sync + Clone + 'static {
11-
fn handle_tcpstream(&self, target_addr: TargetAddr, stream: impl AbstractTcpStream) -> impl FutResult<()>;
11+
fn handle_tcpstream(&self, target_addr: TargetAddr, stream: impl AbstractTcpStream + 'static) -> impl FutResult<()>;
1212
fn handle_udpstream(&self, udp_stream: UdpStream) -> impl FutResult<()>;
1313
}

crates/wind-core/src/interface.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use serde::{Deserialize, Serialize};
44

55
#[derive(Serialize, Deserialize, Debug, Clone)]
66
pub struct Iface {
7-
pub name: String,
8-
pub ipv4: Option<Ipv4Addr>,
9-
pub ipv6: Option<Ipv6Addr>,
7+
pub name: String,
8+
pub ipv4: Option<Ipv4Addr>,
9+
pub ipv6: Option<Ipv6Addr>,
1010
pub index: u32,
1111
}
1212

crates/wind-core/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
#![feature(type_alias_impl_trait)]
33
#![feature(trait_alias)]
44

5+
pub mod dispatcher;
56
pub mod inbound;
67
mod interface;
78
pub mod io;
89
mod outbound;
910
pub mod types;
1011

12+
pub use dispatcher::{Dispatcher, OutboundAction, RouteAction, Router};
1113
pub use inbound::*;
1214
pub use interface::*;
1315
pub use outbound::*;
@@ -17,6 +19,9 @@ pub mod log;
1719

1820
pub mod tcp;
1921
pub mod udp;
22+
pub mod utils;
23+
24+
pub use utils::{StackPrefer, is_private_ip};
2025

2126
#[cfg(test)]
2227
mod udp_tests;

crates/wind-core/src/types.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ impl<'de> Deserialize<'de> for TargetAddr {
101101
}
102102
}
103103

104-
105104
#[cfg(test)]
106105
mod tests {
107106
use super::*;

crates/wind-core/src/udp.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use crate::types::TargetAddr;
21
use bytes::Bytes;
32
use tokio::sync::mpsc;
43

4+
use crate::types::TargetAddr;
5+
56
#[derive(Debug, Clone)]
67
pub struct UdpPacket {
78
pub source: Option<TargetAddr>,

crates/wind-core/src/udp_tests.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ mod tests {
1717
data
1818
}
1919

20-
2120
#[test]
2221
fn basic() {
2322
let data_len = 1024;
@@ -36,11 +35,11 @@ mod tests {
3635
&send.into(),
3736
&recv.into(),
3837
Transmit {
39-
destination: dst_addr,
40-
ecn: None,
41-
contents: &test_data,
38+
destination: dst_addr,
39+
ecn: None,
40+
contents: &test_data,
4241
segment_size: None,
43-
src_ip: None,
42+
src_ip: None,
4443
},
4544
);
4645
}
@@ -62,11 +61,11 @@ mod tests {
6261
&send.into(),
6362
&recv.into(),
6463
Transmit {
65-
destination: dst_addr,
66-
ecn: None,
67-
contents: &msg,
64+
destination: dst_addr,
65+
ecn: None,
66+
contents: &msg,
6867
segment_size: Some(SEGMENT_SIZE),
69-
src_ip: None,
68+
src_ip: None,
7069
},
7170
);
7271
}
@@ -122,11 +121,11 @@ mod tests {
122121
&send,
123122
&recv,
124123
Transmit {
125-
destination: recv.local_addr().unwrap().as_socket().unwrap(),
126-
ecn: None,
127-
contents: b"hello",
124+
destination: recv.local_addr().unwrap().as_socket().unwrap(),
125+
ecn: None,
126+
contents: b"hello",
128127
segment_size: None,
129-
src_ip: None,
128+
src_ip: None,
130129
},
131130
);
132131
}

0 commit comments

Comments
 (0)