diff --git a/Cargo.lock b/Cargo.lock index ad7efabc9..d35a67960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3693,6 +3693,7 @@ dependencies = [ "axum 0.8.9", "bytes", "clap", + "ed25519-dalek", "futures", "futures-util", "hex", @@ -3726,6 +3727,7 @@ dependencies = [ "prost", "prost-types", "rand 0.9.4", + "rand_core 0.6.4", "rcgen", "reqwest 0.12.28", "russh", diff --git a/Cargo.toml b/Cargo.toml index 079e1e172..f3aef9224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ sha2 = "0.10" rand = "0.9" jsonwebtoken = "9" getrandom = "0.3" +ed25519-dalek = { version = "2", features = ["rand_core", "pem", "pkcs8"] } # Filesystem embedding include_dir = "0.7" diff --git a/crates/openshell-core/src/proto/mod.rs b/crates/openshell-core/src/proto/mod.rs index 08b062d2e..12aa1fddb 100644 --- a/crates/openshell-core/src/proto/mod.rs +++ b/crates/openshell-core/src/proto/mod.rs @@ -79,6 +79,19 @@ pub mod inference { } } +#[allow( + clippy::all, + clippy::pedantic, + clippy::nursery, + unused_qualifications, + rust_2018_idioms +)] +pub mod policy { + pub mod v1alpha1 { + include!(concat!(env!("OUT_DIR"), "/openshell.policy.v1alpha1.rs")); + } +} + pub use datamodel::v1::*; pub use inference::v1::*; pub use openshell::*; diff --git a/crates/openshell-server/Cargo.toml b/crates/openshell-server/Cargo.toml index 0b7e3a97e..fbbbfc157 100644 --- a/crates/openshell-server/Cargo.toml +++ b/crates/openshell-server/Cargo.toml @@ -84,11 +84,17 @@ uuid = { workspace = true } hmac = "0.12" sha2 = { workspace = true } jsonwebtoken = { workspace = true } +ed25519-dalek = { workspace = true } async-trait = "0.1" url = { workspace = true } hex = "0.4" russh = "0.57" rand = { workspace = true } +# rand_core 0.6 is pinned here because ed25519-dalek v2 still consumes +# `rand_core 0.6` traits. The workspace `rand = "0.9"` ships an `OsRng` +# that implements the newer `rand_core 0.9` trait surface, so calls to +# `SigningKey::generate` need a `rand_core 0.6`-compatible RNG. 
+rand_core_06 = { package = "rand_core", version = "0.6", features = ["getrandom"] } petname = "2" ipnet = "2" tempfile = "3" diff --git a/crates/openshell-server/src/config_file.rs b/crates/openshell-server/src/config_file.rs index d7852e8ee..28ccfe677 100644 --- a/crates/openshell-server/src/config_file.rs +++ b/crates/openshell-server/src/config_file.rs @@ -55,6 +55,12 @@ pub struct OpenShellRoot { #[serde(default)] pub gateway: GatewayFileSection, + /// `[openshell.policy]` table — selects the active policy-provider + /// type. `None` here is equivalent to the local default; the loader + /// validates the chosen type in [`load`]. + #[serde(default)] + pub policy: Option, + /// `[openshell.drivers.]` tables — passed verbatim to each driver /// crate's `Deserialize` impl after the gateway-side inheritance merge. /// Stored as raw [`toml::Value`] so each driver can evolve its schema @@ -63,6 +69,37 @@ pub struct OpenShellRoot { pub drivers: BTreeMap, } +/// `[openshell.policy]` table. +/// +/// Selects the policy-provider type. Supported values: `"local"` (the +/// gateway's in-process, store-backed policy semantics) and `"attested"` +/// (out-of-process policy delivery over the +/// `openshell.policy.v1alpha1.Engine` wire — the gateway fetches signed +/// projections from a configured source). +/// +/// The `type` key intentionally mirrors `openshell-providers`' +/// `ProviderPlugin`-style selector convention. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PolicyFileSection { + /// Policy-provider type. Accepted values: `"local"` (the default if + /// the table is omitted) and `"attested"`. `type` is a Rust keyword, + /// so the field is exposed as `r#type` in code and renamed via + /// `#[serde(rename = "type")]` for the TOML surface. + #[serde(default, rename = "type")] + pub r#type: Option, + + /// UDS path the gateway dials to reach the policy source. Required + /// when `type = "attested"`. 
Ignored for `type = "local"`. + #[serde(default)] + pub source_uds_path: Option, + + /// Path to the gateway-side trust store JSON file. Required when + /// `type = "attested"`. Ignored for `type = "local"`. + #[serde(default)] + pub trust_store_path: Option, +} + /// `[openshell.gateway]` section. /// /// All fields are `Option` so the loader can tell whether a key was set @@ -182,6 +219,15 @@ pub enum ConfigFileError { env: &'static str, cli: &'static str, }, + #[error( + "[openshell.policy] type = '{policy_type}' is not a recognized policy type; accepted values are 'local' (default) or 'attested'" + )] + UnknownPolicyType { policy_type: String }, + + #[error( + "[openshell.policy] type = 'attested' requires `{field}` to be set in the config file" + )] + MissingAttestedField { field: &'static str }, } /// Load and validate a TOML config file. @@ -215,9 +261,50 @@ pub fn load(path: &Path) -> Result { }); } + // Validate the optional policy-provider type. Unknown values are + // rejected here; required-field validation for known types runs + // immediately after. + if let Some(ref policy) = file.openshell.policy + && let Some(ref policy_type) = policy.r#type + && !is_known_policy_type(policy_type) + { + return Err(ConfigFileError::UnknownPolicyType { + policy_type: policy_type.clone(), + }); + } + + // `attested` requires both file paths. They are optional in the + // struct so `type = "local"` does not trip a deserialize error; the + // explicit check here surfaces a friendly message at load time. + if let Some(ref policy) = file.openshell.policy + && policy.r#type.as_deref() == Some("attested") + { + if policy.source_uds_path.is_none() { + return Err(ConfigFileError::MissingAttestedField { + field: "source_uds_path", + }); + } + if policy.trust_store_path.is_none() { + return Err(ConfigFileError::MissingAttestedField { + field: "trust_store_path", + }); + } + } + Ok(file) } +/// Policy-type strings recognized by the policy-provider config validator. 
+/// `"local"` is the historical (and v0) default; `"attested"` is reserved +/// for the forthcoming Attested Policy Projection provider and is parsed +/// successfully so deployments can stage the value ahead of the provider +/// landing. +pub const KNOWN_POLICY_TYPES: &[&str] = &["local", "attested"]; + +fn is_known_policy_type(policy_type: &str) -> bool { + KNOWN_POLICY_TYPES.contains(&policy_type) +} + /// Build the merged TOML table for `driver` by overlaying inheritable /// `[openshell.gateway]` defaults onto `[openshell.drivers.]`. /// @@ -431,6 +518,114 @@ ssh_gateway_port = 8080 assert!(matches!(err, ConfigFileError::Parse { .. })); } + #[test] + fn parses_policy_type_local() { + let toml = r#" +[openshell.policy] +type = "local" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("local policy type parses"); + let policy = file.openshell.policy.expect("policy table present"); + assert_eq!(policy.r#type.as_deref(), Some("local")); + } + + #[test] + fn parses_policy_type_attested() { + // "attested" requires both `source_uds_path` and + // `trust_store_path`; the loader rejects the table if either is + // missing. With both present, the policy section round-trips. 
+ let toml = r#" +[openshell.policy] +type = "attested" +source_uds_path = "/run/openshell/policy.sock" +trust_store_path = "/etc/openshell/trust.json" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("attested policy type parses"); + let policy = file.openshell.policy.expect("policy table present"); + assert_eq!(policy.r#type.as_deref(), Some("attested")); + assert_eq!( + policy.source_uds_path.as_deref(), + Some(Path::new("/run/openshell/policy.sock")) + ); + assert_eq!( + policy.trust_store_path.as_deref(), + Some(Path::new("/etc/openshell/trust.json")) + ); + } + + #[test] + fn rejects_attested_without_source_uds_path() { + let toml = r#" +[openshell.policy] +type = "attested" +trust_store_path = "/etc/openshell/trust.json" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("missing source_uds_path must error"); + assert!(matches!( + err, + ConfigFileError::MissingAttestedField { + field: "source_uds_path" + } + )); + } + + #[test] + fn rejects_attested_without_trust_store_path() { + let toml = r#" +[openshell.policy] +type = "attested" +source_uds_path = "/run/openshell/policy.sock" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("missing trust_store_path must error"); + assert!(matches!( + err, + ConfigFileError::MissingAttestedField { + field: "trust_store_path" + } + )); + } + + #[test] + fn rejects_unknown_policy_type() { + let toml = r#" +[openshell.policy] +type = "nonsense" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("unknown policy type must be rejected"); + assert!(matches!( + err, + ConfigFileError::UnknownPolicyType { ref policy_type } if policy_type == "nonsense" + )); + } + + #[test] + fn missing_policy_table_is_ok() { + let toml = r#" +[openshell.gateway] +log_level = "info" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("absent policy table is allowed"); + assert!(file.openshell.policy.is_none()); + } + + #[test] + fn 
rejects_unknown_policy_field() { + let toml = r#" +[openshell.policy] +type = "local" +nonsense = true +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("unknown policy field must be rejected"); + assert!(matches!(err, ConfigFileError::Parse { .. })); + } + #[test] fn rejects_unsupported_version() { let toml = r" diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 38268fcd0..17a694196 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -13,6 +13,9 @@ use crate::ServerState; use crate::auth::principal::Principal; use crate::persistence::{DraftChunkRecord, ObjectId, ObjectName, ObjectType, PolicyRecord, Store}; +use crate::policy_provider::{ + DeleteGlobalPolicyCtx, PolicyError, SetSandboxPolicyCtx, UpdateSandboxPolicyCtx, +}; use crate::policy_store::PolicyStoreExt; use openshell_core::proto::policy_merge_operation; use openshell_core::proto::setting_value; @@ -957,6 +960,34 @@ fn truncate_for_log(input: &str, max_chars: usize) -> String { } } +/// Map a [`PolicyError`] to a `tonic::Status`. +/// +/// `Unsupported` carries the policy-type id and the refused operation so +/// the client (and audit) get a precise reason; it maps to +/// `Status::unimplemented`. Persistence errors collapse to +/// `Status::internal` here — handlers that need finer granularity (e.g. +/// CAS conflict → `Status::aborted`) should match on `PolicyError` before +/// calling this. +fn policy_error_to_status(error: PolicyError) -> Status { + match error { + PolicyError::Unsupported { + policy_type, + operation, + } => Status::unimplemented(format!( + "policy type '{policy_type}' does not support operation '{operation}'" + )), + PolicyError::Persistence(err) => { + super::persistence_error_to_status(err, "policy provider") + } + // Source-side failures (engine unreachable, decode error, etc.) 
+ // surface as `unavailable` so callers retry — the gateway itself + // is healthy. + PolicyError::SourceError(err) => { + Status::unavailable(format!("policy source failure: {err}")) + } + } +} + #[cfg(test)] fn is_sandbox_caller(request: &Request) -> bool { matches!( @@ -1393,6 +1424,16 @@ pub(super) async fn handle_update_config( "delete_setting cannot be combined with policy payload", )); } + // Coarse-grained mutation gate (W-B Phase A). The global-policy + // replace path writes a revision row directly without routing + // through a per-op provider call today; gate it here so an + // alternate provider blocks `openshell policy set --global` + // uniformly with the sandbox-scoped set path. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let mut new_policy = req.policy.ok_or_else(|| { Status::invalid_argument("policy is required for global policy update") })?; @@ -1498,20 +1539,31 @@ pub(super) async fn handle_update_config( let mut global_settings = load_global_settings(state.store.as_ref()).await?; let changed = if req.delete_setting { - let removed = global_settings.settings.remove(key).is_some(); - if removed - && key == POLICY_SETTING_KEY - && let Ok(Some(latest)) = state - .store - .get_latest_policy(GLOBAL_POLICY_SANDBOX_ID) + // Gate global-policy deletion through the active policy + // provider. Local says yes (and supersedes older revisions as + // a side effect inside the trait method); other providers can + // refuse via the trait default, which maps to + // `Status::unimplemented` so `openshell policy delete + // --global` is rejected without a bespoke gate. Non-policy + // setting deletions skip the trait entirely. + if key == POLICY_SETTING_KEY { + // Coarse-grained mutation gate (W-B Phase A). Refuses the + // entire mutation surface before the per-op delete trait + // call runs. 
+ state + .policy_provider + .permits_mutation() .await - { - let _ = state - .store - .supersede_older_policies(GLOBAL_POLICY_SANDBOX_ID, latest.version + 1) - .await; + .map_err(policy_error_to_status)?; + state + .policy_provider + .delete_policy(&DeleteGlobalPolicyCtx { + global_policy_sandbox_id: GLOBAL_POLICY_SANDBOX_ID.to_string(), + }) + .await + .map_err(policy_error_to_status)?; } - removed + global_settings.settings.remove(key).is_some() } else { let setting = req .setting_value @@ -1636,6 +1688,32 @@ pub(super) async fn handle_update_config( .ok_or_else(|| Status::internal("sandbox has no spec"))?; let merge_ops = parse_merge_operations(&req.merge_operations)?; validate_merge_operations_for_server(&merge_ops)?; + + // Coarse-grained mutation gate (W-B Phase A). Fires before any DB + // work or the per-op trait call so an alternate provider refuses + // the merge without exercising the merge-with-retry path. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; + + // Gate the merge through the active policy provider. The local + // provider returns Ok and the existing merge-with-retry path runs + // below; an alternate provider can refuse via the trait default + // (`Status::unimplemented`) so `openshell policy update` is + // rejected without a bespoke gate. 
+ state + .policy_provider + .update_policy(&UpdateSandboxPolicyCtx { + sandbox_id: sandbox_id.clone(), + sandbox_name: sandbox.object_name().to_string(), + merge_operations: merge_ops.clone(), + baseline_policy: spec.policy.clone(), + }) + .await + .map_err(policy_error_to_status)?; + let (version, hash) = apply_merge_operations_with_retry( state.store.as_ref(), &sandbox_id, @@ -1733,52 +1811,51 @@ pub(super) async fn handle_update_config( ); } - let latest = state - .store - .get_latest_policy(&sandbox_id) - .await - .map_err(|e| Status::internal(format!("fetch latest policy failed: {e}")))?; - - let payload = new_policy.encode_to_vec(); - let hash = deterministic_policy_hash(&new_policy); - - if let Some(ref current) = latest - && current.policy_hash == hash - { - return Ok(Response::new(UpdateConfigResponse { - version: u32::try_from(current.version).unwrap_or(0), - policy_hash: hash, - settings_revision: 0, - deleted: false, - })); - } - - let next_version = latest.map_or(1, |r| r.version + 1); - let policy_id = uuid::Uuid::new_v4().to_string(); - + // Coarse-grained mutation gate (W-B Phase A). Runs before the per-op + // `set_policy` call so an alternate provider can refuse the entire + // mutation surface — including the draft-chunk handlers — without + // implementing per-op trait methods. The per-op call below is kept as + // the natural extension seam for what work happens once a mutation is + // permitted; this is the gate that decides whether any work happens + // at all. state - .store - .put_policy_revision(&policy_id, &sandbox_id, next_version, &payload, &hash) + .policy_provider + .permits_mutation() .await - .map_err(|e| Status::internal(format!("persist policy revision failed: {e}")))?; - - let _ = state - .store - .supersede_older_policies(&sandbox_id, next_version) - .await; + .map_err(policy_error_to_status)?; + + // Route the sandbox-scoped policy replacement through the active + // policy provider. 
`LocalPolicyProvider::set_policy` performs the same + // put-revision + supersede dance that used to live inline here; an + // alternate provider (the forthcoming `AttestedPolicyProvider`) can + // refuse the mutation via the trait default, which maps to + // `Status::unimplemented` so `openshell policy set` is rejected + // automatically without a separate gate. + let ctx = SetSandboxPolicyCtx { + sandbox_id: sandbox_id.clone(), + sandbox_name: sandbox.object_name().to_string(), + expected_resource_version: req.expected_resource_version, + policy: new_policy, + }; + let outcome = state + .policy_provider + .set_policy(&ctx) + .await + .map_err(policy_error_to_status)?; state.sandbox_watch_bus.notify(&sandbox_id); info!( sandbox_id = %sandbox_id, - version = next_version, - policy_hash = %hash, + version = outcome.version, + policy_hash = %outcome.policy_hash, + policy_type = %state.policy_provider.id(), "UpdateConfig: new policy version persisted" ); Ok(Response::new(UpdateConfigResponse { - version: u32::try_from(next_version).unwrap_or(0), - policy_hash: hash, + version: outcome.version, + policy_hash: outcome.policy_hash, settings_revision: 0, deleted: false, })) @@ -2082,6 +2159,15 @@ pub(super) async fn handle_submit_policy_analysis( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). The proposal-submission + // surface writes new draft chunks; an alternate provider refuses the + // entire draft-chunk write surface here without per-handler trait + // methods. Runs before any DB read or write. 
+ state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = resolve_sandbox_by_name_for_principal(state.store.as_ref(), &principal, &req.name).await?; @@ -2387,6 +2473,14 @@ pub(super) async fn handle_approve_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Approving a chunk merges + // its rule into active policy — pure mutation. Refuses the entire + // chunk-approval surface uniformly with the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; require_no_global_policy(state).await?; @@ -2474,6 +2568,15 @@ pub(super) async fn handle_reject_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Reject mutates the + // chunk's status row and may also remove a previously-approved rule + // from the active policy. Gate refuses both surfaces uniformly with + // the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2555,6 +2658,14 @@ pub(super) async fn handle_approve_all_draft_chunks( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). Bulk-approve merges N + // chunk rules into active policy and bumps chunk status — refuse the + // entire write surface uniformly with the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; require_no_global_policy(state).await?; @@ -2677,6 +2788,13 @@ pub(super) async fn handle_edit_draft_chunk( let proposed_rule = req .proposed_rule .ok_or_else(|| Status::invalid_argument("proposed_rule is required"))?; + // Coarse-grained mutation gate (W-B Phase A). 
Edit rewrites the + // proposed-rule bytes on the chunk — pure draft-chunk store write. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2727,6 +2845,14 @@ pub(super) async fn handle_undo_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Undo removes the chunk's + // rule from active policy and reverts the chunk's status row to + // pending — two writes to gate uniformly with the canonical mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2807,6 +2933,13 @@ pub(super) async fn handle_clear_draft_chunks( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). Clear deletes all + // pending draft chunks for the sandbox — pure draft-chunk store write. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -9371,4 +9504,565 @@ mod tests { "policy should be backfilled after one success" ); } + + // ---- PolicyProvider integration (W-B half 1) ---- + + use crate::policy_provider::{PolicyError, PolicyProvider}; + use async_trait::async_trait; + + /// Test-only provider that returns `Unsupported` for every mutator — + /// stands in for the future `AttestedPolicyProvider` so we can verify + /// the gRPC-layer mapping to `Status::unimplemented`. + #[derive(Debug)] + struct RefusingProvider; + + #[async_trait] + impl PolicyProvider for RefusingProvider { + fn id(&self) -> &'static str { + "refusing" + } + + async fn get_effective_policy( + &self, + _sandbox_id: &str, + ) -> Result, PolicyError> { + Ok(None) + } + // set_policy / update_policy / delete_policy inherit the default + // `Unsupported` impls — this is the property under test. 
+ } + + /// Swap the policy provider on a freshly-built test `ServerState`. The + /// `test_server_state` helper returns a unique `Arc`, so we can unwrap + /// it, mutate, and re-wrap. + fn override_policy_provider( + state: Arc, + provider: Arc, + ) -> Arc { + let inner = Arc::try_unwrap(state) + .map_err(|_| "expected unique test-state Arc") + .unwrap(); + Arc::new(ServerState { + policy_provider: provider, + ..inner + }) + } + + #[test] + fn policy_error_unsupported_maps_to_unimplemented() { + let status = policy_error_to_status(PolicyError::Unsupported { + policy_type: "attested", + operation: "set_policy", + }); + assert_eq!(status.code(), Code::Unimplemented); + assert!(status.message().contains("attested")); + assert!(status.message().contains("set_policy")); + } + + #[test] + fn policy_error_persistence_maps_to_internal() { + let status = policy_error_to_status(PolicyError::Persistence( + crate::persistence::PersistenceError::Database("boom".to_string()), + )); + assert_eq!(status.code(), Code::Internal); + } + + /// End-to-end: replace the state's provider with the refusing stub, + /// then drive `handle_update_config` through the sandbox-scoped + /// policy-replace path. The handler must surface + /// `Status::unimplemented` carrying the policy type and operation, + /// which is exactly what `openshell policy set` will see when the + /// attested provider is configured. + #[tokio::test] + async fn refusing_provider_sandbox_set_returns_unimplemented() { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let state = test_server_state().await; + // No baseline `spec.policy` — first-time policy discovery exercises + // the backfill path, which reaches the provider seam without first + // tripping `validate_static_fields_unchanged`. 
+ let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-refuse".to_string(), + name: "sb-refuse".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-refuse".to_string(), + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject set_policy"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + // W-B Phase A: the coarse `permits_mutation` gate fires before the + // per-op `set_policy` call, so the surfaced operation is the + // generic `mutation`. The per-op trait method is still defined as + // the natural extension seam for what work happens once a + // mutation is permitted, but it's the gate that decides whether + // any work happens at all. 
+ assert!(err.message().contains("mutation")); + } + + #[tokio::test] + async fn refusing_provider_global_delete_returns_unimplemented() { + let state = test_server_state().await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let req = with_user(Request::new(UpdateConfigRequest { + global: true, + setting_key: POLICY_SETTING_KEY.to_string(), + delete_setting: true, + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject global delete_policy"); + assert_eq!(err.code(), Code::Unimplemented); + // W-B Phase A: see comment in + // `refusing_provider_sandbox_set_returns_unimplemented` — the + // coarse gate fires first, so the operation is `mutation`. + assert!(err.message().contains("mutation")); + } + + #[tokio::test] + async fn refusing_provider_merge_ops_returns_unimplemented() { + use openshell_core::proto::{ + NetworkEndpoint, NetworkPolicyRule, PolicyMergeOperation, SandboxPhase, SandboxSpec, + policy_merge_operation, + }; + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-merge".to_string(), + name: "sb-merge".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let merge_op = PolicyMergeOperation { + operation: Some(policy_merge_operation::Operation::AddRule( + openshell_core::proto::AddNetworkRule { + rule_name: "test-rule".to_string(), + rule: Some(NetworkPolicyRule { + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + ..Default::default() 
+ }), + }, + )), + }; + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-merge".to_string(), + merge_operations: vec![merge_op], + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject update_policy"); + assert_eq!(err.code(), Code::Unimplemented); + // W-B Phase A: see comment in + // `refusing_provider_sandbox_set_returns_unimplemented` — the + // coarse gate fires first, so the operation is `mutation`. + assert!(err.message().contains("mutation")); + } + + /// Sanity-check that the default `local` provider preserves the + /// status-quo behavior for the sandbox-scoped policy replacement + /// path: the gRPC handler still returns version=1 and a non-empty + /// hash for a brand-new sandbox policy (first-time discovery / + /// backfill). + #[tokio::test] + async fn local_provider_sandbox_set_succeeds_with_version_and_hash() { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-local".to_string(), + name: "sb-local".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-local".to_string(), + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + })); + let resp = handle_update_config(&state, req) + .await + .expect("local provider must accept set_policy"); + let resp = resp.into_inner(); + assert_eq!(resp.version, 1); + assert!(!resp.policy_hash.is_empty()); + } + + // ---- permits_mutation chunk-handler gating (W-B Phase A) ---- + // + // Each chunk handler reachable via gRPC must call 
`permits_mutation` + // as its first post-authz action; an alternate provider that returns + // `Unsupported` from the trait default must short-circuit every + // handler before any DB write. These tests stand in for the + // forthcoming `AttestedPolicyProvider`: today's `RefusingProvider` + // stub inherits the same default impl the attested provider will, so + // verifying the gRPC layer here is equivalent to verifying the + // attested-mode rejection path the gateway will expose in Phase B. + + /// Helper: seed a single pending draft chunk for the given sandbox so + /// the per-chunk handlers (`approve`, `reject`, `edit`, `undo`) have a + /// target row to attempt to mutate. Returns the chunk id. + async fn seed_pending_chunk( + state: &Arc, + sandbox_id: &str, + chunk_id: &str, + ) -> String { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule}; + let rule = NetworkPolicyRule { + name: "seed-rule".to_string(), + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + let chunk = DraftChunkRecord { + id: chunk_id.to_string(), + sandbox_id: sandbox_id.to_string(), + draft_version: 1, + status: "pending".to_string(), + rule_name: "seed-rule".to_string(), + proposed_rule: rule.encode_to_vec(), + rationale: "seed".to_string(), + security_notes: String::new(), + confidence: 1.0, + created_at_ms: 1_000_000, + decided_at_ms: None, + host: "example.com".to_string(), + port: 443, + binary: "/usr/bin/curl".to_string(), + hit_count: 1, + first_seen_ms: 0, + last_seen_ms: 0, + validation_result: String::new(), + rejection_reason: String::new(), + }; + state + .store + .put_draft_chunk(&chunk, None) + .await + .expect("seed chunk persists") + } + + /// Helper: seed an approved draft chunk (for `undo`, which requires + /// the target chunk to be in the `approved` state). 
+ async fn seed_approved_chunk( + state: &Arc, + sandbox_id: &str, + chunk_id: &str, + ) -> String { + let id = seed_pending_chunk(state, sandbox_id, chunk_id).await; + state + .store + .update_draft_chunk_status(&id, "approved", Some(1_000_001), None) + .await + .expect("flip to approved"); + id + } + + /// Helper: snapshot all chunks for a sandbox so a test can assert the + /// refused handler made no draft-chunk write. Catches the regression + /// where someone moves the gate after a status update or row insert. + async fn chunk_snapshot( + state: &Arc, + sandbox_id: &str, + ) -> Vec<(String, String, Vec, String)> { + let chunks = state + .store + .list_draft_chunks(sandbox_id, None) + .await + .expect("list chunks"); + chunks + .into_iter() + .map(|c| (c.id, c.status, c.proposed_rule, c.rejection_reason)) + .collect() + } + + /// Helper: put a Ready sandbox with no baseline policy so the + /// chunk-handler preconditions (sandbox exists, not global-managed) + /// pass through to the provider gate. 
+ async fn put_test_sandbox(state: &Arc, id: &str, name: &str) { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: id.to_string(), + name: name.to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Ready as i32); + state.store.put_message(&sandbox).await.unwrap(); + } + + #[tokio::test] + async fn refusing_provider_submit_policy_analysis_returns_unimplemented() { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule, PolicyChunk}; + let state = test_server_state().await; + put_test_sandbox(&state, "sb-submit", "sb-submit").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-submit").await; + let req = with_user(Request::new(SubmitPolicyAnalysisRequest { + name: "sb-submit".to_string(), + proposed_chunks: vec![PolicyChunk { + rule_name: "blocked".to_string(), + proposed_rule: Some(NetworkPolicyRule { + name: "blocked".to_string(), + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }), + confidence: 0.9, + hit_count: 1, + ..Default::default() + }], + ..Default::default() + })); + let err = handle_submit_policy_analysis(&state, req) + .await + .expect_err("refusing provider must reject submit_policy_analysis"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-submit").await; + assert_eq!( + before, after, + "refused submit_policy_analysis must not write to the draft-chunk store" + ); + } + + 
#[tokio::test] + async fn refusing_provider_approve_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-approve", "sb-approve").await; + seed_pending_chunk(&state, "sb-approve", "chunk-approve").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-approve").await; + let req = Request::new(ApproveDraftChunkRequest { + name: "sb-approve".to_string(), + chunk_id: "chunk-approve".to_string(), + }); + let err = handle_approve_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject approve_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-approve").await; + assert_eq!( + before, after, + "refused approve_draft_chunk must not flip chunk status" + ); + } + + #[tokio::test] + async fn refusing_provider_reject_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-reject", "sb-reject").await; + seed_pending_chunk(&state, "sb-reject", "chunk-reject").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-reject").await; + let req = Request::new(RejectDraftChunkRequest { + name: "sb-reject".to_string(), + chunk_id: "chunk-reject".to_string(), + reason: "test".to_string(), + }); + let err = handle_reject_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject reject_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-reject").await; + assert_eq!( + before, after, + "refused reject_draft_chunk must not flip chunk status or persist reason" + ); + } + + #[tokio::test] + async fn 
refusing_provider_approve_all_draft_chunks_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-all", "sb-all").await; + seed_pending_chunk(&state, "sb-all", "chunk-all-a").await; + seed_pending_chunk(&state, "sb-all", "chunk-all-b").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-all").await; + let req = Request::new(ApproveAllDraftChunksRequest { + name: "sb-all".to_string(), + include_security_flagged: true, + }); + let err = handle_approve_all_draft_chunks(&state, req) + .await + .expect_err("refusing provider must reject approve_all_draft_chunks"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-all").await; + assert_eq!( + before, after, + "refused approve_all_draft_chunks must not flip any chunk status" + ); + } + + #[tokio::test] + async fn refusing_provider_edit_draft_chunk_returns_unimplemented() { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule}; + let state = test_server_state().await; + put_test_sandbox(&state, "sb-edit", "sb-edit").await; + seed_pending_chunk(&state, "sb-edit", "chunk-edit").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-edit").await; + let req = Request::new(EditDraftChunkRequest { + name: "sb-edit".to_string(), + chunk_id: "chunk-edit".to_string(), + proposed_rule: Some(NetworkPolicyRule { + name: "edited".to_string(), + endpoints: vec![NetworkEndpoint { + host: "edited.example".to_string(), + port: 8443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/edited".to_string(), + ..Default::default() + }], + }), + }); + let err = handle_edit_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject edit_draft_chunk"); + 
assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-edit").await; + assert_eq!( + before, after, + "refused edit_draft_chunk must not rewrite proposed_rule bytes" + ); + } + + #[tokio::test] + async fn refusing_provider_undo_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-undo", "sb-undo").await; + seed_approved_chunk(&state, "sb-undo", "chunk-undo").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-undo").await; + let req = Request::new(UndoDraftChunkRequest { + name: "sb-undo".to_string(), + chunk_id: "chunk-undo".to_string(), + }); + let err = handle_undo_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject undo_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-undo").await; + assert_eq!( + before, after, + "refused undo_draft_chunk must not flip chunk status back to pending" + ); + } + + #[tokio::test] + async fn refusing_provider_clear_draft_chunks_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-clear", "sb-clear").await; + seed_pending_chunk(&state, "sb-clear", "chunk-clear-a").await; + seed_pending_chunk(&state, "sb-clear", "chunk-clear-b").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-clear").await; + let req = Request::new(ClearDraftChunksRequest { + name: "sb-clear".to_string(), + }); + let err = handle_clear_draft_chunks(&state, req) + .await + .expect_err("refusing provider must reject clear_draft_chunks"); + assert_eq!(err.code(), Code::Unimplemented); + 
assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-clear").await; + assert_eq!( + before, after, + "refused clear_draft_chunks must not delete any chunks" + ); + } } diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index f59ec22df..8a8fe6eaa 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -30,6 +30,7 @@ mod http; mod inference; mod multiplex; mod persistence; +pub(crate) mod policy_provider; pub(crate) mod policy_store; mod provider_refresh; mod readiness; @@ -79,6 +80,15 @@ pub struct ServerState { /// Persistence store. pub store: Arc, + /// Active policy provider. + /// + /// All `openshell policy set | update | delete` requests at the gRPC + /// layer route through this provider. The `local` type writes directly + /// to [`Self::store`]; other policy types may refuse mutations via + /// `PolicyError::Unsupported`, which the gRPC layer translates to + /// `Status::unimplemented`. + pub policy_provider: Arc, + /// Compute orchestration over the configured driver. pub compute: ComputeRuntime, @@ -148,6 +158,12 @@ fn is_benign_connection_close(error: &(dyn std::error::Error + 'static)) -> bool impl ServerState { /// Create new server state. + /// + /// Selects the default `local` policy provider. Callers that need to + /// substitute a different provider (test fixtures wanting to exercise + /// `Unsupported` behavior; or, when it lands, the `attested` policy + /// type) can replace `policy_provider` on the constructed value before + /// the state is shared. 
#[must_use] #[allow(clippy::too_many_arguments)] pub fn new( @@ -160,9 +176,12 @@ impl ServerState { supervisor_sessions: Arc, oidc_cache: Option>, ) -> Self { + let policy_provider: Arc = + Arc::new(policy_provider::LocalPolicyProvider::new(store.clone())); Self { config, store, + policy_provider, compute, sandbox_index, sandbox_watch_bus, @@ -244,6 +263,12 @@ pub async fn run_server( oidc_cache, ); + // Override the default `local` policy provider when the config file + // selects a different policy type. + if let Some(provider) = resolve_policy_provider(config_file.as_ref(), store.clone()).await? { + state.policy_provider = provider; + } + // Load the gateway-minted sandbox JWT signing key when configured. // Optional so single-driver dev deployments without certgen continue // to start. The helm-deployed gateway and the RPM init script populate @@ -454,6 +479,82 @@ pub async fn run_server( Ok(()) } +/// Resolve the configured policy provider, if the config file selects one. +/// Returns `Ok(None)` when no override is needed (the default `local` +/// provider from `ServerState::new` already covers that case). +async fn resolve_policy_provider( + config_file: Option<&config_file::ConfigFile>, + store: Arc, +) -> Result>> { + let Some(file) = config_file else { + return Ok(None); + }; + let Some(policy) = file.openshell.policy.as_ref() else { + return Ok(None); + }; + let Some(policy_type) = policy.r#type.as_deref() else { + return Ok(None); + }; + + match policy_type { + policy_provider::LOCAL_POLICY_TYPE_ID => Ok(Some(Arc::new( + policy_provider::LocalPolicyProvider::new(store), + ))), + policy_provider::ATTESTED_POLICY_TYPE_ID => { + Ok(Some(build_attested_policy_provider(policy).await?)) + } + // Unreachable in practice — `config_file::load` already rejects + // unknown policy type names. Defensive for any straggler. 
+ other => Err(Error::config(format!( + "unknown policy provider type '{other}'" + ))), + } +} + +/// Construct an `AttestedPolicyProvider` from the parsed `[openshell.policy]` +/// table. `config_file::load` has already validated the required fields +/// are present. +async fn build_attested_policy_provider( + policy: &config_file::PolicyFileSection, +) -> Result> { + let source_uds_path = policy + .source_uds_path + .as_ref() + .expect("source_uds_path must be present (validated at config load)"); + let trust_store_path = policy + .trust_store_path + .as_ref() + .expect("trust_store_path must be present (validated at config load)"); + + let trust_store = policy_provider::TrustStore::load(trust_store_path).map_err(|e| { + Error::config(format!( + "failed to load policy trust store from '{}': {e}", + trust_store_path.display() + )) + })?; + + let source = policy_provider::GrpcPolicySource::connect(source_uds_path) + .await + .map_err(|e| { + Error::config(format!( + "failed to connect to policy source at '{}': {e}", + source_uds_path.display() + )) + })?; + + let provider = policy_provider::AttestedPolicyProvider::new(Arc::new(source), trust_store) + .await + .map_err(|e| Error::config(format!("attested policy provider startup failed: {e}")))?; + + info!( + source_uds_path = %source_uds_path.display(), + trust_store_path = %trust_store_path.display(), + "attested policy provider initialized" + ); + + Ok(Arc::new(provider)) +} + fn gateway_listener_addresses( bind_address: SocketAddr, extra_addresses: &[SocketAddr], diff --git a/crates/openshell-server/src/policy_provider/attested.rs b/crates/openshell-server/src/policy_provider/attested.rs new file mode 100644 index 000000000..627869397 --- /dev/null +++ b/crates/openshell-server/src/policy_provider/attested.rs @@ -0,0 +1,647 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Attested policy-provider driver. 
+//! +//! Resolves a sandbox's effective policy by talking to a configured policy +//! source over the wire trait. The driver: +//! +//! 1. Builds a runtime context for the sandbox. +//! 2. Acquires a handle from the configured source. +//! 3. Fetches the projection envelope for the OpenShell sandbox surface. +//! 4. Verifies the envelope signature against the configured trust +//! store. +//! 5. Decodes the policy body and returns it. +//! 6. Releases the handle. +//! +//! The driver inherits the trait's default `Unsupported` impls for +//! `set_policy` / `update_policy` / `delete_policy` / `permits_mutation` — +//! mutation is not part of this driver's surface. + +use std::sync::Arc; +use std::time::SystemTime; + +use async_trait::async_trait; +use prost::Message; +use tracing::warn; + +use super::source::{ + canonical_projection_bytes, PolicySource, PolicySourceError, ProjectionEnvelope, + RuntimeContext, +}; +#[cfg(test)] +use super::source::Handle; +use super::trust_store::{TrustStore, TrustStoreError}; +use super::{PolicyError, PolicyProvider, ATTESTED_POLICY_TYPE_ID}; + +/// Surface id this driver fetches by default. Matches the gateway's +/// canonical sandbox policy schema. +const SANDBOX_POLICY_SURFACE_ID: &str = "openshell.sandbox.v1"; + +/// Attested policy provider. +/// +/// Routes `get_effective_policy` through the configured policy source and +/// admits the returned policy only if the envelope signature verifies +/// against the trust store. Inherits the trait's default `Unsupported` +/// behaviour for every mutator. +#[derive(Debug)] +pub struct AttestedPolicyProvider { + source: Arc, + trust_store: TrustStore, +} + +impl AttestedPolicyProvider { + /// Construct the driver. Runs an initial `health` round-trip against + /// the source so a misconfigured deployment surfaces at gateway + /// startup rather than on the first sandbox admission. 
+ pub async fn new( + source: Arc, + trust_store: TrustStore, + ) -> Result { + source.health().await?; + Ok(Self { + source, + trust_store, + }) + } +} + +#[async_trait] +impl PolicyProvider for AttestedPolicyProvider { + fn id(&self) -> &'static str { + ATTESTED_POLICY_TYPE_ID + } + + async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError> { + // The gateway has not yet wired user-subject capture into this + // call path; for now the runtime context carries the sandbox id + // alone. User-subject capture is part of the auth-mode gate work + // (deferred follow-up). + let ctx = RuntimeContext { + sandbox_id: sandbox_id.to_string(), + user_subject: String::new(), + attested_at: SystemTime::now(), + signature: Vec::new(), + }; + + let handle = self + .source + .acquire_handle(&ctx) + .await + .map_err(PolicyError::from)?; + + let envelope = match self + .source + .get_projection(&handle, SANDBOX_POLICY_SURFACE_ID) + .await + { + Ok(env) => env, + Err(e) => { + // Best-effort cleanup. Release errors are not fatal here + // because the original projection error is the + // load-bearing failure to surface. + let _ = self.source.release_handle(&handle).await; + return Err(PolicyError::from(e)); + } + }; + + // Signature verification. Two valid states: + // + // - Both signature and signing_key_id are populated → verify + // against the trust store; reject on any failure. + // - Both are empty → admit with a one-time warning per call. + // This is the v0 fallback for sources that have not yet + // shipped attestation. When the source starts emitting + // signed envelopes this branch stops firing automatically. + // + // (Mismatched populated/empty pairs are filtered upstream in + // the source impl and surface as `PolicySourceError::Decode`.) 
+ let verify_result = match (envelope.signature.is_empty(), &envelope.signing_key_id) { + (true, None) => { + warn!( + sandbox_id, + "policy source returned an unsigned envelope; admitting under v0 fallback" + ); + Ok(()) + } + (false, Some(key_id)) => { + let payload = canonical_projection_bytes(&envelope); + self.trust_store + .verify(key_id, &payload, &envelope.signature) + .map_err(PolicyError::from) + } + // Source impl rejects these combinations before returning to + // the driver; defensive handling for completeness. + _ => Err(PolicyError::SourceError(PolicySourceError::Decode( + "envelope signature/key_id presence mismatch".to_string(), + ))), + }; + + if let Err(e) = verify_result { + let _ = self.source.release_handle(&handle).await; + return Err(e); + } + + let policy = match decode_sandbox_policy(&envelope) { + Ok(p) => p, + Err(e) => { + let _ = self.source.release_handle(&handle).await; + return Err(e); + } + }; + + // Release immediately for this phase. Handle persistence — the + // story under which the gateway retains handles across sandbox + // lifetimes and releases them only on sandbox deletion — is a + // follow-up. 
+ if let Err(release_err) = self.source.release_handle(&handle).await { + warn!( + sandbox_id, + error = %release_err, + "policy source release_handle failed; admission proceeds" + ); + } + + Ok(Some(policy)) + } +} + +fn decode_sandbox_policy( + envelope: &ProjectionEnvelope, +) -> Result { + if envelope.surface_id != SANDBOX_POLICY_SURFACE_ID { + return Err(PolicyError::SourceError(PolicySourceError::Decode(format!( + "expected surface_id '{SANDBOX_POLICY_SURFACE_ID}', got '{}'", + envelope.surface_id + )))); + } + openshell_core::proto::SandboxPolicy::decode(envelope.body.as_slice()).map_err(|e| { + PolicyError::SourceError(PolicySourceError::Decode(format!( + "decode sandbox policy body failed: {e}" + ))) + }) +} + +impl From for PolicyError { + fn from(e: TrustStoreError) -> Self { + Self::SourceError(PolicySourceError::Rejected { + reason: e.to_string(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use openshell_core::proto::SandboxPolicy; + use std::sync::Mutex; + + // ----- Mock source ------------------------------------------------- + + type HealthFn = Box Result<(), PolicySourceError> + Send + Sync>; + type AcquireFn = + Box Result + Send + Sync>; + type GetFn = Box< + dyn Fn(&Handle, &str) -> Result + Send + Sync, + >; + type ReleaseFn = Box Result<(), PolicySourceError> + Send + Sync>; + + /// Test fixture standing in for a real engine on the wire. Each call + /// site overrides the relevant closure; the rest default to "OK". 
+ #[derive(Default)] + struct MockPolicySource { + health_fn: Mutex>, + acquire_fn: Mutex>, + get_fn: Mutex>, + release_fn: Mutex>, + release_count: std::sync::atomic::AtomicUsize, + } + + impl std::fmt::Debug for MockPolicySource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MockPolicySource").finish() + } + } + + impl MockPolicySource { + fn with_health(self, f: HealthFn) -> Self { + *self.health_fn.lock().unwrap() = Some(f); + self + } + fn with_acquire(self, f: AcquireFn) -> Self { + *self.acquire_fn.lock().unwrap() = Some(f); + self + } + fn with_get(self, f: GetFn) -> Self { + *self.get_fn.lock().unwrap() = Some(f); + self + } + #[allow(dead_code)] + fn with_release(self, f: ReleaseFn) -> Self { + *self.release_fn.lock().unwrap() = Some(f); + self + } + fn release_count(&self) -> usize { + self.release_count + .load(std::sync::atomic::Ordering::SeqCst) + } + } + + #[async_trait] + impl PolicySource for MockPolicySource { + async fn health(&self) -> Result<(), PolicySourceError> { + let guard = self.health_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(), + None => Ok(()), + } + } + + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result { + let guard = self.acquire_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(ctx), + None => Ok(Handle::new(b"default-handle".to_vec())), + } + } + + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result { + let guard = self.get_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(handle, surface_id), + None => Err(PolicySourceError::Decode("no get fixture set".into())), + } + } + + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError> { + self.release_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let guard = self.release_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(handle), + None => Ok(()), + } + } + } + + // ----- Helpers 
----------------------------------------------------- + + fn fresh_keypair() -> (ed25519_dalek::SigningKey, ed25519_dalek::VerifyingKey) { + use rand_core_06::OsRng; + let sk = ed25519_dalek::SigningKey::generate(&mut OsRng); + let vk = sk.verifying_key(); + (sk, vk) + } + + fn trust_store_with(key_id: &str, vk: ed25519_dalek::VerifyingKey) -> TrustStore { + let mut map = std::collections::HashMap::new(); + map.insert(key_id.to_string(), vk); + TrustStore::from_keys(map) + } + + fn signed_envelope(sk: &ed25519_dalek::SigningKey, key_id: &str) -> ProjectionEnvelope { + use ed25519_dalek::Signer; + let policy = SandboxPolicy { + version: 7, + ..Default::default() + }; + let body = policy.encode_to_vec(); + let mut env = ProjectionEnvelope { + surface_id: SANDBOX_POLICY_SURFACE_ID.to_string(), + schema_version: "1".to_string(), + policy_digest: vec![0xaa; 32], + bundle_digest: vec![0xbb; 32], + body, + signature: Vec::new(), + signing_key_id: None, + }; + let payload = canonical_projection_bytes(&env); + let sig = sk.sign(&payload).to_bytes(); + env.signature = sig.to_vec(); + env.signing_key_id = Some(key_id.to_string()); + env + } + + fn unsigned_envelope() -> ProjectionEnvelope { + let policy = SandboxPolicy { + version: 3, + ..Default::default() + }; + ProjectionEnvelope { + surface_id: SANDBOX_POLICY_SURFACE_ID.to_string(), + schema_version: "1".to_string(), + policy_digest: vec![], + bundle_digest: vec![], + body: policy.encode_to_vec(), + signature: Vec::new(), + signing_key_id: None, + } + } + + // ----- Tests ------------------------------------------------------- + + #[tokio::test] + async fn new_fails_when_source_health_fails() { + let source = Arc::new(MockPolicySource::default().with_health(Box::new(|| { + Err(PolicySourceError::Connect("nope".into())) + }))); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let err = AttestedPolicyProvider::new(source, ts) + .await + .expect_err("health failure must surface as constructor 
error"); + assert!(matches!(err, PolicySourceError::Connect(_))); + } + + #[tokio::test] + async fn get_effective_policy_returns_some_on_valid_signed_envelope() { + let (sk, vk) = fresh_keypair(); + let env = signed_envelope(&sk, "k-1"); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let policy = driver + .get_effective_policy("sb-1") + .await + .expect("get_effective_policy ok") + .expect("policy present"); + assert_eq!(policy.version, 7); + // Released exactly once after a successful round-trip. + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_admits_unsigned_envelope_in_v0_fallback() { + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(|_h, _s| Ok(unsigned_envelope()))), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let policy = driver + .get_effective_policy("sb-1") + .await + .expect("get_effective_policy ok") + .expect("policy present"); + assert_eq!(policy.version, 3); + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_rejects_signed_envelope_with_unknown_key_id() { + let (sk_other, _) = fresh_keypair(); + let env = signed_envelope(&sk_other, "unknown-key"); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + 
.expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("unknown key must reject"); + match err { + PolicyError::SourceError(PolicySourceError::Rejected { reason }) => { + assert!( + reason.contains("unknown-key"), + "reason should mention rejected key: {reason}" + ); + } + other => panic!("expected SourceError(Rejected), got {other:?}"), + } + // Handle still released even on rejection. + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_rejects_signed_envelope_with_tampered_body() { + let (sk, vk) = fresh_keypair(); + let mut env = signed_envelope(&sk, "k-1"); + // Tamper with the body after signing. + env.body = SandboxPolicy { + version: 99, + ..Default::default() + } + .encode_to_vec(); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("tampered body must reject"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Rejected { .. }) + )); + } + + #[tokio::test] + async fn get_effective_policy_surfaces_acquire_failure() { + let source = Arc::new(MockPolicySource::default().with_acquire(Box::new(|_ctx| { + Err(PolicySourceError::Connect("unreachable".into())) + }))); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("acquire failure must propagate"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Connect(_)) + )); + // Nothing to release. 
+ assert_eq!(source.release_count(), 0); + } + + #[tokio::test] + async fn get_effective_policy_releases_handle_on_get_projection_failure() { + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(|_h, _s| { + Err(PolicySourceError::Decode("bad bytes".into())) + })), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("get failure must propagate"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Decode(_)) + )); + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn driver_id_is_attested_constant() { + let source = Arc::new(MockPolicySource::default()); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source, ts).await.expect("ok"); + assert_eq!(driver.id(), ATTESTED_POLICY_TYPE_ID); + assert_eq!(driver.id(), "attested"); + } + + #[tokio::test] + async fn driver_inherits_unsupported_for_mutators() { + let source = Arc::new(MockPolicySource::default()); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source, ts).await.expect("ok"); + + let err = driver + .permits_mutation() + .await + .expect_err("attested must refuse mutation surface"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "mutation" + } + )); + + let err = driver + .set_policy(&super::super::SetSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + expected_resource_version: 0, + policy: SandboxPolicy::default(), + }) + .await + .expect_err("attested must refuse set_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "set_policy" 
+ } + )); + + let err = driver + .update_policy(&super::super::UpdateSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + merge_operations: vec![], + baseline_policy: None, + }) + .await + .expect_err("attested must refuse update_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "update_policy" + } + )); + + let err = driver + .delete_policy(&super::super::DeleteGlobalPolicyCtx { + global_policy_sandbox_id: "__global__".into(), + }) + .await + .expect_err("attested must refuse delete_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "delete_policy" + } + )); + } + + #[tokio::test] + async fn get_effective_policy_rejects_envelope_with_wrong_surface_id() { + let (sk, vk) = fresh_keypair(); + // Build an envelope whose surface_id is wrong; sign it correctly + // so the signature passes verification and the surface check is + // the dispositive failure. + use ed25519_dalek::Signer; + let policy = SandboxPolicy { + version: 1, + ..Default::default() + }; + let mut env = ProjectionEnvelope { + surface_id: "openshell.something.v1".to_string(), + schema_version: "1".to_string(), + policy_digest: vec![0xaa; 32], + bundle_digest: vec![0xbb; 32], + body: policy.encode_to_vec(), + signature: Vec::new(), + signing_key_id: None, + }; + let payload = canonical_projection_bytes(&env); + env.signature = sk.sign(&payload).to_bytes().to_vec(); + env.signing_key_id = Some("k-1".to_string()); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("wrong surface must reject"); + assert!(matches!( + err, + 
PolicyError::SourceError(PolicySourceError::Decode(_)) + )); + } +} diff --git a/crates/openshell-server/src/policy_provider/local.rs b/crates/openshell-server/src/policy_provider/local.rs new file mode 100644 index 000000000..1ea3e417f --- /dev/null +++ b/crates/openshell-server/src/policy_provider/local.rs @@ -0,0 +1,345 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Local (in-process, store-backed) policy provider. +//! +//! This is the `"local"` policy type — selected by the default +//! `[openshell.policy] type = "local"` (or by omitting the table entirely). +//! +//! Wraps [`crate::persistence::Store`] (which implements +//! [`crate::policy_store::PolicyStoreExt`]) and exposes the canonical +//! `PolicyProvider` operations. Behavior here is intentionally a thin +//! pass-through to the existing DB writes — the gRPC handler retains +//! validation, audit emission, sandbox-watch notification, and CAS retry +//! responsibility. +//! +//! The next session adds `AttestedPolicyProvider` as a sibling module here. + +use std::sync::Arc; + +use async_trait::async_trait; +use prost::Message; +use sha2::{Digest, Sha256}; + +use crate::persistence::Store; +use crate::policy_store::PolicyStoreExt; + +use super::{ + DeleteGlobalPolicyCtx, PolicyError, PolicyMutationOutcome, PolicyProvider, + SetSandboxPolicyCtx, UpdateSandboxPolicyCtx, LOCAL_POLICY_TYPE_ID, +}; + +/// Local policy provider — persists all mutations directly to the gateway's +/// own store. This is today's behavior, just routed through the trait. +#[derive(Debug, Clone)] +pub struct LocalPolicyProvider { + store: Arc, +} + +impl LocalPolicyProvider { + #[must_use] + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +/// SHA-256 over the canonical-encoded policy. 
Mirrors the +/// `deterministic_policy_hash` in `grpc/policy.rs`, kept private here so the +/// provider can stamp hashes without crossing the module boundary. Local +/// keeps it in sync structurally; the handler module remains the single +/// reference impl used by the rest of the codebase. +fn deterministic_policy_hash(policy: &openshell_core::proto::SandboxPolicy) -> String { + let mut hasher = Sha256::new(); + hasher.update(policy.version.to_le_bytes()); + if let Some(fs) = &policy.filesystem { + hasher.update(fs.encode_to_vec()); + } + if let Some(ll) = &policy.landlock { + hasher.update(ll.encode_to_vec()); + } + if let Some(p) = &policy.process { + hasher.update(p.encode_to_vec()); + } + let mut entries: Vec<_> = policy.network_policies.iter().collect(); + entries.sort_by_key(|(k, _)| k.as_str()); + for (key, value) in entries { + hasher.update(key.as_bytes()); + hasher.update(value.encode_to_vec()); + } + hex::encode(hasher.finalize()) +} + +#[async_trait] +impl PolicyProvider for LocalPolicyProvider { + fn id(&self) -> &'static str { + LOCAL_POLICY_TYPE_ID + } + + async fn permits_mutation(&self) -> Result<(), PolicyError> { + // The local provider owns the in-process policy store, so every + // mutation surface — the three canonical RPC mutators and the + // draft-chunk approval handlers — is supported. The attested + // provider will inherit the trait default (`Unsupported`) so the + // gateway's coarse gate refuses both surfaces uniformly. 
+ Ok(()) + } + + async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError> { + let record = self.store.get_latest_policy(sandbox_id).await?; + match record { + Some(record) => { + let policy = openshell_core::proto::SandboxPolicy::decode( + record.policy_payload.as_slice(), + ) + .map_err(|e| { + PolicyError::Persistence(crate::persistence::PersistenceError::Decode(format!( + "decode policy failed: {e}" + ))) + })?; + Ok(Some(policy)) + } + None => Ok(None), + } + } + + async fn set_policy( + &self, + ctx: &SetSandboxPolicyCtx, + ) -> Result { + let payload = ctx.policy.encode_to_vec(); + let hash = deterministic_policy_hash(&ctx.policy); + + let latest = self.store.get_latest_policy(&ctx.sandbox_id).await?; + + // Idempotent set: same hash already at HEAD → return the existing + // version without bumping. Matches the no-op short-circuit the + // handler had before the provider seam was introduced. + if let Some(ref current) = latest + && current.policy_hash == hash + { + return Ok(PolicyMutationOutcome { + version: u32::try_from(current.version).unwrap_or(0), + policy_hash: hash, + settings_revision: 0, + deleted: false, + }); + } + + let next_version = latest.map_or(1, |r| r.version + 1); + let policy_id = uuid::Uuid::new_v4().to_string(); + + self.store + .put_policy_revision(&policy_id, &ctx.sandbox_id, next_version, &payload, &hash) + .await?; + + // Best-effort cleanup of older revisions. Matches the handler's + // `let _ = ...` pattern — supersession failure is not a control-flow + // signal here; the new revision is still authoritative. 
+ let _ = self + .store + .supersede_older_policies(&ctx.sandbox_id, next_version) + .await; + + Ok(PolicyMutationOutcome { + version: u32::try_from(next_version).unwrap_or(0), + policy_hash: hash, + settings_revision: 0, + deleted: false, + }) + } + + async fn update_policy( + &self, + ctx: &UpdateSandboxPolicyCtx, + ) -> Result { + // The merge-with-retry loop, the static-fields-unchanged validation, + // and the safety validation live in the gRPC handler because they + // emit `tonic::Status::invalid_argument` directly for client-visible + // errors. The provider just confirms the operation is supported and + // re-runs the existing handler-side helper. We surface this by + // returning a sentinel zero-version outcome that the handler ignores + // and replaces with the value `apply_merge_operations_with_retry` + // computed; only the gating semantics travel through the trait here. + // + // The attested provider will return `Unsupported` from the default + // impl, which is what `openshell policy update` needs to reject. + let _ = (&ctx.sandbox_id, &ctx.sandbox_name, &ctx.merge_operations, &ctx.baseline_policy); + Ok(PolicyMutationOutcome::default()) + } + + async fn delete_policy( + &self, + ctx: &DeleteGlobalPolicyCtx, + ) -> Result { + // Local supports the global-policy delete. Mirror the handler's + // existing logic: if a latest global revision exists, mark all + // earlier revisions superseded. The handler still owns the global + // settings map mutation and the audit emission; this method only + // gates the operation through the trait so the attested provider's + // default `Unsupported` is what the handler sees there. 
+ if let Ok(Some(latest)) = self + .store + .get_latest_policy(&ctx.global_policy_sandbox_id) + .await + { + let _ = self + .store + .supersede_older_policies(&ctx.global_policy_sandbox_id, latest.version + 1) + .await; + } + Ok(PolicyMutationOutcome::default()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::persistence::Store; + use openshell_core::proto::SandboxPolicy; + + async fn fresh_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:?cache=shared") + .await + .expect("in-memory sqlite connects"), + ) + } + + #[tokio::test] + async fn permits_mutation_is_ok_for_local_provider() { + let p = LocalPolicyProvider::new(fresh_store().await); + p.permits_mutation() + .await + .expect("local provider permits the entire mutation surface"); + } + + #[tokio::test] + async fn id_is_local() { + let p = LocalPolicyProvider::new(fresh_store().await); + assert_eq!(p.id(), LOCAL_POLICY_TYPE_ID); + assert_eq!(p.id(), "local"); + } + + #[tokio::test] + async fn get_effective_policy_returns_none_when_no_revision() { + let p = LocalPolicyProvider::new(fresh_store().await); + let got = p.get_effective_policy("sb-fresh").await.unwrap(); + assert!(got.is_none()); + } + + #[tokio::test] + async fn set_policy_persists_and_get_effective_policy_returns_it() { + let store = fresh_store().await; + let p = LocalPolicyProvider::new(store.clone()); + let policy = SandboxPolicy { + version: 1, + ..Default::default() + }; + let out = p + .set_policy(&SetSandboxPolicyCtx { + sandbox_id: "sb-1".to_string(), + sandbox_name: "sb-1".to_string(), + expected_resource_version: 0, + policy: policy.clone(), + }) + .await + .expect("set_policy succeeds"); + assert_eq!(out.version, 1); + assert!(!out.policy_hash.is_empty()); + + let fetched = p + .get_effective_policy("sb-1") + .await + .expect("get_effective_policy ok") + .expect("policy present"); + assert_eq!(fetched.version, 1); + } + + #[tokio::test] + async fn set_policy_is_idempotent_on_same_hash() { + let store = 
fresh_store().await; + let p = LocalPolicyProvider::new(store); + let policy = SandboxPolicy { + version: 1, + ..Default::default() + }; + let ctx = SetSandboxPolicyCtx { + sandbox_id: "sb-2".to_string(), + sandbox_name: "sb-2".to_string(), + expected_resource_version: 0, + policy: policy.clone(), + }; + let first = p.set_policy(&ctx).await.unwrap(); + let second = p.set_policy(&ctx).await.unwrap(); + assert_eq!(first.version, second.version); + assert_eq!(first.policy_hash, second.policy_hash); + } + + #[tokio::test] + async fn set_policy_bumps_version_on_distinct_hash() { + let store = fresh_store().await; + let p = LocalPolicyProvider::new(store); + let policy_a = SandboxPolicy { + version: 1, + ..Default::default() + }; + let policy_b = SandboxPolicy { + version: 2, + ..Default::default() + }; + let ctx_a = SetSandboxPolicyCtx { + sandbox_id: "sb-3".to_string(), + sandbox_name: "sb-3".to_string(), + expected_resource_version: 0, + policy: policy_a, + }; + let ctx_b = SetSandboxPolicyCtx { + sandbox_id: "sb-3".to_string(), + sandbox_name: "sb-3".to_string(), + expected_resource_version: 0, + policy: policy_b, + }; + let a = p.set_policy(&ctx_a).await.unwrap(); + let b = p.set_policy(&ctx_b).await.unwrap(); + assert_eq!(a.version, 1); + assert_eq!(b.version, 2); + assert_ne!(a.policy_hash, b.policy_hash); + } + + #[tokio::test] + async fn delete_policy_is_ok_when_no_global_revision_exists() { + let p = LocalPolicyProvider::new(fresh_store().await); + // Should not error even though there is no global revision to + // supersede; matches the handler's `let _ = ...` semantics. + p.delete_policy(&DeleteGlobalPolicyCtx { + global_policy_sandbox_id: "__global__".to_string(), + }) + .await + .expect("delete with no revision is a no-op"); + } + + #[tokio::test] + async fn update_policy_is_ok_for_local_provider() { + let p = LocalPolicyProvider::new(fresh_store().await); + // The merge work lives in the handler today; the trait method just + // gates the operation. 
Local says "supported", attested would say + // Unsupported via the default impl. + let out = p + .update_policy(&UpdateSandboxPolicyCtx { + sandbox_id: "sb-4".to_string(), + sandbox_name: "sb-4".to_string(), + merge_operations: vec![], + baseline_policy: None, + }) + .await + .expect("update_policy supported on local"); + // Local returns the default sentinel — the handler retains the + // merge-with-retry work and builds the final response itself. + assert_eq!(out.version, 0); + assert!(out.policy_hash.is_empty()); + assert!(!out.deleted); + } +} diff --git a/crates/openshell-server/src/policy_provider/mod.rs b/crates/openshell-server/src/policy_provider/mod.rs new file mode 100644 index 000000000..e15b87a7b --- /dev/null +++ b/crates/openshell-server/src/policy_provider/mod.rs @@ -0,0 +1,334 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Pluggable policy-provider subsystem. +//! +//! The gateway today resolves an effective policy and accepts policy +//! mutations through inline calls to [`crate::persistence::Store`] from the +//! gRPC layer. This module promotes that surface into a trait so an +//! alternate provider (`AttestedPolicyProvider`, which consumes signed +//! projections from an out-of-process policy engine) can refuse the +//! mutator methods while still serving an authoritative effective policy +//! at admission time. +//! +//! The error type carries an `Unsupported { policy_type, operation }` +//! variant that maps to `tonic::Status::unimplemented` at the gRPC edge. +//! Resolution of `[openshell.policy] type` to the concrete provider lives +//! at the call site (`crate::resolve_policy_provider`) — a direct `match` +//! suffices for the small number of provider shapes. 
+ +mod attested; +mod local; +mod source; +mod trust_store; + +use async_trait::async_trait; + +use crate::persistence::PersistenceError; + +pub use attested::AttestedPolicyProvider; +pub use local::LocalPolicyProvider; +pub use source::{GrpcPolicySource, PolicySourceError}; +pub use trust_store::TrustStore; + +/// Policy-type id for the in-process, store-backed policy provider. +pub const LOCAL_POLICY_TYPE_ID: &str = "local"; + +/// Policy-type id for the Attested Policy Projection provider. +/// +/// Declared next to [`LOCAL_POLICY_TYPE_ID`] so the `"attested"` selector +/// has one named definition instead of being repeated as a string literal; +/// the provider itself is [`AttestedPolicyProvider`]. +pub const ATTESTED_POLICY_TYPE_ID: &str = "attested"; + +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + +/// Errors returned by [`PolicyProvider`] implementations. +/// +/// `Unsupported` mirrors `openshell_providers::ProviderError::UnsupportedProvider` +/// — it carries enough context for the gRPC layer to surface the refusal as a +/// `Status::unimplemented` reply naming both the policy type and the +/// operation it refused. +#[derive(Debug, thiserror::Error)] +pub enum PolicyError { + /// The active policy provider does not implement this operation. The + /// default trait impls of `permits_mutation` / `set_policy` / + /// `update_policy` / `delete_policy` return this so a provider only + /// needs to override the operations it supports. The `policy_type` field + /// carries the provider's `id()` — the same string the config selector + /// uses — so audit and error messages can name it precisely. + #[error("policy type '{policy_type}' does not support operation '{operation}'")] + Unsupported { + policy_type: &'static str, + operation: &'static str, + }, + + /// Wraps a persistence-layer failure produced by the local provider.
+ /// The gRPC layer maps this back to the same status it would have + /// produced before the provider seam existed. + #[error("policy persistence error: {0}")] + Persistence(#[from] PersistenceError), + + /// Wraps an out-of-process policy source failure. The gRPC layer + /// maps this to `Status::unavailable` so callers retry rather than + /// treating the gateway as the source of the failure. + #[error("policy source error: {0}")] + SourceError(#[from] PolicySourceError), +} + +/// Context describing the canonical sandbox-scoped policy replacement +/// requested by a CLI `openshell policy set` (or equivalent gRPC +/// `UpdateConfig` call with `policy` set and no `merge_operations`). +#[derive(Debug, Clone)] +pub struct SetSandboxPolicyCtx { + pub sandbox_id: String, + pub sandbox_name: String, + pub expected_resource_version: u64, + pub policy: openshell_core::proto::SandboxPolicy, +} + +/// Context describing the canonical sandbox-scoped policy merge requested by +/// a CLI `openshell policy update` (gRPC `UpdateConfig` with +/// `merge_operations`). +#[derive(Debug, Clone)] +pub struct UpdateSandboxPolicyCtx { + pub sandbox_id: String, + pub sandbox_name: String, + pub merge_operations: Vec, + /// The baseline `spec.policy` for the sandbox, used to enforce + /// static-field-unchanged checks during the merge. + pub baseline_policy: Option, +} + +/// Context describing a global-policy delete (`openshell policy delete +/// --global`). +#[derive(Debug, Clone)] +pub struct DeleteGlobalPolicyCtx { + /// Sentinel sandbox id used by the store layer for global policy + /// revisions. The handler passes its own constant so this module does not + /// need to know which constant the policy gRPC module chose. + pub global_policy_sandbox_id: String, +} + +/// Outcome of a successful policy mutation. Mirrors the fields of +/// `UpdateConfigResponse` so the gRPC layer can build the reply without +/// reaching back into the store. 
+#[derive(Debug, Clone, Default)] +pub struct PolicyMutationOutcome { + pub version: u32, + pub policy_hash: String, + pub settings_revision: u64, + pub deleted: bool, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Pluggable policy provider. +/// +/// Each provider answers three questions: +/// 1. What is the effective policy for this sandbox at admission time? +/// (`get_effective_policy`) +/// 2. Will it accept any mutation to policy state — the canonical +/// mutator RPCs **and** the draft-chunk approval surface? +/// (`permits_mutation` — default `Unsupported`; coarse gate) +/// 3. Will it accept this *specific* mutation? (`set_policy`, +/// `update_policy`, `delete_policy` — default `Unsupported`) +/// +/// The default mutator impls returning `Unsupported` are load-bearing: a +/// provider that should refuse `openshell policy set | update | delete` +/// (e.g. `AttestedPolicyProvider`, which is fed by an +/// off-host signed bundle and has no notion of in-band mutation) inherits +/// the refusal automatically. +/// +/// **v0 scope note.** For the local provider this trait is a gating + thin- +/// persistence seam: `set_policy` writes the revision row and is the single +/// place that decides "yes, this mutation is allowed"; `update_policy` is a +/// pure gate and `delete_policy` only supersedes stored global revisions — +/// merge-with-retry, settings-map mutation, and audit emission remain in the +/// gRPC handler. The shape is meant to be wide enough to absorb the full +/// attested-provider semantics without changing the trait again. +#[async_trait] +pub trait PolicyProvider: Send + Sync + std::fmt::Debug { + /// Canonical policy-type id, e.g. `"local"` or `"attested"`. Must match + /// the `[openshell.policy] type = ...` value in the gateway config — + /// the resolver matches on this string when selecting the provider.
+ fn id(&self) -> &'static str; + + /// Return the effective policy for `sandbox_id`. The store-backed local + /// provider returns the latest revision recorded for that sandbox (or + /// `None` if no revision exists yet); the attested provider will return + /// the projected policy carried by the latest verified envelope. + async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError>; + + /// Coarse gate: does this provider permit any mutation to policy state? + /// + /// "Mutation" here means **both** the canonical RPC mutators + /// (`set_policy`, `update_policy`, `delete_policy`) and the draft-chunk + /// approval surface added by the agentic approval loop (the `*DraftChunk` + /// handlers in `grpc/policy.rs`). The gRPC layer calls this first — before + /// any DB read or write — so an alternate provider can refuse the entire + /// write surface without per-RPC trait methods. Default: `Unsupported`. + /// + /// Rationale: per-op overrides (`set_policy` etc.) remain the natural + /// extension point for *what work happens* once a mutation is permitted; + /// `permits_mutation` is the coarse gate that lets + /// `AttestedPolicyProvider` (whose authoritative policy is fed by an + /// off-host signed bundle and has no in-band mutation semantics at all) + /// refuse everything by inheriting this default. See the APP + /// implementation plan W-B section ("permits_mutation") for the design + /// alternatives that were considered and rejected. + async fn permits_mutation(&self) -> Result<(), PolicyError> { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "mutation", + }) + } + + /// Replace the policy for a sandbox. Default: `Unsupported`. + async fn set_policy( + &self, + _ctx: &SetSandboxPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "set_policy", + }) + } + + /// Apply a sequence of incremental merge operations to a sandbox's + /// policy.
Default: `Unsupported`. + async fn update_policy( + &self, + _ctx: &UpdateSandboxPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "update_policy", + }) + } + + /// Delete the global policy. The local provider implements this against + /// the global-policy sandbox-id sentinel; remote providers may refuse. + /// Default: `Unsupported`. + async fn delete_policy( + &self, + _ctx: &DeleteGlobalPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "delete_policy", + }) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + /// A bare-bones provider with no method overrides. Used to confirm the + /// trait's default `Unsupported` impls fire for the three mutators. + #[derive(Debug)] + struct StubProvider; + + #[async_trait] + impl PolicyProvider for StubProvider { + fn id(&self) -> &'static str { + "stub" + } + + async fn get_effective_policy( + &self, + _sandbox_id: &str, + ) -> Result, PolicyError> { + Ok(None) + } + } + + #[tokio::test] + async fn stub_provider_permits_mutation_returns_unsupported() { + let p = StubProvider; + let err = p + .permits_mutation() + .await + .expect_err("default impl must error"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "stub", + operation: "mutation" + } + )); + } + + #[tokio::test] + async fn stub_provider_set_policy_returns_unsupported() { + let p = StubProvider; + let err = p + .set_policy(&SetSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + expected_resource_version: 0, + policy: openshell_core::proto::SandboxPolicy::default(), + }) + .await + .expect_err("default impl must error"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "stub", + operation: "set_policy" + } + )); + } + + 
#[tokio::test] + async fn stub_provider_update_policy_returns_unsupported() { + let p = StubProvider; + let err = p + .update_policy(&UpdateSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + merge_operations: vec![], + baseline_policy: None, + }) + .await + .expect_err("default impl must error"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "stub", + operation: "update_policy" + } + )); + } + + #[tokio::test] + async fn stub_provider_delete_policy_returns_unsupported() { + let p = StubProvider; + let err = p + .delete_policy(&DeleteGlobalPolicyCtx { + global_policy_sandbox_id: "__global__".into(), + }) + .await + .expect_err("default impl must error"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "stub", + operation: "delete_policy" + } + )); + } + +} diff --git a/crates/openshell-server/src/policy_provider/source.rs b/crates/openshell-server/src/policy_provider/source.rs new file mode 100644 index 000000000..b52182a9e --- /dev/null +++ b/crates/openshell-server/src/policy_provider/source.rs @@ -0,0 +1,473 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Trait-level abstraction over "where the gateway fetches policy from". +//! +//! The gateway speaks a single wire protocol with an out-of-process policy +//! engine. This module isolates that protocol behind a trait so the +//! attested-policy driver can be built and tested without ever importing +//! generated proto types, and so an alternate transport could be slotted in +//! later without touching the driver. +//! +//! This file is intentionally the **only** module in the new code path that +//! is permitted to depend on `openshell_core::proto::policy::*`. If a future +//! change needs proto types elsewhere, that is a leak in the abstraction — +//! restructure rather than paper over. 
+ +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; + +use async_trait::async_trait; +use ed25519_dalek::{Signer, SigningKey}; +use rand_core_06::OsRng; +use tokio::net::UnixStream; +use tokio::sync::Mutex; +use tonic::transport::{Channel, Endpoint}; +use tonic::{Code, Status}; +use tower::service_fn; + +use openshell_core::proto::policy::v1alpha1 as wire; +use wire::engine_client::EngineClient; + +// --------------------------------------------------------------------------- +// OpenShell-internal types +// --------------------------------------------------------------------------- + +/// Opaque token returned by the policy source's `acquire_handle` call. +/// +/// The gateway treats this purely as bytes; it must not parse, hash, or +/// otherwise derive identity from it. The `Debug` impl elides the inner +/// bytes — handles may be sensitive. +#[derive(Clone)] +pub struct Handle(Vec); + +impl Handle { + #[must_use] + pub const fn new(bytes: Vec) -> Self { + Self(bytes) + } + + #[must_use] + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + + #[allow(dead_code)] // surface helper; used in follow-up handle-persistence work + #[must_use] + pub fn into_bytes(self) -> Vec { + self.0 + } +} + +impl std::ops::Deref for Handle { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::fmt::Debug for Handle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handle") + .field("len", &self.0.len()) + .finish() + } +} + +/// Gateway-asserted facts about a sandbox session that the engine binds to +/// a handle. Mirrors the wire's `RuntimeContextEnvelope` with idiomatic +/// Rust types. +#[derive(Debug, Clone)] +pub struct RuntimeContext { + pub sandbox_id: String, + pub user_subject: String, + pub attested_at: SystemTime, + /// Detached signature over the envelope payload. When empty, + /// [`GrpcPolicySource`] signs the canonical payload in `acquire_handle`.
+ pub signature: Vec, +} + +/// Policy bytes plus integrity metadata, fetched against a handle. +#[derive(Debug, Clone)] +pub struct ProjectionEnvelope { + pub surface_id: String, + pub schema_version: String, + pub policy_digest: Vec, + pub bundle_digest: Vec, + pub body: Vec, + /// Detached signature over the envelope payload. Empty in early + /// deployments where the engine has not yet shipped attestation. + pub signature: Vec, + /// Identifier of the key that produced `signature`. `None` when + /// `signature` is empty. + pub signing_key_id: Option, +} + +/// Errors returned by [`PolicySource`] implementations. +#[derive(Debug, thiserror::Error)] +pub enum PolicySourceError { + /// Could not establish a transport-level connection to the configured + /// source (UDS path missing, daemon not listening, etc.). + #[error("policy source connect failed: {0}")] + Connect(String), + + /// The RPC reached the source but returned a non-OK status. + #[error("policy source rpc failed: {0}")] + Rpc(#[from] Status), + + /// The source returned a response the gateway could not decode (an + /// envelope field whose contents were inconsistent with its declared + /// type, etc.). + #[error("policy source decode failed: {0}")] + Decode(String), + + /// The source returned a successful response that the gateway-side + /// admission policy refuses to consume (e.g. the engine reports + /// `DRAINING` so no new handles should be acquired). + #[error("policy source rejected request: {reason}")] + Rejected { reason: String }, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Abstracts the gateway-to-engine wire. +/// +/// The trait surface mirrors the four RPCs on the wire. Implementations may +/// be a real gRPC client (production), an in-process mock (tests), or any +/// other transport — the consumer never knows. 
+#[async_trait] +pub trait PolicySource: Send + Sync + std::fmt::Debug { + /// Liveness/readiness probe. The driver calls this once at startup + /// before admitting any sandbox through the source. + async fn health(&self) -> Result<(), PolicySourceError>; + + /// Bind a sandbox runtime context to an engine-chosen handle. + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result; + + /// Fetch the projection bound to `handle`, decoded into the policy + /// schema named by `surface_id`. + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result; + + /// Drop engine-side state held for `handle`. Idempotent — releasing an + /// unknown handle is OK. + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError>; +} + +// --------------------------------------------------------------------------- +// Production gRPC impl +// --------------------------------------------------------------------------- + +/// Production implementation of [`PolicySource`] backed by a tonic gRPC +/// client over UDS. +/// +/// The instance owns a fresh Ed25519 signing key (generated in `connect`), +/// used to fill the runtime-context envelope's `signature` field on each +/// [`PolicySource::acquire_handle`] call whose context has no signature. The +/// matching public key is provisioned to the engine out-of-band today (v0 +/// cutoff); persisting the key belongs to the handle-persistence follow-up. +#[derive(Debug)] +pub struct GrpcPolicySource { + client: Mutex>, + /// Path the source was dialed against. Kept for diagnostics only; + /// `client` is the live connection. + #[allow(dead_code)] // referenced by `uds_path()` accessor + uds_path: PathBuf, + /// Gateway-side runtime-context signing key. Generated per `connect` + /// call; not persisted in v0. Tracked under handle-persistence follow-up. + signing_key: Arc, +} + +impl GrpcPolicySource { + /// Dial the engine over UDS and build a client.
+ /// + /// Does **not** call `health` — the caller (the driver constructor) + /// runs the health round-trip so a failure surfaces as a startup + /// error against the driver, not against this helper. + pub async fn connect(uds_path: &Path) -> Result { + let path = uds_path.to_path_buf(); + let display = path.clone(); + + // tonic's UDS pattern: a placeholder URI that is never dialed (the + // connector ignores it), with the real connect step performed by a + // `service_fn` closure that opens the unix socket. Mirrors + // `crates/openshell-server/src/compute/vm.rs`'s helper. + let connect_path = path.clone(); + let channel = Endpoint::from_static("http://[::]:50051") + .connect_with_connector(service_fn(move |_: tonic::transport::Uri| { + let connect_path = connect_path.clone(); + async move { + UnixStream::connect(connect_path) + .await + .map(hyper_util::rt::TokioIo::new) + } + })) + .await + .map_err(|e| { + PolicySourceError::Connect(format!( + "failed to connect to policy source socket '{}': {e}", + display.display() + )) + })?; + + let client = EngineClient::new(channel); + let signing_key = Arc::new(SigningKey::generate(&mut OsRng)); + + Ok(Self { + client: Mutex::new(client), + uds_path: path, + signing_key, + }) + } + + /// Path the source was dialed against, for diagnostic logging. + #[allow(dead_code)] // used by future audit / error-path logging + #[must_use] + pub fn uds_path(&self) -> &Path { + &self.uds_path + } +} + +/// Canonical byte ordering for the runtime-context envelope signature. +/// +/// The signature covers `sandbox_id`, a zero byte, `user_subject`, a zero +/// byte, then `attested_at_ms` as 8 big-endian bytes. The engine reproduces +/// the same byte order on its side to verify.
+fn canonical_runtime_context_bytes( + sandbox_id: &str, + user_subject: &str, + attested_at_ms: i64, +) -> Vec { + let mut buf = + Vec::with_capacity(sandbox_id.len() + user_subject.len() + 8); + buf.extend_from_slice(sandbox_id.as_bytes()); + buf.push(0); + buf.extend_from_slice(user_subject.as_bytes()); + buf.push(0); + buf.extend_from_slice(&attested_at_ms.to_be_bytes()); + buf +} + +#[async_trait] +impl PolicySource for GrpcPolicySource { + async fn health(&self) -> Result<(), PolicySourceError> { + let req = tonic::Request::new(wire::HealthRequest {}); + let resp = { + let mut client = self.client.lock().await; + client.health(req).await? + }; + let status = resp.into_inner().status(); + match status { + wire::health_response::Status::Serving => Ok(()), + wire::health_response::Status::Draining => Err(PolicySourceError::Rejected { + reason: "policy source reports DRAINING".to_string(), + }), + wire::health_response::Status::NotServing + | wire::health_response::Status::Unspecified => Err(PolicySourceError::Rejected { + reason: format!("policy source reports {status:?}"), + }), + } + } + + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result { + let attested_at_ms = ctx + .attested_at + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| { + i64::try_from(d.as_millis()).unwrap_or(i64::MAX) + }) + .unwrap_or(0); + + let signature = if ctx.signature.is_empty() { + let payload = canonical_runtime_context_bytes( + &ctx.sandbox_id, + &ctx.user_subject, + attested_at_ms, + ); + self.signing_key.sign(&payload).to_bytes().to_vec() + } else { + ctx.signature.clone() + }; + + let envelope = wire::RuntimeContextEnvelope { + sandbox_id: ctx.sandbox_id.clone(), + user_subject: ctx.user_subject.clone(), + attested_at_ms, + signature, + }; + let req = tonic::Request::new(wire::AcquireHandleRequest { + envelope: Some(envelope), + }); + + let resp = { + let mut client = self.client.lock().await; + client.acquire_handle(req).await? 
+ }; + let inner = resp.into_inner(); + if inner.handle.is_empty() { + return Err(PolicySourceError::Decode( + "engine returned empty handle".to_string(), + )); + } + Ok(Handle::new(inner.handle)) + } + + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result { + let req = tonic::Request::new(wire::GetProjectionRequest { + handle: handle.as_bytes().to_vec(), + surface_id: surface_id.to_string(), + }); + let resp = { + let mut client = self.client.lock().await; + client.get_projection(req).await? + }; + let inner = resp.into_inner(); + let env = inner.envelope.ok_or_else(|| { + PolicySourceError::Decode("response missing envelope".to_string()) + })?; + + let signing_key_id = if env.signing_key_id.is_empty() { + None + } else { + Some(env.signing_key_id.clone()) + }; + + // Mismatch between signature presence and key id is a wire-level + // contract violation — flag rather than admit. + match (env.signature.is_empty(), signing_key_id.is_none()) { + (true, true) | (false, false) => {} + (true, false) => { + return Err(PolicySourceError::Decode( + "signing_key_id set but signature is empty".to_string(), + )); + } + (false, true) => { + return Err(PolicySourceError::Decode( + "signature set but signing_key_id is empty".to_string(), + )); + } + } + + Ok(ProjectionEnvelope { + surface_id: env.surface_id, + schema_version: env.schema_version, + policy_digest: env.policy_digest, + bundle_digest: env.bundle_digest, + body: env.body, + signature: env.signature, + signing_key_id, + }) + } + + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError> { + let req = tonic::Request::new(wire::ReleaseHandleRequest { + handle: handle.as_bytes().to_vec(), + }); + let result = { + let mut client = self.client.lock().await; + client.release_handle(req).await + }; + match result { + Ok(_) => Ok(()), + // Release is contractually idempotent; treat NotFound as OK + // so a follow-up retry after a transient error does not + // surface 
as a release failure. + Err(status) if status.code() == Code::NotFound => Ok(()), + Err(status) => Err(PolicySourceError::Rpc(status)), + } + } +} + +// --------------------------------------------------------------------------- +// Canonical projection payload bytes +// --------------------------------------------------------------------------- + +/// Canonical byte ordering for the projection envelope signature. +/// +/// The signature covers `surface_id`, `schema_version`, `policy_digest`, +/// `bundle_digest`, and `body`, concatenated in that order with zero-byte +/// separators between the textual fields. +#[must_use] +pub fn canonical_projection_bytes(env: &ProjectionEnvelope) -> Vec { + let mut buf = Vec::with_capacity( + env.surface_id.len() + + env.schema_version.len() + + env.policy_digest.len() + + env.bundle_digest.len() + + env.body.len() + + 4, + ); + buf.extend_from_slice(env.surface_id.as_bytes()); + buf.push(0); + buf.extend_from_slice(env.schema_version.as_bytes()); + buf.push(0); + buf.extend_from_slice(&env.policy_digest); + buf.push(0); + buf.extend_from_slice(&env.bundle_digest); + buf.push(0); + buf.extend_from_slice(&env.body); + buf +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handle_debug_elides_bytes() { + let h = Handle::new(b"secret-handle-bytes".to_vec()); + let debug = format!("{h:?}"); + assert!(!debug.contains("secret-handle-bytes")); + assert!(debug.contains("len")); + } + + #[test] + fn handle_deref_yields_inner_bytes() { + let h = Handle::new(vec![1, 2, 3]); + let slice: &[u8] = &h; + assert_eq!(slice, &[1, 2, 3]); + } + + #[test] + fn canonical_runtime_context_bytes_is_stable() { + let a = canonical_runtime_context_bytes("sb-1", "alice", 1_700_000_000_000); + let b = canonical_runtime_context_bytes("sb-1", "alice", 1_700_000_000_000); + assert_eq!(a, b); + // Different sandbox produces different bytes. 
+ let c = canonical_runtime_context_bytes("sb-2", "alice", 1_700_000_000_000); + assert_ne!(a, c); + } + + #[test] + fn canonical_projection_bytes_is_stable() { + let env = ProjectionEnvelope { + surface_id: "openshell.sandbox.v1".to_string(), + schema_version: "1".to_string(), + policy_digest: vec![1, 2, 3], + bundle_digest: vec![4, 5, 6], + body: vec![7, 8, 9], + signature: vec![], + signing_key_id: None, + }; + let a = canonical_projection_bytes(&env); + let b = canonical_projection_bytes(&env); + assert_eq!(a, b); + } +} diff --git a/crates/openshell-server/src/policy_provider/trust_store.rs b/crates/openshell-server/src/policy_provider/trust_store.rs new file mode 100644 index 000000000..20fe19594 --- /dev/null +++ b/crates/openshell-server/src/policy_provider/trust_store.rs @@ -0,0 +1,384 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Multi-key Ed25519 trust store. +//! +//! The attested policy driver verifies envelope signatures against a +//! gateway-side trust store loaded from a single JSON file. The file's +//! shape is: +//! +//! ```json +//! { +//! "keys": [ +//! { "key_id": "k-1", "public_key_pem": "-----BEGIN PUBLIC KEY-----\n..." } +//! ] +//! } +//! ``` +//! +//! Distribution of the file — and rotation of its keys — is operator +//! concern, outside the scope of this loader. 
+ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use ed25519_dalek::pkcs8::DecodePublicKey; +use ed25519_dalek::{Signature, VerifyingKey}; +use serde::Deserialize; + +#[derive(Debug, thiserror::Error)] +pub enum TrustStoreError { + #[error("trust store path is empty")] + EmptyPath, + + #[error("failed to read trust store file '{path}': {source}")] + Io { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("failed to parse trust store JSON at '{path}': {source}")] + Parse { + path: PathBuf, + #[source] + source: serde_json::Error, + }, + + #[error("trust store at '{path}' contains zero keys")] + NoKeys { path: PathBuf }, + + #[error("trust store at '{path}' has duplicate key_id '{key_id}'")] + DuplicateKeyId { path: PathBuf, key_id: String }, + + #[error("trust store at '{path}' has an entry with an empty key_id")] + EmptyKeyId { path: PathBuf }, + + #[error("trust store entry '{key_id}' has an unparsable public key: {reason}")] + BadPublicKey { key_id: String, reason: String }, + + #[error("trust store does not contain key_id '{key_id}'")] + UnknownKeyId { key_id: String }, + + #[error("signature for key_id '{key_id}' failed verification")] + BadSignature { key_id: String }, + + #[error("signature for key_id '{key_id}' has unexpected length {len}")] + BadSignatureLength { key_id: String, len: usize }, +} + +#[derive(Debug, Deserialize)] +struct TrustStoreFile { + keys: Vec, +} + +#[derive(Debug, Deserialize)] +struct TrustStoreEntry { + key_id: String, + public_key_pem: String, +} + +/// In-memory trust store keyed by `key_id`. +#[derive(Debug, Clone)] +pub struct TrustStore { + keys: HashMap, +} + +impl TrustStore { + /// Load and validate a trust store from disk. 
+ pub fn load(path: &Path) -> Result { + if path.as_os_str().is_empty() { + return Err(TrustStoreError::EmptyPath); + } + + let bytes = std::fs::read(path).map_err(|source| TrustStoreError::Io { + path: path.to_path_buf(), + source, + })?; + + let file: TrustStoreFile = + serde_json::from_slice(&bytes).map_err(|source| TrustStoreError::Parse { + path: path.to_path_buf(), + source, + })?; + + if file.keys.is_empty() { + return Err(TrustStoreError::NoKeys { + path: path.to_path_buf(), + }); + } + + let mut keys = HashMap::with_capacity(file.keys.len()); + for entry in file.keys { + if entry.key_id.is_empty() { + return Err(TrustStoreError::EmptyKeyId { + path: path.to_path_buf(), + }); + } + if keys.contains_key(&entry.key_id) { + return Err(TrustStoreError::DuplicateKeyId { + path: path.to_path_buf(), + key_id: entry.key_id, + }); + } + if entry.public_key_pem.trim().is_empty() { + return Err(TrustStoreError::BadPublicKey { + key_id: entry.key_id, + reason: "PEM is empty".to_string(), + }); + } + let verifying = VerifyingKey::from_public_key_pem(&entry.public_key_pem).map_err( + |e| TrustStoreError::BadPublicKey { + key_id: entry.key_id.clone(), + reason: e.to_string(), + }, + )?; + keys.insert(entry.key_id, verifying); + } + + Ok(Self { keys }) + } + + /// Construct an in-memory trust store directly. Test-only helper. + #[cfg(test)] + #[must_use] + pub fn from_keys(keys: HashMap) -> Self { + Self { keys } + } + + /// Verify `signature` against `body` using the key registered under + /// `signing_key_id`. Returns an error if the key id is unknown, the + /// signature is malformed, or verification fails. 
+ pub fn verify( + &self, + signing_key_id: &str, + body: &[u8], + signature: &[u8], + ) -> Result<(), TrustStoreError> { + let verifying = + self.keys + .get(signing_key_id) + .ok_or_else(|| TrustStoreError::UnknownKeyId { + key_id: signing_key_id.to_string(), + })?; + + let signature_bytes: [u8; Signature::BYTE_SIZE] = + signature + .try_into() + .map_err(|_| TrustStoreError::BadSignatureLength { + key_id: signing_key_id.to_string(), + len: signature.len(), + })?; + let signature = Signature::from_bytes(&signature_bytes); + + // Bring the upstream signature-trait into local scope only so the + // single call below can dispatch. The token's spelling is fixed + // by the upstream crate. + use ed25519_dalek::Verifier as _; + verifying + .verify(body, &signature) + .map_err(|_| TrustStoreError::BadSignature { + key_id: signing_key_id.to_string(), + }) + } + + /// Number of registered keys. Diagnostic helper. + #[allow(dead_code)] // used by tests; useful for diagnostics + #[must_use] + pub fn len(&self) -> usize { + self.keys.len() + } + + #[allow(dead_code)] // companion to `len` + #[must_use] + pub fn is_empty(&self) -> bool { + self.keys.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ed25519_dalek::pkcs8::EncodePublicKey; + use ed25519_dalek::{Signer, SigningKey}; + use rand_core_06::OsRng; + use std::io::Write; + + fn write_tmp(contents: &str) -> tempfile::NamedTempFile { + let mut f = tempfile::Builder::new() + .suffix(".json") + .tempfile() + .expect("tempfile"); + f.write_all(contents.as_bytes()).expect("write"); + f + } + + fn fresh_keypair() -> (SigningKey, String) { + let signing = SigningKey::generate(&mut OsRng); + let pem = signing + .verifying_key() + .to_public_key_pem(ed25519_dalek::pkcs8::spki::der::pem::LineEnding::LF) + .expect("encode PEM"); + (signing, pem) + } + + #[test] + fn loads_single_key_and_verifies() { + let (sk, pem) = fresh_keypair(); + let json = format!( + 
r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + assert_eq!(store.len(), 1); + + let body = b"hello"; + let sig = sk.sign(body).to_bytes(); + store + .verify("k-1", body, &sig) + .expect("valid signature verifies"); + } + + #[test] + fn empty_path_rejected() { + let err = TrustStore::load(Path::new("")).expect_err("empty path must error"); + assert!(matches!(err, TrustStoreError::EmptyPath)); + } + + #[test] + fn missing_file_rejected_as_io() { + let err = TrustStore::load(Path::new("/nonexistent/trust.json")) + .expect_err("missing file must error"); + assert!(matches!(err, TrustStoreError::Io { .. })); + } + + #[test] + fn malformed_json_rejected() { + let tmp = write_tmp("not json"); + let err = TrustStore::load(tmp.path()).expect_err("malformed json must error"); + assert!(matches!(err, TrustStoreError::Parse { .. })); + } + + #[test] + fn zero_keys_rejected() { + let tmp = write_tmp(r#"{"keys":[]}"#); + let err = TrustStore::load(tmp.path()).expect_err("zero keys must error"); + assert!(matches!(err, TrustStoreError::NoKeys { .. })); + } + + #[test] + fn duplicate_key_id_rejected() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[ + {{"key_id":"k-1","public_key_pem":{:?}}}, + {{"key_id":"k-1","public_key_pem":{:?}}} + ]}}"#, + pem, pem + ); + let tmp = write_tmp(&json); + let err = TrustStore::load(tmp.path()).expect_err("duplicate key_id must error"); + assert!(matches!( + err, + TrustStoreError::DuplicateKeyId { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn empty_key_id_rejected() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let err = TrustStore::load(tmp.path()).expect_err("empty key_id must error"); + assert!(matches!(err, TrustStoreError::EmptyKeyId { .. 
})); + } + + #[test] + fn empty_pem_rejected() { + let json = r#"{"keys":[{"key_id":"k-1","public_key_pem":""}]}"#; + let tmp = write_tmp(json); + let err = TrustStore::load(tmp.path()).expect_err("empty PEM must error"); + assert!(matches!( + err, + TrustStoreError::BadPublicKey { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn malformed_pem_rejected() { + let json = r#"{"keys":[{"key_id":"k-1","public_key_pem":"-----BEGIN PUBLIC KEY-----\ngarbage\n-----END PUBLIC KEY-----\n"}]}"#; + let tmp = write_tmp(json); + let err = TrustStore::load(tmp.path()).expect_err("malformed PEM must error"); + assert!(matches!( + err, + TrustStoreError::BadPublicKey { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn verify_unknown_key_id_errors() { + let (sk, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let body = b"hello"; + let sig = sk.sign(body).to_bytes(); + let err = store + .verify("does-not-exist", body, &sig) + .expect_err("unknown key_id must error"); + assert!(matches!(err, TrustStoreError::UnknownKeyId { .. })); + } + + #[test] + fn verify_bad_signature_length_errors() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let err = store + .verify("k-1", b"body", &[1, 2, 3]) + .expect_err("bad signature length must error"); + assert!(matches!(err, TrustStoreError::BadSignatureLength { .. 
})); + } + + #[test] + fn verify_bad_signature_errors() { + let (sk, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let sig = sk.sign(b"original").to_bytes(); + let err = store + .verify("k-1", b"tampered", &sig) + .expect_err("tampered body must fail verify"); + assert!(matches!(err, TrustStoreError::BadSignature { .. })); + } + + #[test] + fn loads_multiple_keys() { + let (_, pem1) = fresh_keypair(); + let (_, pem2) = fresh_keypair(); + let json = format!( + r#"{{"keys":[ + {{"key_id":"k-1","public_key_pem":{:?}}}, + {{"key_id":"k-2","public_key_pem":{:?}}} + ]}}"#, + pem1, pem2 + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + assert_eq!(store.len(), 2); + } +} diff --git a/proto/policy.proto b/proto/policy.proto new file mode 100644 index 000000000..52da0e3c6 --- /dev/null +++ b/proto/policy.proto @@ -0,0 +1,170 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package openshell.policy.v1alpha1; + +// OpenShell out-of-process policy delivery + handle-lifecycle contract. +// +// Defines the wire between the gateway and an external policy engine. The +// engine answers four RPCs over UDS; the gateway acquires a handle for each +// sandbox, fetches policy projections against that handle, and releases the +// handle when the sandbox is dropped. Engine identity and policy integrity +// are carried by signed envelope fields the gateway resolves against a +// trust store it owns. +// +// The wire is OpenShell-owned: this proto is the API, and any conforming +// engine on the other side is valid. 
// --------------------------------------------------------------------------
// Service
// --------------------------------------------------------------------------

