From 2377bd90f1e384c946e60c148cf8e19f875cb629 Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Sat, 18 Apr 2026 21:42:45 +0000 Subject: [PATCH] feat(ingress): runtime per-workload CF ingress on /deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #134 wired `expose` at boot time — the ingress rules baked into the agent VM's config.iso got published when CP first created the tunnel. This extends that to the runtime path: when a workload POSTed to dd-agent's /deploy declares `expose: {hostname_label, port}`, the agent now calls the CP's new /ingress/replace endpoint with the merged (boot + runtime) extras list, and CP re-PUTs the tunnel config + upserts a CNAME for the new hostname. Wire-level summary: - cf.rs — extract `apply_ingress()` used by both `create()` (at register) and a new public `update_ingress()` (at runtime). The existing tunnel id + token stay stable. - cp.rs — new endpoint POST /ingress/replace. PAT-authenticated, looks up the agent in the store by agent_id, re-PUTs the tunnel config, updates the store's `extras` field for the agent. - collector::Agent — gains `tunnel_id` + `extras` fields, preserved across /health scrapes so the collector doesn't clobber them. - agent.rs — stores `agent_id` from the register bootstrap, holds a live `Arc<RwLock<Vec<(String, u16)>>>` for the merged extras, hooks into /deploy to push updates. Soft-fails — workload stays running even if the ingress update fails; only public reachability is affected. Opens the runtime path slopandmop needs: POST openclaw to an agent, the agent asks CP to route `openclaw.<agent-hostname>` → localhost:port, CF picks up the ingress config within seconds, browser hits the URL. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/agent.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++++- src/cf.rs | 54 ++++++++++++++++++++++------ src/collector.rs | 26 +++++++++++++- src/config.rs | 12 +++---- src/cp.rs | 83 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 249 insertions(+), 18 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 22cbdcd..77b3f03 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -40,10 +40,20 @@ struct St { ee: Arc, keys: Arc, hostname: String, + /// Tunnel name returned by the CP at /register — stable for the + /// life of this agent's tunnel. The /ingress/replace call on the + /// CP keys off this to look up the tunnel_id. + agent_id: String, cp_hostname: String, started: Instant, /// Current Intel-signed JWT. Refreshed by a background task. ita_token: Arc>, + /// Live set of per-workload ingress rules this agent has asked + /// the CP to publish. Seeded from boot `cfg.extra_ingress`; + /// appended each time a POSTed workload declares `expose`. The + /// agent forwards the full list on every /ingress/replace call + /// so the CP's PUT is a straight replacement. + extras: Arc>>, } pub async fn run() -> Result<()> { @@ -95,9 +105,11 @@ pub async fn run() -> Result<()> { ee, keys, hostname: b.hostname, + agent_id: b.agent_id, cp_hostname: b.cp_hostname, started: Instant::now(), ita_token, + extras: Arc::new(RwLock::new(cfg.extra_ingress.clone())), }; let app = Router::new() @@ -122,6 +134,7 @@ pub async fn run() -> Result<()> { struct Bootstrap { tunnel_token: String, hostname: String, + agent_id: String, jwt_secret_b64: String, cp_hostname: String, } @@ -432,7 +445,84 @@ async fn deploy( Json(spec): Json, ) -> Result> { auth::resolve(&s.keys, &s.cfg.common.owner, &headers).await?; - Ok(Json(s.ee.deploy(spec).await?)) + + // Pull `expose` off the spec before forwarding to EE. 
EE ignores + // unknown fields today but keeping the payload tidy avoids future + // surprises if EE ever grows stricter parsing. + let expose = parse_expose(&spec); + + let response = s.ee.deploy(spec).await?; + + if let Some((label, port)) = expose { + if let Err(e) = push_extra_ingress(&s, label.clone(), port).await { + // Soft-fail: the workload is deployed, the owner just can't + // reach it from the public internet yet. Better than failing + // the whole /deploy and leaving the caller unsure whether + // the process is running. + eprintln!( + "agent: /ingress/replace add {label}:{port} failed (workload still running): {e}" + ); + } + } + + Ok(Json(response)) +} + +/// Extract `expose.hostname_label` + `expose.port` from a DeployRequest +/// JSON body. Returns None if the field is missing or malformed; the +/// caller treats that as "no runtime ingress requested" and moves on. +fn parse_expose(spec: &serde_json::Value) -> Option<(String, u16)> { + let expose = spec.get("expose")?; + let label = expose.get("hostname_label")?.as_str()?.to_string(); + let port = expose.get("port")?.as_u64()?; + if label.is_empty() || port == 0 || port > u16::MAX as u64 { + return None; + } + Some((label, port as u16)) +} + +/// Append `(label, port)` to the live extras list (dedup by label — +/// redeploying the same app_name with the same hostname_label is a +/// no-op, not a duplicate rule) and POST the full list to the CP's +/// /ingress/replace endpoint. The CP re-PUTs the tunnel config and +/// upserts CNAMEs. 
+async fn push_extra_ingress(s: &St, label: String, port: u16) -> Result<()> { + let extras = { + let mut guard = s.extras.write().await; + if let Some(existing) = guard.iter_mut().find(|(l, _)| *l == label) { + existing.1 = port; + } else { + guard.push((label, port)); + } + guard.clone() + }; + + let body_extras: Vec = extras + .iter() + .map(|(l, p)| serde_json::json!({"hostname_label": l, "port": p})) + .collect(); + let body = serde_json::json!({ + "agent_id": s.agent_id, + "extras": body_extras, + }); + + let url = format!("{}/ingress/replace", s.cfg.cp_url.trim_end_matches('/')); + let resp = reqwest::Client::new() + .post(&url) + .bearer_auth(&s.cfg.pat) + .json(&body) + .send() + .await + .map_err(|e| Error::Upstream(format!("ingress/replace {url}: {e}")))?; + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + return Err(Error::Upstream(format!( + "ingress/replace {url} → {status}: {text}" + ))); + } + eprintln!("agent: ingress/replace ok ({} extras total)", extras.len()); + Ok(()) } #[derive(Debug, Deserialize)] diff --git a/src/cf.rs b/src/cf.rs index f916ee4..da69b6c 100644 --- a/src/cf.rs +++ b/src/cf.rs @@ -96,6 +96,43 @@ pub async fn create( .ok_or_else(|| Error::Upstream("tunnel create: missing token".into()))? .to_string(); + let extra_hostnames = apply_ingress(http, cf, &id, hostname, extras).await?; + + Ok(Tunnel { + id, + token, + hostname: hostname.to_string(), + extra_hostnames, + }) +} + +/// Replace an existing tunnel's ingress rules + CNAME records. Used +/// for runtime updates — e.g. a workload POSTed to `/deploy` declares +/// `expose`, the agent forwards the full current extras list to the +/// CP, and the CP calls this to re-PUT the tunnel config without +/// recreating the tunnel or touching the tunnel token. Returns the +/// resolved extra hostnames so the caller can log / store them. 
+pub async fn update_ingress( + http: &Client, + cf: &CfCreds, + tunnel_id: &str, + hostname: &str, + extras: &[(String, u16)], +) -> Result> { + apply_ingress(http, cf, tunnel_id, hostname, extras).await +} + +/// Build the ingress array (extras first, then the primary +/// `hostname → localhost:8080` rule, then the 404 catch-all), PUT +/// it to the tunnel, and upsert a CNAME for each hostname pointing +/// at `{tunnel_id}.cfargotunnel.com`. +async fn apply_ingress( + http: &Client, + cf: &CfCreds, + tunnel_id: &str, + hostname: &str, + extras: &[(String, u16)], +) -> Result> { let mut ingress: Vec = extras .iter() .map(|(label, port)| { @@ -115,25 +152,22 @@ pub async fn create( http, cf, Method::PUT, - &format!("/accounts/{}/cfd_tunnel/{id}/configurations", cf.account_id), + &format!( + "/accounts/{}/cfd_tunnel/{tunnel_id}/configurations", + cf.account_id + ), Some(serde_json::json!({"config": {"ingress": ingress}})), ) .await?; - upsert_cname(http, cf, &id, hostname).await?; + upsert_cname(http, cf, tunnel_id, hostname).await?; let mut extra_hostnames = Vec::with_capacity(extras.len()); for (label, _) in extras { let extra = format!("{label}.{hostname}"); - upsert_cname(http, cf, &id, &extra).await?; + upsert_cname(http, cf, tunnel_id, &extra).await?; extra_hostnames.push(extra); } - - Ok(Tunnel { - id, - token, - hostname: hostname.to_string(), - extra_hostnames, - }) + Ok(extra_hostnames) } async fn upsert_cname(http: &Client, cf: &CfCreds, tunnel_id: &str, hostname: &str) -> Result<()> { diff --git a/src/collector.rs b/src/collector.rs index 725b9b8..6864a14 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -46,6 +46,19 @@ pub struct Agent { /// Intel-verified ITA claims. Required — agents without a valid /// token don't enter the store. pub ita: ita::Claims, + /// CF tunnel ID (not name) — needed to re-PUT ingress at runtime + /// when a POSTed workload declares `expose`. 
Empty for the + /// `control-plane` pseudo-entry which doesn't take runtime slop. + #[serde(default)] + pub tunnel_id: String, + /// Currently-active per-workload ingress rules for this agent's + /// tunnel. Seeded at /register from the boot-workload `expose` + /// set; appended on each runtime /ingress/replace call. If the + /// agent relaunches, the CP re-seeds from the new register's + /// `extra_ingress` field — runtime extras are intentionally NOT + /// persisted across relaunches. + #[serde(default)] + pub extras: Vec<(String, u16)>, } pub type Store = Arc>>; @@ -156,7 +169,15 @@ async fn tick( // reports its full hostname there, which would land in a // different key than /register used (the bare tunnel name) and // produce a duplicate /api/agents entry per agent. - store.lock().await.insert( + let mut s = store.lock().await; + // Preserve tunnel_id + extras across scrapes — they're owned by + // /register and /ingress/replace; the collector's job is + // health/metrics refresh only, not ingress bookkeeping. + let (tunnel_id, extras) = s + .get(name) + .map(|a| (a.tunnel_id.clone(), a.extras.clone())) + .unwrap_or_default(); + s.insert( name.clone(), Agent { agent_id: name.clone(), @@ -183,8 +204,11 @@ async fn tick( nets: serde_json::from_value(h["nets"].clone()).unwrap_or_default(), disks: serde_json::from_value(h["disks"].clone()).unwrap_or_default(), ita: claims, + tunnel_id, + extras, }, ); + drop(s); verified += 1; } diff --git a/src/config.rs b/src/config.rs index 0a08c0a..7d9d13a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -124,12 +124,12 @@ pub struct Agent { pub pat: String, pub ee_socket: String, pub ita: Ita, - /// Extra cloudflared ingress rules requested at register time. - /// Populated from `DD_EXTRA_INGRESS` — a JSON array of - /// `{"hostname_label": "...", "port": N}` objects assembled by - /// the boot-workload builder (`apps/_infra/local-agents.sh`) - /// from `expose` hints on individual workload specs. 
Empty is - /// fine — the agent just gets the default dashboard rule. + /// Extra cloudflared ingress rules requested at register time, + /// parsed from `DD_EXTRA_INGRESS` (a comma-separated list of + /// `label:port` pairs, e.g. `gpu:8081,web:9000`). The boot-workload + /// builder (`apps/_infra/local-agents.sh`) collects these from + /// `expose` hints on individual workload specs. Empty is fine — + /// the agent just gets the default dashboard rule. pub extra_ingress: Vec<(String, u16)>, } diff --git a/src/cp.rs b/src/cp.rs index f30b46c..3742e56 100644 --- a/src/cp.rs +++ b/src/cp.rs @@ -131,6 +131,12 @@ pub async fn run() -> Result<()> { nets: cp_m.nets, disks: cp_m.disks, ita: cp_claims, + // CP doesn't take per-workload runtime ingress — its own tunnel + // only routes `DD_HOSTNAME → localhost:8080`. tunnel_id stays + // empty so the runtime-ingress endpoint rejects attempts to + // target "control-plane". + tunnel_id: String::new(), + extras: Vec::new(), }, ); @@ -207,6 +213,7 @@ pub async fn run() -> Result<()> { .route("/", get(fleet)) .route("/health", get(health)) .route("/register", post(register)) + .route("/ingress/replace", post(ingress_replace)) .route("/agent/{id}", get(agent_detail)) .route("/agent/{id}/logs/{app}", get(agent_logs)) .route("/api/agents", get(api_agents)) @@ -372,6 +379,10 @@ async fn register( nets: Vec::new(), disks: Vec::new(), ita: ita_claims, + // Seeded from the boot `extra_ingress`; runtime /deploy + // requests extend this list via /ingress/replace (below). + tunnel_id: tunnel.id.clone(), + extras: extras.clone(), }, ); } @@ -390,6 +401,78 @@ async fn register( }))) } +#[derive(Debug, Deserialize)] +struct IngressReplaceReq { + /// The agent's own `agent_id` (== tunnel name) as returned from + /// /register. 
Authenticated indirectly: the caller's PAT must + /// belong to `DD_OWNER`, and the agent must already exist in the + /// CP's store under this id (so anyone with a valid owner-PAT can + /// only target agents the CP already knows about). + agent_id: String, + /// Full replacement set of per-workload ingress rules for this + /// agent. The CP re-PUTs the tunnel config with `extras` first, + /// the primary `hostname → localhost:8080` rule, and the 404 + /// catch-all. Runtime additions from /deploy live alongside the + /// boot-time `extra_ingress` from /register here — the agent + /// owns the merge. + extras: Vec, +} + +#[derive(Debug, Deserialize)] +struct IngressPair { + hostname_label: String, + port: u16, +} + +/// POST /ingress/replace — agent pushes an updated ingress list +/// (boot extras + anything POSTed to /deploy with an `expose` field) +/// and the CP re-PUTs the tunnel config + CNAMEs. No tunnel +/// recreation — the token + tunnel id stay stable across calls. +async fn ingress_replace( + State(s): State, + headers: HeaderMap, + Json(req): Json, +) -> Result> { + let pat = auth::bearer(&headers).ok_or(Error::Unauthorized)?; + let _login = auth::verify_pat(&pat, &s.cfg.common.owner).await?; + + let (tunnel_id, hostname) = { + let store = s.store.lock().await; + let agent = store.get(&req.agent_id).ok_or(Error::NotFound)?; + if agent.tunnel_id.is_empty() { + // Control-plane pseudo-entry, or an older store entry that + // pre-dates the tunnel_id field. Either way, nothing to update. 
+ return Err(Error::BadRequest(format!( + "{} has no tunnel — runtime ingress applies only to agent tunnels", + req.agent_id + ))); + } + (agent.tunnel_id.clone(), agent.hostname.clone()) + }; + + let extras: Vec<(String, u16)> = req + .extras + .iter() + .map(|e| (e.hostname_label.clone(), e.port)) + .collect(); + + let http = reqwest::Client::new(); + let hostnames = cf::update_ingress(&http, &s.cfg.cf, &tunnel_id, &hostname, &extras).await?; + + { + let mut store = s.store.lock().await; + if let Some(agent) = store.get_mut(&req.agent_id) { + agent.extras = extras; + } + } + + eprintln!("cp: ingress/replace {} → {:?}", req.agent_id, hostnames); + Ok(Json(serde_json::json!({ + "agent_id": req.agent_id, + "extra_hostnames": hostnames, + }))) +} + /// Mint the CP's own ITA token at startup. Fatal on any failure — /// the CP refuses to start without proving its own TDX measurement. async fn mint_cp_ita(cfg: &Cfg, ee: &Ee) -> Result {