Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,20 @@ struct St {
ee: Arc<Ee>,
keys: Arc<Keys>,
hostname: String,
/// Tunnel name returned by the CP at /register — stable for the
/// life of this agent's tunnel. The /ingress/replace call on the
/// CP keys off this to look up the tunnel_id.
agent_id: String,
cp_hostname: String,
started: Instant,
/// Current Intel-signed JWT. Refreshed by a background task.
ita_token: Arc<RwLock<String>>,
/// Live set of per-workload ingress rules this agent has asked
/// the CP to publish. Seeded from boot `cfg.extra_ingress`;
/// appended each time a POSTed workload declares `expose`. The
/// agent forwards the full list on every /ingress/replace call
/// so the CP's PUT is a straight replacement.
extras: Arc<RwLock<Vec<(String, u16)>>>,
}

pub async fn run() -> Result<()> {
Expand Down Expand Up @@ -95,9 +105,11 @@ pub async fn run() -> Result<()> {
ee,
keys,
hostname: b.hostname,
agent_id: b.agent_id,
cp_hostname: b.cp_hostname,
started: Instant::now(),
ita_token,
extras: Arc::new(RwLock::new(cfg.extra_ingress.clone())),
};

let app = Router::new()
Expand All @@ -122,6 +134,7 @@ pub async fn run() -> Result<()> {
/// Bootstrap values the agent starts from. `run` copies `hostname`,
/// `agent_id` and `cp_hostname` out of this into the shared agent state.
/// NOTE(review): presumably the decoded CP /register response — confirm
/// against the code that builds it.
struct Bootstrap {
/// NOTE(review): presumably the cloudflared tunnel token handed out
/// for this agent's tunnel — its use is not visible here; confirm.
tunnel_token: String,
/// Hostname stored as the agent state's `hostname`.
hostname: String,
/// Stored as the agent state's `agent_id`, which the agent sends as
/// `agent_id` in every /ingress/replace request body.
agent_id: String,
/// NOTE(review): name suggests a base64-encoded JWT secret — usage is
/// not visible here; confirm.
jwt_secret_b64: String,
/// Stored as the agent state's `cp_hostname`.
cp_hostname: String,
}
Expand Down Expand Up @@ -432,7 +445,84 @@ async fn deploy(
Json(spec): Json<serde_json::Value>,
) -> Result<Json<serde_json::Value>> {
auth::resolve(&s.keys, &s.cfg.common.owner, &headers).await?;
Ok(Json(s.ee.deploy(spec).await?))

// Read `expose` from the spec before `spec` is moved into the EE
// call. The field itself is still forwarded to EE unchanged: EE
// ignores unknown fields today, so this needs revisiting if EE ever
// grows stricter parsing.
let expose = parse_expose(&spec);

let response = s.ee.deploy(spec).await?;

if let Some((label, port)) = expose {
if let Err(e) = push_extra_ingress(&s, label.clone(), port).await {
// Soft-fail: the workload is deployed, the owner just can't
// reach it from the public internet yet. Better than failing
// the whole /deploy and leaving the caller unsure whether
// the process is running.
eprintln!(
"agent: /ingress/replace add {label}:{port} failed (workload still running): {e}"
);
}
}

Ok(Json(response))
}

/// Read the optional runtime-ingress request out of a DeployRequest
/// JSON body.
///
/// Looks for `expose.hostname_label` (a string) and `expose.port` (an
/// unsigned integer) and returns them as `(label, port)`.
///
/// Returns `None` when `expose` or either field is absent, has the
/// wrong JSON type, the label is empty, or the port is outside
/// `1..=65535`. Callers treat `None` as "no runtime ingress requested".
fn parse_expose(spec: &serde_json::Value) -> Option<(String, u16)> {
    let expose = spec.get("expose")?;

    let label = expose.get("hostname_label").and_then(|v| v.as_str())?;
    let raw_port = expose.get("port").and_then(|v| v.as_u64())?;

    // Only a non-zero value that fits in 16 bits is a usable port; the
    // range pattern guarantees the cast below cannot truncate.
    let port = match raw_port {
        p @ 1..=65535 => p as u16,
        _ => return None,
    };

    if label.is_empty() {
        return None;
    }
    Some((label.to_owned(), port))
}

/// Append `(label, port)` to the live extras list (dedup by label —
/// redeploying the same app_name with the same hostname_label is a
/// no-op, not a duplicate rule) and POST the full list to the CP's
/// /ingress/replace endpoint. The CP re-PUTs the tunnel config and
/// upserts CNAMEs.
async fn push_extra_ingress(s: &St, label: String, port: u16) -> Result<()> {
let extras = {
let mut guard = s.extras.write().await;
if let Some(existing) = guard.iter_mut().find(|(l, _)| *l == label) {
existing.1 = port;
} else {
guard.push((label, port));
}
guard.clone()
};

let body_extras: Vec<serde_json::Value> = extras
.iter()
.map(|(l, p)| serde_json::json!({"hostname_label": l, "port": p}))
.collect();
let body = serde_json::json!({
"agent_id": s.agent_id,
"extras": body_extras,
});

let url = format!("{}/ingress/replace", s.cfg.cp_url.trim_end_matches('/'));
let resp = reqwest::Client::new()
.post(&url)
.bearer_auth(&s.cfg.pat)
.json(&body)
.send()
.await
.map_err(|e| Error::Upstream(format!("ingress/replace {url}: {e}")))?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
return Err(Error::Upstream(format!(
"ingress/replace {url} → {status}: {text}"
)));
}
eprintln!("agent: ingress/replace ok ({} extras total)", extras.len());
Ok(())
}

#[derive(Debug, Deserialize)]
Expand Down
54 changes: 44 additions & 10 deletions src/cf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,43 @@ pub async fn create(
.ok_or_else(|| Error::Upstream("tunnel create: missing token".into()))?
.to_string();

let extra_hostnames = apply_ingress(http, cf, &id, hostname, extras).await?;

Ok(Tunnel {
id,
token,
hostname: hostname.to_string(),
extra_hostnames,
})
}

/// Replace an existing tunnel's ingress rules + CNAME records. Used
/// for runtime updates — e.g. a workload POSTed to `/deploy` declares
/// `expose`, the agent forwards the full current extras list to the
/// CP, and the CP calls this to re-PUT the tunnel config without
/// recreating the tunnel or touching the tunnel token.
///
/// * `tunnel_id` — id of the already-created tunnel to reconfigure.
/// * `hostname` — the tunnel's primary hostname; each extra hostname
///   is formed as `{label}.{hostname}`.
/// * `extras` — the complete `(label, port)` list to publish; it
///   replaces whatever was configured before.
///
/// Returns the resolved extra hostnames so the caller can log / store
/// them. Any error from `apply_ingress` is passed through unchanged.
pub async fn update_ingress(
http: &Client,
cf: &CfCreds,
tunnel_id: &str,
hostname: &str,
extras: &[(String, u16)],
) -> Result<Vec<String>> {
// Pure delegation: `create` goes through the same `apply_ingress`
// helper, so boot-time and runtime ingress are applied identically.
apply_ingress(http, cf, tunnel_id, hostname, extras).await
}

/// Build the ingress array (extras first, then the primary
/// `hostname → localhost:8080` rule, then the 404 catch-all), PUT
/// it to the tunnel, and upsert a CNAME for each hostname pointing
/// at `{tunnel_id}.cfargotunnel.com`.
async fn apply_ingress(
http: &Client,
cf: &CfCreds,
tunnel_id: &str,
hostname: &str,
extras: &[(String, u16)],
) -> Result<Vec<String>> {
let mut ingress: Vec<serde_json::Value> = extras
.iter()
.map(|(label, port)| {
Expand All @@ -115,25 +152,22 @@ pub async fn create(
http,
cf,
Method::PUT,
&format!("/accounts/{}/cfd_tunnel/{id}/configurations", cf.account_id),
&format!(
"/accounts/{}/cfd_tunnel/{tunnel_id}/configurations",
cf.account_id
),
Some(serde_json::json!({"config": {"ingress": ingress}})),
)
.await?;

upsert_cname(http, cf, &id, hostname).await?;
upsert_cname(http, cf, tunnel_id, hostname).await?;
let mut extra_hostnames = Vec::with_capacity(extras.len());
for (label, _) in extras {
let extra = format!("{label}.{hostname}");
upsert_cname(http, cf, &id, &extra).await?;
upsert_cname(http, cf, tunnel_id, &extra).await?;
extra_hostnames.push(extra);
}

Ok(Tunnel {
id,
token,
hostname: hostname.to_string(),
extra_hostnames,
})
Ok(extra_hostnames)
}

async fn upsert_cname(http: &Client, cf: &CfCreds, tunnel_id: &str, hostname: &str) -> Result<()> {
Expand Down
26 changes: 25 additions & 1 deletion src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ pub struct Agent {
/// Intel-verified ITA claims. Required — agents without a valid
/// token don't enter the store.
pub ita: ita::Claims,
/// CF tunnel ID (not name) — needed to re-PUT ingress at runtime
/// when a POSTed workload declares `expose`. Empty for the
/// `control-plane` pseudo-entry which doesn't take runtime slop.
#[serde(default)]
pub tunnel_id: String,
/// Currently-active per-workload ingress rules for this agent's
/// tunnel. Seeded at /register from the boot-workload `expose`
/// set; replaced wholesale on each runtime /ingress/replace call
/// (the agent sends its full list every time). If the agent
/// relaunches, the CP re-seeds from the new register's
/// `extra_ingress` field — runtime extras are intentionally NOT
/// persisted across relaunches.
#[serde(default)]
pub extras: Vec<(String, u16)>,
}

pub type Store = Arc<Mutex<HashMap<String, Agent>>>;
Expand Down Expand Up @@ -156,7 +169,15 @@ async fn tick(
// reports its full hostname there, which would land in a
// different key than /register used (the bare tunnel name) and
// produce a duplicate /api/agents entry per agent.
store.lock().await.insert(
let mut s = store.lock().await;
// Preserve tunnel_id + extras across scrapes — they're owned by
// /register and /ingress/replace; the collector's job is
// health/metrics refresh only, not ingress bookkeeping.
let (tunnel_id, extras) = s
.get(name)
.map(|a| (a.tunnel_id.clone(), a.extras.clone()))
.unwrap_or_default();
s.insert(
name.clone(),
Agent {
agent_id: name.clone(),
Expand All @@ -183,8 +204,11 @@ async fn tick(
nets: serde_json::from_value(h["nets"].clone()).unwrap_or_default(),
disks: serde_json::from_value(h["disks"].clone()).unwrap_or_default(),
ita: claims,
tunnel_id,
extras,
},
);
drop(s);
verified += 1;
}

Expand Down
12 changes: 6 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ pub struct Agent {
pub pat: String,
pub ee_socket: String,
pub ita: Ita,
/// Extra cloudflared ingress rules requested at register time.
/// Populated from `DD_EXTRA_INGRESS` — a JSON array of
/// `{"hostname_label": "...", "port": N}` objects assembled by
/// the boot-workload builder (`apps/_infra/local-agents.sh`)
/// from `expose` hints on individual workload specs. Empty is
/// fine — the agent just gets the default dashboard rule.
/// Extra cloudflared ingress rules requested at register time,
/// parsed from `DD_EXTRA_INGRESS` (a comma-separated list of
/// `label:port` pairs, e.g. `gpu:8081,web:9000`). The boot-workload
/// builder (`apps/_infra/local-agents.sh`) collects these from
/// `expose` hints on individual workload specs. Empty is fine —
/// the agent just gets the default dashboard rule.
pub extra_ingress: Vec<(String, u16)>,
}

Expand Down
83 changes: 83 additions & 0 deletions src/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ pub async fn run() -> Result<()> {
nets: cp_m.nets,
disks: cp_m.disks,
ita: cp_claims,
// CP doesn't take per-workload runtime ingress — its own tunnel
// only routes `DD_HOSTNAME → localhost:8080`. tunnel_id stays
// empty so the runtime-ingress endpoint rejects attempts to
// target "control-plane".
tunnel_id: String::new(),
extras: Vec::new(),
},
);

Expand Down Expand Up @@ -207,6 +213,7 @@ pub async fn run() -> Result<()> {
.route("/", get(fleet))
.route("/health", get(health))
.route("/register", post(register))
.route("/ingress/replace", post(ingress_replace))
.route("/agent/{id}", get(agent_detail))
.route("/agent/{id}/logs/{app}", get(agent_logs))
.route("/api/agents", get(api_agents))
Expand Down Expand Up @@ -372,6 +379,10 @@ async fn register(
nets: Vec::new(),
disks: Vec::new(),
ita: ita_claims,
// Seeded from the boot `extra_ingress`; runtime /deploy
// requests extend this list via /ingress/replace (below).
tunnel_id: tunnel.id.clone(),
extras: extras.clone(),
},
);
}
Expand All @@ -390,6 +401,78 @@ async fn register(
})))
}