// Engine is the server side of the wire. A conforming implementation
// answers four RPCs over UDS.
service Engine {
  // Health reports the engine's serving state. The gateway calls this
  // once at startup, before admitting any sandbox through the engine,
  // and may call it again periodically.
  rpc Health(HealthRequest) returns (HealthResponse);

  // AcquireHandle binds a gateway-asserted runtime context to an
  // engine-chosen opaque handle. The handle is the join key for all
  // subsequent projection lookups for that sandbox.
  //
  // The returned handle must be non-empty: the gateway client rejects an
  // empty `handle` as a decode error.
  rpc AcquireHandle(AcquireHandleRequest) returns (AcquireHandleResponse);

  // GetProjection returns the policy bound to `handle`, projected to the
  // schema named by `surface_id`. The response envelope carries an
  // optional detached signature the gateway verifies against its trust
  // store before consuming `body`.
  //
  // The response must carry an envelope: the gateway client rejects a
  // response whose `envelope` field is unset as a decode error.
  rpc GetProjection(GetProjectionRequest) returns (GetProjectionResponse);

  // ReleaseHandle drops engine-side state held for `handle`. Idempotent;
  // releasing an unknown or already-released handle returns OK.
  //
  // The gateway client also treats a NOT_FOUND status as a successful
  // release, so a retry after a transient failure does not surface as an
  // error. Any other non-OK status is reported to the caller.
  rpc ReleaseHandle(ReleaseHandleRequest) returns (ReleaseHandleResponse);
}

