diff --git a/crates/openshell-core/src/sandbox_env.rs b/crates/openshell-core/src/sandbox_env.rs index d345762ca..755ec0c72 100644 --- a/crates/openshell-core/src/sandbox_env.rs +++ b/crates/openshell-core/src/sandbox_env.rs @@ -26,6 +26,12 @@ pub const LOG_LEVEL: &str = "OPENSHELL_LOG_LEVEL"; /// Shell command to run inside the sandbox. pub const SANDBOX_COMMAND: &str = "OPENSHELL_SANDBOX_COMMAND"; +/// Sandbox-local loopback HTTP proxy URL managed by the supervisor. +/// +/// This is distinct from `HTTP_PROXY`/`HTTPS_PROXY`, which continue to point +/// at the gateway-side proxy address for ordinary proxy-aware clients. +pub const LOOPBACK_PROXY_URL: &str = "OPENSHELL_LOOPBACK_PROXY_URL"; + /// Path to the CA certificate for mTLS communication with the gateway. pub const TLS_CA: &str = "OPENSHELL_TLS_CA"; diff --git a/crates/openshell-sandbox/src/child_env.rs b/crates/openshell-sandbox/src/child_env.rs index 32eecbee3..43e60f2b2 100644 --- a/crates/openshell-sandbox/src/child_env.rs +++ b/crates/openshell-sandbox/src/child_env.rs @@ -21,6 +21,13 @@ pub fn proxy_env_vars(proxy_url: &str) -> [(&'static str, String); 9] { ] } +pub fn loopback_proxy_env_vars(proxy_url: &str) -> [(&'static str, String); 1] { + [( + openshell_core::sandbox_env::LOOPBACK_PROXY_URL, + proxy_url.to_owned(), + )] +} + pub fn tls_env_vars( ca_cert_path: &Path, combined_bundle_path: &Path, @@ -65,6 +72,27 @@ mod tests { assert!(stdout.contains("no_proxy=127.0.0.1,localhost,::1")); } + #[test] + fn apply_loopback_proxy_env_exposes_managed_url_without_changing_proxy_vars() { + let mut cmd = Command::new("/usr/bin/env"); + cmd.stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::null()); + + for (key, value) in proxy_env_vars("http://10.200.0.1:3128") { + cmd.env(key, value); + } + for (key, value) in loopback_proxy_env_vars("http://127.0.0.1:3128") { + cmd.env(key, value); + } + + let output = cmd.output().expect("spawn env"); + let stdout = String::from_utf8(output.stdout).expect("utf8"); + + assert!(stdout.contains("HTTP_PROXY=http://10.200.0.1:3128")); + assert!(stdout.contains("OPENSHELL_LOOPBACK_PROXY_URL=http://127.0.0.1:3128")); + } + #[test] fn apply_tls_env_sets_node_and_bundle_paths() { let mut cmd = Command::new("/usr/bin/env"); diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs index ded56ce9e..66561e83b 100644 --- a/crates/openshell-sandbox/src/lib.rs +++ b/crates/openshell-sandbox/src/lib.rs @@ -174,6 +174,8 @@ use crate::l7::tls::{ }; use crate::opa::OpaEngine; use crate::policy::{NetworkMode, NetworkPolicy, ProxyPolicy, SandboxPolicy}; +#[cfg(target_os = "linux")] +use crate::proxy::LoopbackProxyHandle; use crate::proxy::ProxyHandle; #[cfg(target_os = "linux")] use crate::sandbox::linux::netns::NetworkNamespace; @@ -571,8 +573,10 @@ pub async fn run_sandbox( // the entrypoint process's /proc/net/tcp for identity binding. let entrypoint_pid = Arc::new(AtomicU32::new(0)); - let (_proxy, denial_rx, bypass_denial_tx) = if matches!(policy.network.mode, NetworkMode::Proxy) - { + let (_proxy, loopback_proxy, denial_rx, bypass_denial_tx) = if matches!( + policy.network.mode, + NetworkMode::Proxy + ) { let proxy_policy = policy.network.proxy.as_ref().ok_or_else(|| { miette::miette!("Network mode is set to proxy but no proxy configuration was provided") })?; @@ -617,21 +621,66 @@ pub async fn run_sandbox( let proxy_handle = ProxyHandle::start_with_bind_addr( proxy_policy, bind_addr, - engine, - cache, + engine.clone(), + cache.clone(), entrypoint_pid.clone(), - tls_state, - inference_ctx, + tls_state.clone(), + inference_ctx.clone(), Some(provider_credentials.clone()), Some(policy_local_ctx.clone()), - denial_tx, + denial_tx.clone(), ) .await?; - (Some(proxy_handle), denial_rx, bypass_denial_tx) + + #[cfg(target_os = "linux")] + let loopback_proxy_handle = if let (Some(ns), Some(_upstream_addr)) = + (netns.as_ref(), bind_addr) + { + let Some(netns_fd) = ns.ns_fd() else { + return Err(miette::miette!( + "Managed loopback proxy requires a sandbox network namespace file descriptor" + )); + }; + let port = proxy_policy.http_addr.map_or(3128, |addr| addr.port()); + let listen_addr: SocketAddr = ([127, 0, 0, 1], port).into(); + Some(LoopbackProxyHandle::start_in_netns( + netns_fd, + listen_addr, + engine, + cache, + entrypoint_pid.clone(), + tls_state, + inference_ctx, + Some(provider_credentials.clone()), + Some(policy_local_ctx.clone()), + denial_tx, + )?) + } else { + None + }; + + #[cfg(not(target_os = "linux"))] + let loopback_proxy_handle: Option<()> = None; + + ( + Some(proxy_handle), + loopback_proxy_handle, + denial_rx, + bypass_denial_tx, + ) } else { - (None, None, None) + (None, None, None, None) }; + #[cfg(target_os = "linux")] + let loopback_proxy_url = loopback_proxy.as_ref().map(LoopbackProxyHandle::proxy_url); + + #[cfg(not(target_os = "linux"))] + let _ = &loopback_proxy; + + #[cfg(not(target_os = "linux"))] + let loopback_proxy_url: Option = None; + // Spawn bypass detection monitor (Linux only, proxy mode only). // Reads /dev/kmsg for nftables log entries and emits structured // tracing events for direct connection attempts that bypass the proxy. @@ -758,6 +807,7 @@ pub async fn run_sandbox( let policy_clone = policy.clone(); let workdir_clone = workdir.clone(); let proxy_url = ssh_proxy_url; + let loopback_proxy_url = loopback_proxy_url.clone(); let netns_fd = ssh_netns_fd; let ca_paths = ca_file_paths.clone(); let provider_credentials_clone = provider_credentials.clone(); @@ -772,6 +822,7 @@ pub async fn run_sandbox( workdir_clone, netns_fd, proxy_url, + loopback_proxy_url, ca_paths, provider_credentials_clone, ) @@ -838,6 +889,7 @@ pub async fn run_sandbox( interactive, &policy, netns.as_ref(), + loopback_proxy_url.as_deref(), ca_file_paths.as_ref(), &provider_env, )?; @@ -849,6 +901,7 @@ pub async fn run_sandbox( workdir.as_deref(), interactive, &policy, + loopback_proxy_url.as_deref(), ca_file_paths.as_ref(), &provider_env, )?; diff --git a/crates/openshell-sandbox/src/process.rs b/crates/openshell-sandbox/src/process.rs index 3d2f6d576..991556d7c 100644 --- a/crates/openshell-sandbox/src/process.rs +++ b/crates/openshell-sandbox/src/process.rs @@ -94,6 +94,7 @@ impl ProcessHandle { interactive: bool, policy: &SandboxPolicy, netns: Option<&NetworkNamespace>, + loopback_proxy_url: Option<&str>, ca_paths: Option<&(PathBuf, PathBuf)>, provider_env: &HashMap, ) -> Result { @@ -104,6 +105,7 @@ impl ProcessHandle { interactive, policy, netns.and_then(NetworkNamespace::ns_fd), + loopback_proxy_url, ca_paths, provider_env, ) @@ -115,12 +117,14 @@ impl ProcessHandle { /// /// Returns an error if the process fails to start. #[cfg(not(target_os = "linux"))] + #[allow(clippy::too_many_arguments)] pub fn spawn( program: &str, args: &[String], workdir: Option<&str>, interactive: bool, policy: &SandboxPolicy, + loopback_proxy_url: Option<&str>, ca_paths: Option<&(PathBuf, PathBuf)>, provider_env: &HashMap, ) -> Result { @@ -130,6 +134,7 @@ impl ProcessHandle { workdir, interactive, policy, + loopback_proxy_url, ca_paths, provider_env, ) @@ -144,6 +149,7 @@ impl ProcessHandle { interactive: bool, policy: &SandboxPolicy, netns_fd: Option, + loopback_proxy_url: Option<&str>, ca_paths: Option<&(PathBuf, PathBuf)>, provider_env: &HashMap, ) -> Result { @@ -185,6 +191,12 @@ impl ProcessHandle { } } + if let Some(url) = loopback_proxy_url { + for (key, value) in child_env::loopback_proxy_env_vars(url) { + cmd.env(key, value); + } + } + // Set TLS trust store env vars so sandbox processes trust the ephemeral CA if let Some((ca_cert_path, combined_bundle_path)) = ca_paths { for (key, value) in child_env::tls_env_vars(ca_cert_path, combined_bundle_path) { @@ -264,12 +276,14 @@ impl ProcessHandle { } #[cfg(not(target_os = "linux"))] + #[allow(clippy::too_many_arguments)] fn spawn_impl( program: &str, args: &[String], workdir: Option<&str>, interactive: bool, policy: &SandboxPolicy, + loopback_proxy_url: Option<&str>, ca_paths: Option<&(PathBuf, PathBuf)>, provider_env: &HashMap, ) -> Result { @@ -301,6 +315,12 @@ impl ProcessHandle { } } + if let Some(url) = loopback_proxy_url { + for (key, value) in child_env::loopback_proxy_env_vars(url) { + cmd.env(key, value); + } + } + // Set TLS trust store env vars so sandbox processes trust the ephemeral CA if let Some((ca_cert_path, combined_bundle_path)) = ca_paths { for (key, value) in child_env::tls_env_vars(ca_cert_path, combined_bundle_path) { diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 037ecfc78..f2de73962 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -18,6 +18,8 @@ use openshell_ocsf::{ NetworkActivityBuilder, Process, SeverityId, StatusId, Url as OcsfUrl, ocsf_emit, }; use std::net::{IpAddr, SocketAddr}; +#[cfg(target_os = "linux")] +use std::os::unix::io::RawFd; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; @@ -26,7 +28,7 @@ use tokio::io::{ }; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; -use tokio::task::JoinHandle; +use tokio::task::JoinHandle as TokioJoinHandle; use tracing::{debug, warn}; const MAX_HEADER_BYTES: usize = 8192; @@ -166,7 +168,16 @@ impl InferenceContext { pub struct ProxyHandle { #[allow(dead_code)] http_addr: Option, - join: JoinHandle<()>, + join: TokioJoinHandle<()>, +} + +#[cfg(target_os = "linux")] +#[derive(Debug)] +pub struct LoopbackProxyHandle { + http_addr: SocketAddr, + shutdown: Option>, + sandbox_acceptor_thread: Option>, + supervisor_dispatcher_join: TokioJoinHandle<()>, } impl ProxyHandle { @@ -230,41 +241,20 @@ impl ProxyHandle { loop { match listener.accept().await { Ok((stream, _addr)) => { - let opa = opa_engine.clone(); - let cache = identity_cache.clone(); - let spid = entrypoint_pid.clone(); - let tls = tls_state.clone(); - let inf = inference_ctx.clone(); - let policy_local = policy_local_ctx.clone(); - let gw = trusted_host_gateway.clone(); - let resolver = provider_credentials - .as_ref() - .and_then(ProviderCredentialState::resolver); - let dtx = denial_tx.clone(); - tokio::spawn(async move { - if let Err(err) = handle_tcp_connection( - stream, - opa, - cache, - spid, - tls, - inf, - policy_local, - gw, - resolver, - dtx, - ) - .await - { - let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) - .activity(ActivityId::Fail) - .severity(SeverityId::Low) - .status(StatusId::Failure) - .message(format!("Proxy connection error: {err}")) - .build(); - ocsf_emit!(event); - } - }); + spawn_proxy_connection( + stream, + opa_engine.clone(), + identity_cache.clone(), + entrypoint_pid.clone(), + tls_state.clone(), + inference_ctx.clone(), + policy_local_ctx.clone(), + trusted_host_gateway.clone(), + provider_credentials + .as_ref() + .and_then(ProviderCredentialState::resolver), + denial_tx.clone(), + ); } Err(err) => { let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) @@ -292,12 +282,350 @@ impl ProxyHandle { } } +#[allow(clippy::too_many_arguments)] +fn spawn_proxy_connection( + stream: TcpStream, + opa_engine: Arc, + identity_cache: Arc, + entrypoint_pid: Arc, + tls_state: Option>, + inference_ctx: Option>, + policy_local_ctx: Option>, + trusted_host_gateway: Arc>, + resolver: Option>, + denial_tx: Option>, +) { + tokio::spawn(async move { + if let Err(err) = handle_tcp_connection( + stream, + opa_engine, + identity_cache, + entrypoint_pid, + tls_state, + inference_ctx, + policy_local_ctx, + trusted_host_gateway, + resolver, + denial_tx, + ) + .await + { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message(format!("Proxy connection error: {err}")) + .build(); + ocsf_emit!(event); + } + }); +} + +#[cfg(target_os = "linux")] +impl LoopbackProxyHandle { + /// Start a managed loopback proxy for clients that can only be configured + /// with a loopback proxy URL. + /// + /// This is intentionally split across two network namespaces: + /// - the sandbox-netns acceptor thread enters `setns()`, binds loopback, + /// accepts client sockets, and sends accepted sockets out; + /// - the supervisor dispatcher task stays in the supervisor namespace and + /// invokes the normal proxy path for policy, DNS, SSRF checks, TLS/L7, + /// WebSocket rewrite, credential resolution, and upstream connect. + #[allow(clippy::too_many_arguments)] + pub(crate) fn start_in_netns( + netns_fd: RawFd, + listen_addr: SocketAddr, + opa_engine: Arc, + identity_cache: Arc, + entrypoint_pid: Arc, + tls_state: Option>, + inference_ctx: Option>, + provider_credentials: Option, + policy_local_ctx: Option>, + denial_tx: Option>, + ) -> Result { + if !listen_addr.ip().is_loopback() { + return Err(miette::miette!( + "Loopback proxy listen address must be loopback-only: {listen_addr}" + )); + } + + let (ready_tx, ready_rx) = std::sync::mpsc::channel(); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + let (accepted_tx, accepted_rx) = mpsc::unbounded_channel(); + + let supervisor_dispatcher_join = spawn_loopback_supervisor_dispatcher_task( + accepted_rx, + opa_engine, + identity_cache, + entrypoint_pid, + tls_state, + inference_ctx, + provider_credentials, + policy_local_ctx, + denial_tx, + ); + + let thread = match std::thread::Builder::new() + .name("openshell-loopback-netns-acceptor".to_string()) + .spawn(move || { + run_loopback_sandbox_netns_acceptor_thread( + netns_fd, + listen_addr, + ready_tx, + shutdown_rx, + accepted_tx, + ); + }) { + Ok(thread) => thread, + Err(err) => { + supervisor_dispatcher_join.abort(); + return Err(err).into_diagnostic(); + } + }; + + // Wait for an explicit ready/error signal instead of timing out. The + // thread borrows the sandbox netns fd for setns(), so returning early + // could orphan a thread that later observes a closed or reused fd. + let http_addr = match ready_rx.recv() { + Ok(Ok(addr)) => addr, + Ok(Err(message)) => { + let _ = thread.join(); + supervisor_dispatcher_join.abort(); + return Err(miette::miette!("{message}")); + } + Err(std::sync::mpsc::RecvError) => { + supervisor_dispatcher_join.abort(); + return Err(miette::miette!( + "Loopback proxy thread exited before startup on {listen_addr}" + )); + } + }; + + Ok(Self { + http_addr, + shutdown: Some(shutdown_tx), + sandbox_acceptor_thread: Some(thread), + supervisor_dispatcher_join, + }) + } + + pub fn proxy_url(&self) -> String { + format!("http://{}", self.http_addr) + } +} + +#[cfg(target_os = "linux")] +#[allow(clippy::too_many_arguments)] +fn spawn_loopback_supervisor_dispatcher_task( + mut accepted_rx: mpsc::UnboundedReceiver, + opa_engine: Arc, + identity_cache: Arc, + entrypoint_pid: Arc, + tls_state: Option>, + inference_ctx: Option>, + provider_credentials: Option, + policy_local_ctx: Option>, + denial_tx: Option>, +) -> TokioJoinHandle<()> { + tokio::spawn(async move { + // Detect the trusted host gateway from the supervisor context, matching + // the primary gateway proxy. + // + // Invariant: this task is the only half allowed to run policy, DNS, + // SSRF checks, TLS/L7 handling, WebSocket rewrite, credential + // resolution, or upstream dialing for managed loopback traffic. + let trusted_host_gateway: Arc> = Arc::new(detect_trusted_host_gateway()); + while let Some(stream) = accepted_rx.recv().await { + if let Err(err) = stream.set_nonblocking(true) { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message(format!( + "Loopback proxy failed to set stream nonblocking: {err}" + )) + .build(); + ocsf_emit!(event); + continue; + } + + match TcpStream::from_std(stream) { + Ok(stream) => { + spawn_proxy_connection( + stream, + opa_engine.clone(), + identity_cache.clone(), + entrypoint_pid.clone(), + tls_state.clone(), + inference_ctx.clone(), + policy_local_ctx.clone(), + trusted_host_gateway.clone(), + provider_credentials + .as_ref() + .and_then(ProviderCredentialState::resolver), + denial_tx.clone(), + ); + } + Err(err) => { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message(format!( + "Loopback proxy failed to register accepted stream: {err}" + )) + .build(); + ocsf_emit!(event); + } + } + } + }) +} + +#[cfg(target_os = "linux")] +#[allow(clippy::too_many_arguments)] +fn run_loopback_sandbox_netns_acceptor_thread( + netns_fd: RawFd, + listen_addr: SocketAddr, + ready_tx: std::sync::mpsc::Sender>, + shutdown_rx: tokio::sync::oneshot::Receiver<()>, + accepted_tx: mpsc::UnboundedSender, +) { + let startup_failure_tx = ready_tx.clone(); + let result = (|| -> std::result::Result<(), String> { + // Invariant: after setns(), this dedicated thread only binds loopback, + // accepts sandbox client sockets, and hands those accepted sockets back + // to the supervisor dispatcher. It must not perform DNS, policy checks, + // credential rewrite, or upstream dialing from the sandbox netns. + // SAFETY: setns is called on a dedicated OS thread that only serves + // this listener and exits with the sandbox supervisor. + #[allow(unsafe_code)] + let rc = unsafe { libc::setns(netns_fd, libc::CLONE_NEWNET) }; + if rc != 0 { + return Err(format!( + "Loopback proxy failed to enter sandbox network namespace: {}", + std::io::Error::last_os_error() + )); + } + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|err| format!("Loopback proxy runtime initialization failed: {err}"))?; + + let std_listener = std::net::TcpListener::bind(listen_addr) + .map_err(|err| format!("Loopback proxy failed to bind {listen_addr}: {err}"))?; + std_listener + .set_nonblocking(true) + .map_err(|err| format!("Loopback proxy failed to set nonblocking mode: {err}"))?; + let local_addr = std_listener + .local_addr() + .map_err(|err| format!("Loopback proxy failed to read local address: {err}"))?; + + runtime.block_on(async move { + let listener = match TcpListener::from_std(std_listener) { + Ok(listener) => listener, + Err(err) => { + let _ = ready_tx.send(Err(format!( + "Loopback proxy failed to register listener {local_addr}: {err}" + ))); + return; + } + }; + + let _ = ready_tx.send(Ok(local_addr)); + + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Listen) + .severity(SeverityId::Informational) + .status(StatusId::Success) + .dst_endpoint(Endpoint::from_ip(local_addr.ip(), local_addr.port())) + .message(format!( + "Loopback proxy listening on {local_addr} inside sandbox network namespace" + )) + .build(); + ocsf_emit!(event); + + let mut shutdown_rx = Box::pin(shutdown_rx); + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + } + accepted = listener.accept() => { + match accepted { + Ok((stream, _addr)) => { + match stream.into_std() { + Ok(stream) => { + if accepted_tx.send(stream).is_err() { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message("Loopback proxy dispatcher exited".to_string()) + .build(); + ocsf_emit!(event); + break; + } + } + Err(err) => { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message(format!( + "Loopback proxy failed to detach accepted stream: {err}" + )) + .build(); + ocsf_emit!(event); + } + } + } + Err(err) => { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Low) + .status(StatusId::Failure) + .message(format!("Loopback proxy accept error: {err}")) + .build(); + ocsf_emit!(event); + break; + } + } + } + } + } + }); + + Ok(()) + })(); + + if let Err(message) = result { + let _ = startup_failure_tx.send(Err(message)); + } +} + impl Drop for ProxyHandle { fn drop(&mut self) { self.join.abort(); } } +#[cfg(target_os = "linux")] +impl Drop for LoopbackProxyHandle { + fn drop(&mut self) { + if let Some(shutdown) = self.shutdown.take() { + let _ = shutdown.send(()); + } + if let Some(thread) = self.sandbox_acceptor_thread.take() { + let _ = thread.join(); + } + self.supervisor_dispatcher_join.abort(); + } +} + /// Emit a denial event to the aggregator channel (if configured). /// Used by `handle_tcp_connection` which owns `Option`. fn emit_denial( diff --git a/crates/openshell-sandbox/src/ssh.rs b/crates/openshell-sandbox/src/ssh.rs index c92180748..564675af2 100644 --- a/crates/openshell-sandbox/src/ssh.rs +++ b/crates/openshell-sandbox/src/ssh.rs @@ -105,6 +105,7 @@ pub async fn run_ssh_server( workdir: Option, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option<(PathBuf, PathBuf)>, provider_credentials: ProviderCredentialState, ) -> Result<()> { @@ -129,6 +130,7 @@ pub async fn run_ssh_server( let policy = policy.clone(); let workdir = workdir.clone(); let proxy_url = proxy_url.clone(); + let loopback_proxy_url = loopback_proxy_url.clone(); let ca_paths = ca_paths.clone(); let provider_credentials = provider_credentials.clone(); @@ -140,6 +142,7 @@ pub async fn run_ssh_server( workdir, netns_fd, proxy_url, + loopback_proxy_url, ca_paths, provider_credentials, ) @@ -166,6 +169,7 @@ async fn handle_connection( workdir: Option, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option>, provider_credentials: ProviderCredentialState, ) -> Result<()> { @@ -188,6 +192,7 @@ async fn handle_connection( workdir, netns_fd, proxy_url, + loopback_proxy_url, ca_file_paths, provider_credentials, ); @@ -215,6 +220,7 @@ struct SshHandler { workdir: Option, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option>, provider_credentials: ProviderCredentialState, channels: HashMap, @@ -226,6 +232,7 @@ impl SshHandler { workdir: Option, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option>, provider_credentials: ProviderCredentialState, ) -> Self { @@ -234,6 +241,7 @@ impl SshHandler { workdir, netns_fd, proxy_url, + loopback_proxy_url, ca_file_paths, provider_credentials, channels: HashMap::new(), @@ -456,6 +464,7 @@ impl russh::server::Handler for SshHandler { channel, self.netns_fd, self.proxy_url.clone(), + self.loopback_proxy_url.clone(), self.ca_file_paths.clone(), &self.provider_credentials.snapshot().child_env, )?; @@ -551,6 +560,7 @@ impl SshHandler { channel, self.netns_fd, self.proxy_url.clone(), + self.loopback_proxy_url.clone(), self.ca_file_paths.clone(), &provider_snapshot.child_env, )?; @@ -568,6 +578,7 @@ impl SshHandler { channel, self.netns_fd, self.proxy_url.clone(), + self.loopback_proxy_url.clone(), self.ca_file_paths.clone(), &provider_snapshot.child_env, )?; @@ -668,12 +679,14 @@ fn session_user_and_home(policy: &SandboxPolicy) -> (String, String) { } } +#[allow(clippy::too_many_arguments)] fn apply_child_env( cmd: &mut Command, session_home: &str, session_user: &str, term: &str, proxy_url: Option<&str>, + loopback_proxy_url: Option<&str>, ca_file_paths: Option<&(PathBuf, PathBuf)>, provider_env: &HashMap, ) { @@ -693,6 +706,12 @@ fn apply_child_env( } } + if let Some(url) = loopback_proxy_url { + for (key, value) in child_env::loopback_proxy_env_vars(url) { + cmd.env(key, value); + } + } + if let Some((ca_cert_path, combined_bundle_path)) = ca_file_paths { for (key, value) in child_env::tls_env_vars(ca_cert_path, combined_bundle_path) { cmd.env(key, value); @@ -714,6 +733,7 @@ fn spawn_pty_shell( channel: ChannelId, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option>, provider_env: &HashMap, ) -> anyhow::Result<(std::fs::File, mpsc::Sender>)> { @@ -762,6 +782,7 @@ fn spawn_pty_shell( &session_user, term, proxy_url.as_deref(), + loopback_proxy_url.as_deref(), ca_file_paths.as_deref(), provider_env, ); @@ -877,6 +898,7 @@ fn spawn_pipe_exec( channel: ChannelId, netns_fd: Option, proxy_url: Option, + loopback_proxy_url: Option, ca_file_paths: Option>, provider_env: &HashMap, ) -> anyhow::Result>> { @@ -907,6 +929,7 @@ fn spawn_pipe_exec( &session_user, "dumb", proxy_url.as_deref(), + loopback_proxy_url.as_deref(), ca_file_paths.as_deref(), provider_env, ); diff --git a/e2e/rust/tests/loopback_proxy_netns.rs b/e2e/rust/tests/loopback_proxy_netns.rs new file mode 100644 index 000000000..fea2a239e --- /dev/null +++ b/e2e/rust/tests/loopback_proxy_netns.rs @@ -0,0 +1,250 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! E2E proof that the managed loopback proxy accepts inside the sandbox +//! network namespace but dispatches upstream dialing from the supervisor side. + +#![cfg(feature = "e2e-host-gateway")] + +use std::io::Write; + +use openshell_e2e::harness::sandbox::SandboxGuard; +use tempfile::NamedTempFile; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; + +const TEST_HOST: &str = "host.openshell.internal"; + +struct HostServer { + port: u16, + task: JoinHandle<()>, +} + +impl HostServer { + async fn start() -> Result { + let listener = TcpListener::bind(("0.0.0.0", 0)) + .await + .map_err(|e| format!("bind host test server: {e}"))?; + let port = listener + .local_addr() + .map_err(|e| format!("read host test server address: {e}"))? + .port(); + let task = tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + let mut request = Vec::new(); + let mut buf = [0_u8; 1024]; + loop { + let Ok(read) = stream.read(&mut buf).await else { + return; + }; + if read == 0 { + return; + } + request.extend_from_slice(&buf[..read]); + if request.windows(4).any(|window| window == b"\r\n\r\n") { + break; + } + } + + let body = br#"{"message":"loopback-supervisor-dispatch-ok"}"#; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + if stream.write_all(response.as_bytes()).await.is_err() { + return; + } + let _ = stream.write_all(body).await; + let _ = stream.shutdown().await; + }); + } + }); + + Ok(Self { port, task }) + } +} + +impl Drop for HostServer { + fn drop(&mut self) { + self.task.abort(); + } +} + +fn write_policy(port: u16) -> Result { + let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?; + let policy = format!( + r#"version: 1 + +filesystem_policy: + include_workdir: true + read_only: + - /usr + - /lib + - /proc + - /dev/urandom + - /app + - /etc + - /var/log + read_write: + - /sandbox + - /tmp + - /dev/null + +landlock: + compatibility: best_effort + +process: + run_as_user: sandbox + run_as_group: sandbox + +network_policies: + loopback_proxy_netns: + name: loopback_proxy_netns + endpoints: + - host: {TEST_HOST} + port: {port} + allowed_ips: + - "10.0.0.0/8" + - "172.0.0.0/8" + - "192.168.0.0/16" + - "fc00::/7" + binaries: + - path: /usr/bin/python* + - path: /usr/local/bin/python* + - path: /sandbox/.uv/python/*/bin/python* +"# + ); + file.write_all(policy.as_bytes()) + .map_err(|e| format!("write temp policy file: {e}"))?; + file.flush() + .map_err(|e| format!("flush temp policy file: {e}"))?; + Ok(file) +} + +fn netns_boundary_script(port: u16) -> String { + format!( + r#" +import json +import os +import socket +import urllib.parse + +HOST = {TEST_HOST:?} +PORT = {port} + +def recv_until(sock, marker): + data = b"" + while marker not in data: + chunk = sock.recv(4096) + if not chunk: + break + data += chunk + return data + +def read_response(sock): + response = recv_until(sock, b"\r\n\r\n") + headers, _, body = response.partition(b"\r\n\r\n") + content_length = 0 + for line in headers.split(b"\r\n")[1:]: + if line.lower().startswith(b"content-length:"): + content_length = int(line.split(b":", 1)[1].strip()) + break + while len(body) < content_length: + chunk = sock.recv(4096) + if not chunk: + break + body += chunk + return response.decode("iso-8859-1", "replace"), body.decode("utf-8", "replace") + +def direct_connect_result(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + try: + sock.connect((HOST, PORT)) + sock.sendall(f"GET /direct HTTP/1.1\r\nHost: {{HOST}}:{{PORT}}\r\nConnection: close\r\n\r\n".encode("ascii")) + response, body = read_response(sock) + return {{"result": "connected", "response": response.splitlines()[0] if response else "", "body": body}} + except ConnectionRefusedError as error: + return {{"result": "refused", "error": str(error)}} + except socket.timeout as error: + return {{"result": "timeout", "error": str(error)}} + except OSError as error: + return {{"result": "error", "errno": error.errno, "error": str(error)}} + finally: + sock.close() + +def loopback_connect_result(): + proxy_url = os.environ.get("OPENSHELL_LOOPBACK_PROXY_URL") + if not proxy_url: + return {{"result": "missing_proxy_url"}} + parsed = urllib.parse.urlparse(proxy_url) + if parsed.hostname not in ("127.0.0.1", "localhost", "::1"): + return {{"result": "non_loopback_proxy_url", "proxy_url": proxy_url}} + + target = f"{{HOST}}:{{PORT}}" + with socket.create_connection((parsed.hostname, parsed.port or 80), timeout=10) as sock: + sock.sendall(f"CONNECT {{target}} HTTP/1.1\r\nHost: {{target}}\r\n\r\n".encode("ascii")) + connect_response = recv_until(sock, b"\r\n\r\n").decode("iso-8859-1", "replace") + if not (connect_response.startswith("HTTP/1.1 200") or connect_response.startswith("HTTP/1.0 200")): + return {{"result": "connect_failed", "response": connect_response.splitlines()[0] if connect_response else ""}} + sock.sendall(f"GET /proxied HTTP/1.1\r\nHost: {{target}}\r\nConnection: close\r\n\r\n".encode("ascii")) + response, body = read_response(sock) + return {{"result": "ok", "response": response.splitlines()[0] if response else "", "body": body}} + +print(json.dumps({{ + "direct": direct_connect_result(), + "loopback": loopback_connect_result(), +}}, sort_keys=True), flush=True) +"# + ) +} + +#[tokio::test] +async fn loopback_proxy_connect_uses_supervisor_namespace_for_upstream_dial() { + let server = HostServer::start().await.expect("start host test server"); + let policy = write_policy(server.port).expect("write custom policy"); + let policy_path = policy + .path() + .to_str() + .expect("temp policy path should be utf-8") + .to_string(); + let script = netns_boundary_script(server.port); + + let guard = SandboxGuard::create(&["--policy", &policy_path, "--", "python3", "-c", &script]) + .await + .expect("sandbox create"); + + let output = guard + .create_output + .lines() + .find(|line| line.contains("\"direct\"") && line.contains("\"loopback\"")) + .unwrap_or_else(|| { + panic!( + "expected netns boundary JSON in output:\n{}", + guard.create_output + ) + }); + let parsed: serde_json::Value = serde_json::from_str(output.trim()) + .unwrap_or_else(|err| panic!("failed to parse JSON '{output}': {err}")); + + assert_eq!( + parsed["direct"]["result"], "refused", + "expected direct sandbox egress to be rejected before reaching host server:\n{}", + guard.create_output + ); + assert_eq!( + parsed["loopback"]["result"], "ok", + "expected CONNECT through OPENSHELL_LOOPBACK_PROXY_URL to reach host server:\n{}", + guard.create_output + ); + assert_eq!( + parsed["loopback"]["body"], r#"{"message":"loopback-supervisor-dispatch-ok"}"#, + "expected loopback proxy path to receive host server response:\n{}", + guard.create_output + ); +} diff --git a/e2e/rust/tests/no_proxy.rs b/e2e/rust/tests/no_proxy.rs index ced4d02d5..ebe9496ad 100644 --- a/e2e/rust/tests/no_proxy.rs +++ b/e2e/rust/tests/no_proxy.rs @@ -16,6 +16,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer expected_no_proxy = '127.0.0.1,localhost,::1' assert os.environ['HTTP_PROXY'].startswith('http://') assert os.environ['HTTPS_PROXY'].startswith('http://') +assert os.environ['OPENSHELL_LOOPBACK_PROXY_URL'].startswith('http://127.0.0.1:') assert os.environ['NO_PROXY'] == expected_no_proxy assert os.environ['no_proxy'] == expected_no_proxy @@ -37,6 +38,7 @@ try: with urllib.request.urlopen(f'http://127.0.0.1:{server.server_port}', timeout=10) as response: print(json.dumps({ 'no_proxy': os.environ['NO_PROXY'], + 'loopback_proxy': os.environ['OPENSHELL_LOOPBACK_PROXY_URL'].split(':')[:2], 'payload': json.loads(response.read().decode()), }), flush=True) finally: @@ -53,9 +55,11 @@ async fn sandbox_bypasses_proxy_for_localhost_http() { .expect("sandbox create with localhost proxy bypass check"); assert!( - guard.create_output.contains( - r#"{"no_proxy": "127.0.0.1,localhost,::1", "payload": {"message": "hello"}}"# - ), + guard + .create_output + .contains(r#""no_proxy": "127.0.0.1,localhost,::1""#) + && guard.create_output.contains(r#""loopback_proxy": ["http", "//127.0.0.1"]"#) + && guard.create_output.contains(r#""payload": {"message": "hello"}"#), "expected localhost HTTP request to bypass proxy and succeed:\n{}", guard.create_output ); diff --git a/e2e/rust/tests/websocket_conformance.rs b/e2e/rust/tests/websocket_conformance.rs index 65ba19aa1..091e0656a 100644 --- a/e2e/rust/tests/websocket_conformance.rs +++ b/e2e/rust/tests/websocket_conformance.rs @@ -364,13 +364,14 @@ def read_frame(sock): return first, payload def proxy_parts(): - names = ("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy", "ALL_PROXY", "all_proxy") - proxy_url = next((os.environ.get(name) for name in names if os.environ.get(name)), None) + proxy_url = os.environ.get("OPENSHELL_LOOPBACK_PROXY_URL") if not proxy_url: - raise RuntimeError("proxy environment is not configured") + raise RuntimeError("managed loopback proxy URL is not configured") parsed = urllib.parse.urlparse(proxy_url) if not parsed.hostname: raise RuntimeError(f"invalid proxy URL: {{proxy_url!r}}") + if parsed.hostname not in ("127.0.0.1", "localhost", "::1"): + raise RuntimeError(f"managed loopback proxy URL is not loopback: {{proxy_url!r}}") return parsed.hostname, parsed.port or 80 def connect_with_retry(host, port, timeout_seconds=20):