/// JSON body of `POST /ingress/replace`: identifies the agent and
/// carries the complete set of extra ingress rules it wants published.
#[derive(Debug, Deserialize)]
struct IngressReplaceReq {
/// The agent's own `agent_id` (== tunnel name) as returned from
/// /register. Authenticated indirectly: the caller's PAT must
/// belong to `DD_OWNER`, and the agent must already exist in the
/// CP's store under this id (so anyone with a valid owner-PAT can
/// only target agents the CP already knows about).
agent_id: String,
/// Full replacement set of per-workload ingress rules for this
/// agent — not a delta. The CP re-PUTs the tunnel config with
/// `extras` first, then the primary `hostname → localhost:8080`
/// rule, then the 404 catch-all. Runtime additions from /deploy
/// live alongside the boot-time `extra_ingress` from /register
/// here — the agent owns the merge.
extras: Vec<IngressPair>,
}

/// One extra ingress rule as it appears in an /ingress/replace body;
/// converted to a `(hostname_label, port)` tuple before being applied.
#[derive(Debug, Deserialize)]
struct IngressPair {
/// Label prepended to the agent's hostname; the published hostname is
/// `{hostname_label}.{hostname}`.
hostname_label: String,
/// Port the rule routes to.
/// NOTE(review): presumably a localhost port on the agent — the
/// ingress-rule construction is not visible here; confirm.
port: u16,
}