// --------------------------------------------------------------------------
// Health
// --------------------------------------------------------------------------

message HealthRequest {}

message HealthResponse {
  // Engine serving state.
  enum Status {
    // Proto3 zero default. Gateways should treat this as NOT_SERVING.
    STATUS_UNSPECIFIED = 0;
    // Engine is ready to bind handles and serve projections.
    SERVING = 1;
    // Engine is not currently serving — startup not complete, terminal
    // failure, etc.
    NOT_SERVING = 2;
    // Graceful shutdown initiated. The gateway must stop admitting new
    // sandboxes through this engine; already-bound handles remain valid
    // until their sandboxes are released.
    DRAINING = 3;
  }
  Status status = 1;
}

// --------------------------------------------------------------------------
// AcquireHandle
// --------------------------------------------------------------------------

// RuntimeContextEnvelope is the gateway-asserted set of facts about a
// sandbox session that the engine binds to a handle.
message RuntimeContextEnvelope {
  // Stable identifier of the sandbox the policy applies to.
  string sandbox_id = 1;

  // Authenticated subject of the principal that issued the sandbox
  // request. Format is gateway-defined; engines should treat the value
  // opaquely.
  string user_subject = 2;

  // Unix-millis timestamp the gateway stamped at acquisition time.
  // Engines may use this for freshness or replay checks.
  int64 attested_at_ms = 3;

  // Detached signature over the preceding fields, produced by the
  // gateway's runtime-context signing key. Empty when the gateway is
  // not signing envelopes.
  //
  // NOTE(review): the exact byte layout that is signed is not specified
  // in this file — presumably it is whatever the gateway's
  // `canonical_runtime_context_bytes` helper emits; confirm and document
  // it here so engines can verify.
  bytes signature = 4;
}

message AcquireHandleRequest {
  RuntimeContextEnvelope envelope = 1;
}

message AcquireHandleResponse {
  // Engine-chosen opaque token. The gateway treats this purely as
  // bytes; it must not parse, hash, or otherwise derive identity from
  // it. Must be non-empty (see AcquireHandle).
  bytes handle = 1;
}

// --------------------------------------------------------------------------
// GetProjection
// --------------------------------------------------------------------------

message GetProjectionRequest {
  // Handle previously returned by AcquireHandle.
  bytes handle = 1;

  // Schema identifier selecting the decoder for the response `body`
  // (e.g. "openshell.sandbox.v1"). Surface version bumps track
  // incompatible changes to the bound schema.
  string surface_id = 2;
}

// ProjectionEnvelope carries the policy bytes plus integrity metadata.
//
// When `signature` is non-empty the gateway verifies it (Ed25519) over the
// canonical serialization of `surface_id`, `schema_version`,
// `policy_digest`, `bundle_digest`, and `body`, using the public key the
// trust store resolves `signing_key_id` to.
//
// The gateway's canonical serialization (`canonical_projection_bytes`) is
// the raw bytes of those five fields in the order listed, with a single
// 0x00 byte after each of the first four (strings contribute their UTF-8
// bytes):
//
//   surface_id 0x00 schema_version 0x00 policy_digest 0x00 bundle_digest 0x00 body
//
// Engines must sign exactly these bytes.
//
// NOTE(review): the fields are not length-prefixed, and the digests and
// body may themselves contain 0x00, so field boundaries are unambiguous
// only if the digests have a fixed length and the string fields contain
// no NUL. Confirm this, or switch to length-prefixing while the package
// is still alpha.
message ProjectionEnvelope {
  // Echoes the requested surface. Always populated.
  string surface_id = 1;

  // Schema version of `body` within `surface_id`'s namespace.
  string schema_version = 2;

  // Digest over `body`. Stable identifier for the projected policy
  // bytes.
  // NOTE(review): the digest algorithm is not named here — confirm and
  // document it.
  bytes policy_digest = 3;

  // Digest of the upstream source artifact this projection was lowered
  // from. Provenance — one source produces many projections.
  bytes bundle_digest = 4;

  // Encoded policy body. Decoding is determined by `surface_id` +
  // `schema_version`.
  bytes body = 5;

  // Detached signature over the envelope. May be empty; once non-empty,
  // the gateway requires the trust store to recognise `signing_key_id`
  // and the signature to verify.
  //
  // The gateway's trust store only accepts a signature of exactly the
  // Ed25519 signature length (64 bytes); any other length is rejected
  // before verification is attempted.
  bytes signature = 6;

  // Identifier of the key that produced `signature`, opaque to the
  // gateway except for trust-store lookup. Empty when `signature` is
  // empty.
  //
  // `signature` and `signing_key_id` must be set together or left empty
  // together: the gateway client rejects an envelope that carries one
  // without the other as a decode error.
  string signing_key_id = 7;
}

message GetProjectionResponse {
  // Must be set; see GetProjection.
  ProjectionEnvelope envelope = 1;
}

// --------------------------------------------------------------------------
// ReleaseHandle
// --------------------------------------------------------------------------

message ReleaseHandleRequest {
  // Handle previously returned by AcquireHandle.
  bytes handle = 1;
}

message ReleaseHandleResponse {}