/// POST /ingress/replace — agent pushes an updated ingress list
/// (boot extras + anything POSTed to /deploy with an `expose` field)
/// and the CP re-PUTs the tunnel config + CNAMEs. No tunnel
/// recreation — the token + tunnel id stay stable across calls.
async fn ingress_replace(
State(s): State<St>,
headers: HeaderMap,
Json(req): Json<IngressReplaceReq>,
) -> Result<Json<serde_json::Value>> {
let pat = auth::bearer(&headers).ok_or(Error::Unauthorized)?;
let _login = auth::verify_pat(&pat, &s.cfg.common.owner).await?;

let (tunnel_id, hostname) = {
let store = s.store.lock().await;
let agent = store.get(&req.agent_id).ok_or(Error::NotFound)?;
if agent.tunnel_id.is_empty() {
// Control-plane pseudo-entry, or an older store entry that
// pre-dates the tunnel_id field. Either way, nothing to update.
return Err(Error::BadRequest(format!(
"{} has no tunnel — runtime ingress applies only to agent tunnels",
req.agent_id
)));
}
(agent.tunnel_id.clone(), agent.hostname.clone())
};

let extras: Vec<(String, u16)> = req
.extras
.iter()
.map(|e| (e.hostname_label.clone(), e.port))
.collect();

let http = reqwest::Client::new();
let hostnames = cf::update_ingress(&http, &s.cfg.cf, &tunnel_id, &hostname, &extras).await?;

{
let mut store = s.store.lock().await;
if let Some(agent) = store.get_mut(&req.agent_id) {
agent.extras = extras;
}
}

eprintln!("cp: ingress/replace {} → {:?}", req.agent_id, hostnames);
Ok(Json(serde_json::json!({
"agent_id": req.agent_id,
"extra_hostnames": hostnames,
})))
}

/// Mint the CP's own ITA token at startup. Fatal on any failure —
/// the CP refuses to start without proving its own TDX measurement.
async fn mint_cp_ita(cfg: &Cfg, ee: &Ee) -> Result<String> {
Expand Down
Loading