diff --git a/crates/openshell-server/src/config_file.rs b/crates/openshell-server/src/config_file.rs index d7852e8ee..fadd5905d 100644 --- a/crates/openshell-server/src/config_file.rs +++ b/crates/openshell-server/src/config_file.rs @@ -55,6 +55,12 @@ pub struct OpenShellRoot { #[serde(default)] pub gateway: GatewayFileSection, + /// `[openshell.policy]` table — selects the active policy-provider + /// type. `None` here is equivalent to the local default; the loader + /// validates the chosen type in [`load`]. + #[serde(default)] + pub policy: Option, + /// `[openshell.drivers.]` tables — passed verbatim to each driver /// crate's `Deserialize` impl after the gateway-side inheritance merge. /// Stored as raw [`toml::Value`] so each driver can evolve its schema @@ -63,6 +69,30 @@ pub struct OpenShellRoot { pub drivers: BTreeMap, } +/// `[openshell.policy]` table. +/// +/// Selects the policy-provider type. Today the only fully-supported value +/// is `"local"`, which keeps the gateway's historical in-process, +/// store-backed policy semantics. `"attested"` is reserved for the +/// Attested Policy Projection provider (forthcoming session); declaring it +/// today is parsed successfully but rejected at gateway startup with a +/// clear "policy type not yet available" error. +/// +/// The `type` key intentionally mirrors `openshell-providers`' +/// `ProviderPlugin`-style selector convention rather than the APF/RFC +/// "driver" vocabulary. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PolicyFileSection { + /// Policy-provider type. Accepted values: `"local"` (the default if + /// the table is omitted) and `"attested"` (declared but not yet + /// implemented). `type` is a Rust keyword, so the field is exposed as + /// `r#type` in code and renamed via `#[serde(rename = "type")]` for + /// the TOML surface. + #[serde(default, rename = "type")] + pub r#type: Option, +} + /// `[openshell.gateway]` section. 
/// /// All fields are `Option` so the loader can tell whether a key was set @@ -182,6 +212,10 @@ pub enum ConfigFileError { env: &'static str, cli: &'static str, }, + #[error( + "[openshell.policy] type = '{policy_type}' is not a recognized policy type; accepted values are 'local' (default) or 'attested' (not yet available)" + )] + UnknownPolicyType { policy_type: String }, } /// Load and validate a TOML config file. @@ -215,9 +249,33 @@ pub fn load(path: &Path) -> Result { }); } + // Validate the optional policy-provider type. The "attested" value is + // accepted at parse time because the config file may be written ahead + // of the provider landing; startup is responsible for turning that + // into a clear "policy type not yet available" error. + if let Some(ref policy) = file.openshell.policy + && let Some(ref policy_type) = policy.r#type + && !is_known_policy_type(policy_type) + { + return Err(ConfigFileError::UnknownPolicyType { + policy_type: policy_type.clone(), + }); + } + Ok(file) } +/// Policy-type strings recognized by the policy-provider config validator. +/// `"local"` is the historical (and v0) default; `"attested"` is reserved +/// for the forthcoming Attested Policy Projection provider and is parsed +/// successfully so deployments can stage the value ahead of the provider +/// landing. +pub const KNOWN_POLICY_TYPES: &[&str] = &["local", "attested"]; + +fn is_known_policy_type(policy_type: &str) -> bool { + KNOWN_POLICY_TYPES.contains(&policy_type) +} + /// Build the merged TOML table for `driver` by overlaying inheritable /// `[openshell.gateway]` defaults onto `[openshell.drivers.]`. /// @@ -431,6 +489,70 @@ ssh_gateway_port = 8080 assert!(matches!(err, ConfigFileError::Parse { .. 
})); } + #[test] + fn parses_policy_type_local() { + let toml = r#" +[openshell.policy] +type = "local" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("local policy type parses"); + let policy = file.openshell.policy.expect("policy table present"); + assert_eq!(policy.r#type.as_deref(), Some("local")); + } + + #[test] + fn parses_policy_type_attested() { + // "attested" is accepted at parse time; the gateway startup turns + // this into a clear "policy type not yet available" error so + // deployments can stage the value ahead of the provider landing. + let toml = r#" +[openshell.policy] +type = "attested" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("attested policy type parses"); + let policy = file.openshell.policy.expect("policy table present"); + assert_eq!(policy.r#type.as_deref(), Some("attested")); + } + + #[test] + fn rejects_unknown_policy_type() { + let toml = r#" +[openshell.policy] +type = "nonsense" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("unknown policy type must be rejected"); + assert!(matches!( + err, + ConfigFileError::UnknownPolicyType { ref policy_type } if policy_type == "nonsense" + )); + } + + #[test] + fn missing_policy_table_is_ok() { + let toml = r#" +[openshell.gateway] +log_level = "info" +"#; + let tmp = write_tmp(toml); + let file = load(tmp.path()).expect("absent policy table is allowed"); + assert!(file.openshell.policy.is_none()); + } + + #[test] + fn rejects_unknown_policy_field() { + let toml = r#" +[openshell.policy] +type = "local" +nonsense = true +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("unknown policy field must be rejected"); + assert!(matches!(err, ConfigFileError::Parse { .. 
})); + } + #[test] fn rejects_unsupported_version() { let toml = r" diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 38268fcd0..cb7623fb0 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -13,6 +13,9 @@ use crate::ServerState; use crate::auth::principal::Principal; use crate::persistence::{DraftChunkRecord, ObjectId, ObjectName, ObjectType, PolicyRecord, Store}; +use crate::policy_provider::{ + DeleteGlobalPolicyCtx, PolicyError, SetSandboxPolicyCtx, UpdateSandboxPolicyCtx, +}; use crate::policy_store::PolicyStoreExt; use openshell_core::proto::policy_merge_operation; use openshell_core::proto::setting_value; @@ -957,6 +960,28 @@ fn truncate_for_log(input: &str, max_chars: usize) -> String { } } +/// Map a [`PolicyError`] to a `tonic::Status`. +/// +/// `Unsupported` carries the policy-type id and the refused operation so +/// the client (and audit) get a precise reason; it maps to +/// `Status::unimplemented`. Persistence errors collapse to +/// `Status::internal` here — handlers that need finer granularity (e.g. +/// CAS conflict → `Status::aborted`) should match on `PolicyError` before +/// calling this. +fn policy_error_to_status(error: PolicyError) -> Status { + match error { + PolicyError::Unsupported { + policy_type, + operation, + } => Status::unimplemented(format!( + "policy type '{policy_type}' does not support operation '{operation}'" + )), + PolicyError::Persistence(err) => { + super::persistence_error_to_status(err, "policy provider") + } + } +} + #[cfg(test)] fn is_sandbox_caller(request: &Request) -> bool { matches!( @@ -1393,6 +1418,16 @@ pub(super) async fn handle_update_config( "delete_setting cannot be combined with policy payload", )); } + // Coarse-grained mutation gate (W-B Phase A). 
The global-policy + // replace path writes a revision row directly without routing + // through a per-op provider call today; gate it here so an + // alternate provider blocks `openshell policy set --global` + // uniformly with the sandbox-scoped set path. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let mut new_policy = req.policy.ok_or_else(|| { Status::invalid_argument("policy is required for global policy update") })?; @@ -1498,20 +1533,31 @@ pub(super) async fn handle_update_config( let mut global_settings = load_global_settings(state.store.as_ref()).await?; let changed = if req.delete_setting { - let removed = global_settings.settings.remove(key).is_some(); - if removed - && key == POLICY_SETTING_KEY - && let Ok(Some(latest)) = state - .store - .get_latest_policy(GLOBAL_POLICY_SANDBOX_ID) + // Gate global-policy deletion through the active policy + // provider. Local says yes (and supersedes older revisions as + // a side effect inside the trait method); other providers can + // refuse via the trait default, which maps to + // `Status::unimplemented` so `openshell policy delete + // --global` is rejected without a bespoke gate. Non-policy + // setting deletions skip the trait entirely. + if key == POLICY_SETTING_KEY { + // Coarse-grained mutation gate (W-B Phase A). Refuses the + // entire mutation surface before the per-op delete trait + // call runs. 
+ state + .policy_provider + .permits_mutation() .await - { - let _ = state - .store - .supersede_older_policies(GLOBAL_POLICY_SANDBOX_ID, latest.version + 1) - .await; + .map_err(policy_error_to_status)?; + state + .policy_provider + .delete_policy(&DeleteGlobalPolicyCtx { + global_policy_sandbox_id: GLOBAL_POLICY_SANDBOX_ID.to_string(), + }) + .await + .map_err(policy_error_to_status)?; } - removed + global_settings.settings.remove(key).is_some() } else { let setting = req .setting_value @@ -1636,6 +1682,32 @@ pub(super) async fn handle_update_config( .ok_or_else(|| Status::internal("sandbox has no spec"))?; let merge_ops = parse_merge_operations(&req.merge_operations)?; validate_merge_operations_for_server(&merge_ops)?; + + // Coarse-grained mutation gate (W-B Phase A). Fires before any DB + // work or the per-op trait call so an alternate provider refuses + // the merge without exercising the merge-with-retry path. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; + + // Gate the merge through the active policy provider. The local + // provider returns Ok and the existing merge-with-retry path runs + // below; an alternate provider can refuse via the trait default + // (`Status::unimplemented`) so `openshell policy update` is + // rejected without a bespoke gate. 
+ state + .policy_provider + .update_policy(&UpdateSandboxPolicyCtx { + sandbox_id: sandbox_id.clone(), + sandbox_name: sandbox.object_name().to_string(), + merge_operations: merge_ops.clone(), + baseline_policy: spec.policy.clone(), + }) + .await + .map_err(policy_error_to_status)?; + let (version, hash) = apply_merge_operations_with_retry( state.store.as_ref(), &sandbox_id, @@ -1733,52 +1805,51 @@ pub(super) async fn handle_update_config( ); } - let latest = state - .store - .get_latest_policy(&sandbox_id) - .await - .map_err(|e| Status::internal(format!("fetch latest policy failed: {e}")))?; - - let payload = new_policy.encode_to_vec(); - let hash = deterministic_policy_hash(&new_policy); - - if let Some(ref current) = latest - && current.policy_hash == hash - { - return Ok(Response::new(UpdateConfigResponse { - version: u32::try_from(current.version).unwrap_or(0), - policy_hash: hash, - settings_revision: 0, - deleted: false, - })); - } - - let next_version = latest.map_or(1, |r| r.version + 1); - let policy_id = uuid::Uuid::new_v4().to_string(); - + // Coarse-grained mutation gate (W-B Phase A). Runs before the per-op + // `set_policy` call so an alternate provider can refuse the entire + // mutation surface — including the draft-chunk handlers — without + // implementing per-op trait methods. The per-op call below is kept as + // the natural extension seam for what work happens once a mutation is + // permitted; this is the gate that decides whether any work happens + // at all. state - .store - .put_policy_revision(&policy_id, &sandbox_id, next_version, &payload, &hash) + .policy_provider + .permits_mutation() .await - .map_err(|e| Status::internal(format!("persist policy revision failed: {e}")))?; - - let _ = state - .store - .supersede_older_policies(&sandbox_id, next_version) - .await; + .map_err(policy_error_to_status)?; + + // Route the sandbox-scoped policy replacement through the active + // policy provider. 
`LocalPolicyProvider::set_policy` performs the same + // put-revision + supersede dance that used to live inline here; an + // alternate provider (next session: `AttestedPolicyProvider`) can + // refuse the mutation via the trait default, which maps to + // `Status::unimplemented` so `openshell policy set` is rejected + // automatically without a separate gate. + let ctx = SetSandboxPolicyCtx { + sandbox_id: sandbox_id.clone(), + sandbox_name: sandbox.object_name().to_string(), + expected_resource_version: req.expected_resource_version, + policy: new_policy, + }; + let outcome = state + .policy_provider + .set_policy(&ctx) + .await + .map_err(policy_error_to_status)?; state.sandbox_watch_bus.notify(&sandbox_id); info!( sandbox_id = %sandbox_id, - version = next_version, - policy_hash = %hash, + version = outcome.version, + policy_hash = %outcome.policy_hash, + policy_type = %state.policy_provider.id(), "UpdateConfig: new policy version persisted" ); Ok(Response::new(UpdateConfigResponse { - version: u32::try_from(next_version).unwrap_or(0), - policy_hash: hash, + version: outcome.version, + policy_hash: outcome.policy_hash, settings_revision: 0, deleted: false, })) @@ -2082,6 +2153,15 @@ pub(super) async fn handle_submit_policy_analysis( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). The proposal-submission + // surface writes new draft chunks; an alternate provider refuses the + // entire draft-chunk write surface here without per-handler trait + // methods. Runs before any DB read or write. 
+ state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = resolve_sandbox_by_name_for_principal(state.store.as_ref(), &principal, &req.name).await?; @@ -2387,6 +2467,14 @@ pub(super) async fn handle_approve_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Approving a chunk merges + // its rule into active policy — pure mutation. Refuses the entire + // chunk-approval surface uniformly with the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; require_no_global_policy(state).await?; @@ -2474,6 +2562,15 @@ pub(super) async fn handle_reject_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Reject mutates the + // chunk's status row and may also remove a previously-approved rule + // from the active policy. Gate refuses both surfaces uniformly with + // the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2555,6 +2652,14 @@ pub(super) async fn handle_approve_all_draft_chunks( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). Bulk-approve merges N + // chunk rules into active policy and bumps chunk status — refuse the + // entire write surface uniformly with the canonical RPC mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; require_no_global_policy(state).await?; @@ -2677,6 +2782,13 @@ pub(super) async fn handle_edit_draft_chunk( let proposed_rule = req .proposed_rule .ok_or_else(|| Status::invalid_argument("proposed_rule is required"))?; + // Coarse-grained mutation gate (W-B Phase A). 
Edit rewrites the + // proposed-rule bytes on the chunk — pure draft-chunk store write. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2727,6 +2839,14 @@ pub(super) async fn handle_undo_draft_chunk( if req.chunk_id.is_empty() { return Err(Status::invalid_argument("chunk_id is required")); } + // Coarse-grained mutation gate (W-B Phase A). Undo removes the chunk's + // rule from active policy and reverts the chunk's status row to + // pending — two writes to gate uniformly with the canonical mutators. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -2807,6 +2927,13 @@ pub(super) async fn handle_clear_draft_chunks( if req.name.is_empty() { return Err(Status::invalid_argument("name is required")); } + // Coarse-grained mutation gate (W-B Phase A). Clear deletes all + // pending draft chunks for the sandbox — pure draft-chunk store write. + state + .policy_provider + .permits_mutation() + .await + .map_err(policy_error_to_status)?; let sandbox = state .store @@ -9371,4 +9498,565 @@ mod tests { "policy should be backfilled after one success" ); } + + // ---- PolicyProvider integration (W-B half 1) ---- + + use crate::policy_provider::{PolicyError, PolicyProvider}; + use async_trait::async_trait; + + /// Test-only provider that returns `Unsupported` for every mutator — + /// stands in for the future `AttestedPolicyProvider` so we can verify + /// the gRPC-layer mapping to `Status::unimplemented`. + #[derive(Debug)] + struct RefusingProvider; + + #[async_trait] + impl PolicyProvider for RefusingProvider { + fn id(&self) -> &'static str { + "refusing" + } + + async fn get_effective_policy( + &self, + _sandbox_id: &str, + ) -> Result, PolicyError> { + Ok(None) + } + // set_policy / update_policy / delete_policy inherit the default + // `Unsupported` impls — this is the property under test. 
+ } + + /// Swap the policy provider on a freshly-built test `ServerState`. The + /// `test_server_state` helper returns a unique `Arc`, so we can unwrap + /// it, mutate, and re-wrap. + fn override_policy_provider( + state: Arc, + provider: Arc, + ) -> Arc { + let inner = Arc::try_unwrap(state) + .map_err(|_| "expected unique test-state Arc") + .unwrap(); + Arc::new(ServerState { + policy_provider: provider, + ..inner + }) + } + + #[test] + fn policy_error_unsupported_maps_to_unimplemented() { + let status = policy_error_to_status(PolicyError::Unsupported { + policy_type: "attested", + operation: "set_policy", + }); + assert_eq!(status.code(), Code::Unimplemented); + assert!(status.message().contains("attested")); + assert!(status.message().contains("set_policy")); + } + + #[test] + fn policy_error_persistence_maps_to_internal() { + let status = policy_error_to_status(PolicyError::Persistence( + crate::persistence::PersistenceError::Database("boom".to_string()), + )); + assert_eq!(status.code(), Code::Internal); + } + + /// End-to-end: replace the state's provider with the refusing stub, + /// then drive `handle_update_config` through the sandbox-scoped + /// policy-replace path. The handler must surface + /// `Status::unimplemented` carrying the policy type and operation, + /// which is exactly what `openshell policy set` will see when the + /// attested provider is configured. + #[tokio::test] + async fn refusing_provider_sandbox_set_returns_unimplemented() { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let state = test_server_state().await; + // No baseline `spec.policy` — first-time policy discovery exercises + // the backfill path, which reaches the provider seam without first + // tripping `validate_static_fields_unchanged`. 
+ let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-refuse".to_string(), + name: "sb-refuse".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-refuse".to_string(), + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject set_policy"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + // W-B Phase A: the coarse `permits_mutation` gate fires before the + // per-op `set_policy` call, so the surfaced operation is the + // generic `mutation`. The per-op trait method is still defined as + // the natural extension seam for what work happens once a + // mutation is permitted, but it's the gate that decides whether + // any work happens at all. 
+ assert!(err.message().contains("mutation")); + } + + #[tokio::test] + async fn refusing_provider_global_delete_returns_unimplemented() { + let state = test_server_state().await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let req = with_user(Request::new(UpdateConfigRequest { + global: true, + setting_key: POLICY_SETTING_KEY.to_string(), + delete_setting: true, + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject global delete_policy"); + assert_eq!(err.code(), Code::Unimplemented); + // W-B Phase A: see comment in + // `refusing_provider_sandbox_set_returns_unimplemented` — the + // coarse gate fires first, so the operation is `mutation`. + assert!(err.message().contains("mutation")); + } + + #[tokio::test] + async fn refusing_provider_merge_ops_returns_unimplemented() { + use openshell_core::proto::{ + NetworkEndpoint, NetworkPolicyRule, PolicyMergeOperation, SandboxPhase, SandboxSpec, + policy_merge_operation, + }; + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-merge".to_string(), + name: "sb-merge".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let merge_op = PolicyMergeOperation { + operation: Some(policy_merge_operation::Operation::AddRule( + openshell_core::proto::AddNetworkRule { + rule_name: "test-rule".to_string(), + rule: Some(NetworkPolicyRule { + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + ..Default::default() 
+ }), + }, + )), + }; + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-merge".to_string(), + merge_operations: vec![merge_op], + ..Default::default() + })); + let err = handle_update_config(&state, req) + .await + .expect_err("refusing provider must reject update_policy"); + assert_eq!(err.code(), Code::Unimplemented); + // W-B Phase A: see comment in + // `refusing_provider_sandbox_set_returns_unimplemented` — the + // coarse gate fires first, so the operation is `mutation`. + assert!(err.message().contains("mutation")); + } + + /// Sanity-check that the default `local` provider preserves the + /// status-quo behavior for the sandbox-scoped policy replacement + /// path: the gRPC handler still returns version=1 and a non-empty + /// hash for a brand-new sandbox policy (first-time discovery / + /// backfill). + #[tokio::test] + async fn local_provider_sandbox_set_succeeds_with_version_and_hash() { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let state = test_server_state().await; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-local".to_string(), + name: "sb-local".to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Provisioning as i32); + state.store.put_message(&sandbox).await.unwrap(); + + let req = with_user(Request::new(UpdateConfigRequest { + name: "sb-local".to_string(), + policy: Some(ProtoSandboxPolicy::default()), + ..Default::default() + })); + let resp = handle_update_config(&state, req) + .await + .expect("local provider must accept set_policy"); + let resp = resp.into_inner(); + assert_eq!(resp.version, 1); + assert!(!resp.policy_hash.is_empty()); + } + + // ---- permits_mutation chunk-handler gating (W-B Phase A) ---- + // + // Each chunk handler reachable via gRPC must call 
`permits_mutation` + // as its first post-authz action; an alternate provider that returns + // `Unsupported` from the trait default must short-circuit every + // handler before any DB write. These tests stand in for the + // forthcoming `AttestedPolicyProvider`: today's `RefusingProvider` + // stub inherits the same default impl the attested provider will, so + // verifying the gRPC layer here is equivalent to verifying the + // attested-mode rejection path the gateway will expose in Phase B. + + /// Helper: seed a single pending draft chunk for the given sandbox so + /// the per-chunk handlers (`approve`, `reject`, `edit`, `undo`) have a + /// target row to attempt to mutate. Returns the chunk id. + async fn seed_pending_chunk( + state: &Arc, + sandbox_id: &str, + chunk_id: &str, + ) -> String { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule}; + let rule = NetworkPolicyRule { + name: "seed-rule".to_string(), + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + let chunk = DraftChunkRecord { + id: chunk_id.to_string(), + sandbox_id: sandbox_id.to_string(), + draft_version: 1, + status: "pending".to_string(), + rule_name: "seed-rule".to_string(), + proposed_rule: rule.encode_to_vec(), + rationale: "seed".to_string(), + security_notes: String::new(), + confidence: 1.0, + created_at_ms: 1_000_000, + decided_at_ms: None, + host: "example.com".to_string(), + port: 443, + binary: "/usr/bin/curl".to_string(), + hit_count: 1, + first_seen_ms: 0, + last_seen_ms: 0, + validation_result: String::new(), + rejection_reason: String::new(), + }; + state + .store + .put_draft_chunk(&chunk, None) + .await + .expect("seed chunk persists") + } + + /// Helper: seed an approved draft chunk (for `undo`, which requires + /// the target chunk to be in the `approved` state). 
+ async fn seed_approved_chunk( + state: &Arc, + sandbox_id: &str, + chunk_id: &str, + ) -> String { + let id = seed_pending_chunk(state, sandbox_id, chunk_id).await; + state + .store + .update_draft_chunk_status(&id, "approved", Some(1_000_001), None) + .await + .expect("flip to approved"); + id + } + + /// Helper: snapshot all chunks for a sandbox so a test can assert the + /// refused handler made no draft-chunk write. Catches the regression + /// where someone moves the gate after a status update or row insert. + async fn chunk_snapshot( + state: &Arc, + sandbox_id: &str, + ) -> Vec<(String, String, Vec, String)> { + let chunks = state + .store + .list_draft_chunks(sandbox_id, None) + .await + .expect("list chunks"); + chunks + .into_iter() + .map(|c| (c.id, c.status, c.proposed_rule, c.rejection_reason)) + .collect() + } + + /// Helper: put a Ready sandbox with no baseline policy so the + /// chunk-handler preconditions (sandbox exists, not global-managed) + /// pass through to the provider gate. 
+ async fn put_test_sandbox(state: &Arc, id: &str, name: &str) { + use openshell_core::proto::{SandboxPhase, SandboxSpec}; + let mut sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: id.to_string(), + name: name.to_string(), + created_at_ms: 1_000_000, + labels: HashMap::new(), + resource_version: 0, + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + ..Default::default() + }; + sandbox.set_phase(SandboxPhase::Ready as i32); + state.store.put_message(&sandbox).await.unwrap(); + } + + #[tokio::test] + async fn refusing_provider_submit_policy_analysis_returns_unimplemented() { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule, PolicyChunk}; + let state = test_server_state().await; + put_test_sandbox(&state, "sb-submit", "sb-submit").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-submit").await; + let req = with_user(Request::new(SubmitPolicyAnalysisRequest { + name: "sb-submit".to_string(), + proposed_chunks: vec![PolicyChunk { + rule_name: "blocked".to_string(), + proposed_rule: Some(NetworkPolicyRule { + name: "blocked".to_string(), + endpoints: vec![NetworkEndpoint { + host: "example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }), + confidence: 0.9, + hit_count: 1, + ..Default::default() + }], + ..Default::default() + })); + let err = handle_submit_policy_analysis(&state, req) + .await + .expect_err("refusing provider must reject submit_policy_analysis"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-submit").await; + assert_eq!( + before, after, + "refused submit_policy_analysis must not write to the draft-chunk store" + ); + } + + 
#[tokio::test] + async fn refusing_provider_approve_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-approve", "sb-approve").await; + seed_pending_chunk(&state, "sb-approve", "chunk-approve").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-approve").await; + let req = Request::new(ApproveDraftChunkRequest { + name: "sb-approve".to_string(), + chunk_id: "chunk-approve".to_string(), + }); + let err = handle_approve_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject approve_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-approve").await; + assert_eq!( + before, after, + "refused approve_draft_chunk must not flip chunk status" + ); + } + + #[tokio::test] + async fn refusing_provider_reject_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-reject", "sb-reject").await; + seed_pending_chunk(&state, "sb-reject", "chunk-reject").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-reject").await; + let req = Request::new(RejectDraftChunkRequest { + name: "sb-reject".to_string(), + chunk_id: "chunk-reject".to_string(), + reason: "test".to_string(), + }); + let err = handle_reject_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject reject_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-reject").await; + assert_eq!( + before, after, + "refused reject_draft_chunk must not flip chunk status or persist reason" + ); + } + + #[tokio::test] + async fn 
refusing_provider_approve_all_draft_chunks_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-all", "sb-all").await; + seed_pending_chunk(&state, "sb-all", "chunk-all-a").await; + seed_pending_chunk(&state, "sb-all", "chunk-all-b").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-all").await; + let req = Request::new(ApproveAllDraftChunksRequest { + name: "sb-all".to_string(), + include_security_flagged: true, + }); + let err = handle_approve_all_draft_chunks(&state, req) + .await + .expect_err("refusing provider must reject approve_all_draft_chunks"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-all").await; + assert_eq!( + before, after, + "refused approve_all_draft_chunks must not flip any chunk status" + ); + } + + #[tokio::test] + async fn refusing_provider_edit_draft_chunk_returns_unimplemented() { + use openshell_core::proto::{NetworkBinary, NetworkEndpoint, NetworkPolicyRule}; + let state = test_server_state().await; + put_test_sandbox(&state, "sb-edit", "sb-edit").await; + seed_pending_chunk(&state, "sb-edit", "chunk-edit").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-edit").await; + let req = Request::new(EditDraftChunkRequest { + name: "sb-edit".to_string(), + chunk_id: "chunk-edit".to_string(), + proposed_rule: Some(NetworkPolicyRule { + name: "edited".to_string(), + endpoints: vec![NetworkEndpoint { + host: "edited.example".to_string(), + port: 8443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/edited".to_string(), + ..Default::default() + }], + }), + }); + let err = handle_edit_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject edit_draft_chunk"); + 
assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-edit").await; + assert_eq!( + before, after, + "refused edit_draft_chunk must not rewrite proposed_rule bytes" + ); + } + + #[tokio::test] + async fn refusing_provider_undo_draft_chunk_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-undo", "sb-undo").await; + seed_approved_chunk(&state, "sb-undo", "chunk-undo").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-undo").await; + let req = Request::new(UndoDraftChunkRequest { + name: "sb-undo".to_string(), + chunk_id: "chunk-undo".to_string(), + }); + let err = handle_undo_draft_chunk(&state, req) + .await + .expect_err("refusing provider must reject undo_draft_chunk"); + assert_eq!(err.code(), Code::Unimplemented); + assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-undo").await; + assert_eq!( + before, after, + "refused undo_draft_chunk must not flip chunk status back to pending" + ); + } + + #[tokio::test] + async fn refusing_provider_clear_draft_chunks_returns_unimplemented() { + let state = test_server_state().await; + put_test_sandbox(&state, "sb-clear", "sb-clear").await; + seed_pending_chunk(&state, "sb-clear", "chunk-clear-a").await; + seed_pending_chunk(&state, "sb-clear", "chunk-clear-b").await; + let state = override_policy_provider(state, Arc::new(RefusingProvider)); + + let before = chunk_snapshot(&state, "sb-clear").await; + let req = Request::new(ClearDraftChunksRequest { + name: "sb-clear".to_string(), + }); + let err = handle_clear_draft_chunks(&state, req) + .await + .expect_err("refusing provider must reject clear_draft_chunks"); + assert_eq!(err.code(), Code::Unimplemented); + 
assert!(err.message().contains("refusing")); + assert!(err.message().contains("mutation")); + + let after = chunk_snapshot(&state, "sb-clear").await; + assert_eq!( + before, after, + "refused clear_draft_chunks must not delete any chunks" + ); + } } diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index f59ec22df..348fa5e86 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -30,6 +30,7 @@ mod http; mod inference; mod multiplex; mod persistence; +pub(crate) mod policy_provider; pub(crate) mod policy_store; mod provider_refresh; mod readiness; @@ -79,6 +80,15 @@ pub struct ServerState { /// Persistence store. pub store: Arc, + /// Active policy provider. + /// + /// All `openshell policy set | update | delete` requests at the gRPC + /// layer route through this provider. The `local` type writes directly + /// to [`Self::store`]; other policy types may refuse mutations via + /// `PolicyError::Unsupported`, which the gRPC layer translates to + /// `Status::unimplemented`. + pub policy_provider: Arc, + /// Compute orchestration over the configured driver. pub compute: ComputeRuntime, @@ -148,6 +158,12 @@ fn is_benign_connection_close(error: &(dyn std::error::Error + 'static)) -> bool impl ServerState { /// Create new server state. + /// + /// Selects the default `local` policy provider. Callers that need to + /// substitute a different provider (test fixtures wanting to exercise + /// `Unsupported` behavior; or, when it lands, the `attested` policy + /// type) can replace `policy_provider` on the constructed value before + /// the state is shared. 
#[must_use] #[allow(clippy::too_many_arguments)] pub fn new( @@ -160,9 +176,12 @@ impl ServerState { supervisor_sessions: Arc, oidc_cache: Option>, ) -> Self { + let policy_provider: Arc = + Arc::new(policy_provider::LocalPolicyProvider::new(store.clone())); Self { config, store, + policy_provider, compute, sandbox_index, sandbox_watch_bus, @@ -244,6 +263,15 @@ pub async fn run_server( oidc_cache, ); + // Override the default `local` policy provider when the config file + // selects a different policy type. The `attested` type is reserved for + // the forthcoming Attested Policy Projection work; declaring it today + // returns a startup error rather than a generic "unknown policy type" + // message so deployments staging the value get a clear signal. + if let Some(provider) = resolve_policy_provider(config_file.as_ref(), store.clone())? { + state.policy_provider = provider; + } + // Load the gateway-minted sandbox JWT signing key when configured. // Optional so single-driver dev deployments without certgen continue // to start. The helm-deployed gateway and the RPM init script populate @@ -454,6 +482,54 @@ pub async fn run_server( Ok(()) } +/// Build the policy-provider registry for this gateway process. Currently +/// holds only `local`; the next session adds the `attested` policy type +/// here. +fn build_policy_provider_registry(store: Arc) -> policy_provider::PolicyProviderRegistry { + let mut registry = policy_provider::PolicyProviderRegistry::new(); + registry.register(policy_provider::LocalPolicyProvider::new(store)); + registry +} + +/// Resolve the configured policy provider, if the config file selects one. +/// Returns `Ok(None)` when no override is needed (the default `local` +/// provider from `ServerState::new` already covers that case). +/// +/// `"local"` returns a fresh provider via the registry. 
`"attested"` is +/// parsed at config-file load time but the registry does not yet contain +/// the provider — startup returns a clear "policy type not yet available" +/// error so deployments staging the value get a clear signal rather than +/// silently falling back to local. +fn resolve_policy_provider( + config_file: Option<&config_file::ConfigFile>, + store: Arc, +) -> Result>> { + let Some(file) = config_file else { + return Ok(None); + }; + let Some(policy) = file.openshell.policy.as_ref() else { + return Ok(None); + }; + let Some(policy_type) = policy.r#type.as_deref() else { + return Ok(None); + }; + let registry = build_policy_provider_registry(store); + if let Some(provider) = registry.get(policy_type) { + return Ok(Some(provider)); + } + if policy_type == policy_provider::ATTESTED_POLICY_TYPE_ID { + return Err(Error::config( + "[openshell.policy] type = 'attested' is not yet available in this build; \ + use 'local' or omit the [openshell.policy] table", + )); + } + // Unreachable in practice — `config_file::load` already rejects + // unknown policy type names. Treat any straggler defensively. + Err(Error::config(format!( + "unknown policy provider type '{policy_type}'" + ))) +} + fn gateway_listener_addresses( bind_address: SocketAddr, extra_addresses: &[SocketAddr], diff --git a/crates/openshell-server/src/policy_provider/local.rs b/crates/openshell-server/src/policy_provider/local.rs new file mode 100644 index 000000000..1ea3e417f --- /dev/null +++ b/crates/openshell-server/src/policy_provider/local.rs @@ -0,0 +1,345 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Local (in-process, store-backed) policy provider. +//! +//! This is the `"local"` policy type — selected by the default +//! `[openshell.policy] type = "local"` (or by omitting the table entirely). +//! +//! Wraps [`crate::persistence::Store`] (which implements +//! 
[`crate::policy_store::PolicyStoreExt`]) and exposes the canonical +//! `PolicyProvider` operations. Behavior here is intentionally a thin +//! pass-through to the existing DB writes — the gRPC handler retains +//! validation, audit emission, sandbox-watch notification, and CAS retry +//! responsibility. +//! +//! The next session adds `AttestedPolicyProvider` as a sibling module here. + +use std::sync::Arc; + +use async_trait::async_trait; +use prost::Message; +use sha2::{Digest, Sha256}; + +use crate::persistence::Store; +use crate::policy_store::PolicyStoreExt; + +use super::{ + DeleteGlobalPolicyCtx, PolicyError, PolicyMutationOutcome, PolicyProvider, + SetSandboxPolicyCtx, UpdateSandboxPolicyCtx, LOCAL_POLICY_TYPE_ID, +}; + +/// Local policy provider — persists all mutations directly to the gateway's +/// own store. This is today's behavior, just routed through the trait. +#[derive(Debug, Clone)] +pub struct LocalPolicyProvider { + store: Arc, +} + +impl LocalPolicyProvider { + #[must_use] + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +/// SHA-256 over the canonical-encoded policy. Mirrors the +/// `deterministic_policy_hash` in `grpc/policy.rs`, kept private here so the +/// provider can stamp hashes without crossing the module boundary. Local +/// keeps it in sync structurally; the handler module remains the single +/// reference impl used by the rest of the codebase. 
+fn deterministic_policy_hash(policy: &openshell_core::proto::SandboxPolicy) -> String { + let mut hasher = Sha256::new(); + hasher.update(policy.version.to_le_bytes()); + if let Some(fs) = &policy.filesystem { + hasher.update(fs.encode_to_vec()); + } + if let Some(ll) = &policy.landlock { + hasher.update(ll.encode_to_vec()); + } + if let Some(p) = &policy.process { + hasher.update(p.encode_to_vec()); + } + let mut entries: Vec<_> = policy.network_policies.iter().collect(); + entries.sort_by_key(|(k, _)| k.as_str()); + for (key, value) in entries { + hasher.update(key.as_bytes()); + hasher.update(value.encode_to_vec()); + } + hex::encode(hasher.finalize()) +} + +#[async_trait] +impl PolicyProvider for LocalPolicyProvider { + fn id(&self) -> &'static str { + LOCAL_POLICY_TYPE_ID + } + + async fn permits_mutation(&self) -> Result<(), PolicyError> { + // The local provider owns the in-process policy store, so every + // mutation surface — the three canonical RPC mutators and the + // draft-chunk approval handlers — is supported. The attested + // provider will inherit the trait default (`Unsupported`) so the + // gateway's coarse gate refuses both surfaces uniformly. 
+ Ok(()) + } + + async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError> { + let record = self.store.get_latest_policy(sandbox_id).await?; + match record { + Some(record) => { + let policy = openshell_core::proto::SandboxPolicy::decode( + record.policy_payload.as_slice(), + ) + .map_err(|e| { + PolicyError::Persistence(crate::persistence::PersistenceError::Decode(format!( + "decode policy failed: {e}" + ))) + })?; + Ok(Some(policy)) + } + None => Ok(None), + } + } + + async fn set_policy( + &self, + ctx: &SetSandboxPolicyCtx, + ) -> Result { + let payload = ctx.policy.encode_to_vec(); + let hash = deterministic_policy_hash(&ctx.policy); + + let latest = self.store.get_latest_policy(&ctx.sandbox_id).await?; + + // Idempotent set: same hash already at HEAD → return the existing + // version without bumping. Matches the no-op short-circuit the + // handler had before the provider seam was introduced. + if let Some(ref current) = latest + && current.policy_hash == hash + { + return Ok(PolicyMutationOutcome { + version: u32::try_from(current.version).unwrap_or(0), + policy_hash: hash, + settings_revision: 0, + deleted: false, + }); + } + + let next_version = latest.map_or(1, |r| r.version + 1); + let policy_id = uuid::Uuid::new_v4().to_string(); + + self.store + .put_policy_revision(&policy_id, &ctx.sandbox_id, next_version, &payload, &hash) + .await?; + + // Best-effort cleanup of older revisions. Matches the handler's + // `let _ = ...` pattern — supersession failure is not a control-flow + // signal here; the new revision is still authoritative. 
+ let _ = self + .store + .supersede_older_policies(&ctx.sandbox_id, next_version) + .await; + + Ok(PolicyMutationOutcome { + version: u32::try_from(next_version).unwrap_or(0), + policy_hash: hash, + settings_revision: 0, + deleted: false, + }) + } + + async fn update_policy( + &self, + ctx: &UpdateSandboxPolicyCtx, + ) -> Result { + // The merge-with-retry loop, the static-fields-unchanged validation, + // and the safety validation live in the gRPC handler because they + // emit `tonic::Status::invalid_argument` directly for client-visible + // errors. The provider just confirms the operation is supported and + // re-runs the existing handler-side helper. We surface this by + // returning a sentinel zero-version outcome that the handler ignores + // and replaces with the value `apply_merge_operations_with_retry` + // computed; only the gating semantics travel through the trait here. + // + // The attested provider will return `Unsupported` from the default + // impl, which is what `openshell policy update` needs to reject. + let _ = (&ctx.sandbox_id, &ctx.sandbox_name, &ctx.merge_operations, &ctx.baseline_policy); + Ok(PolicyMutationOutcome::default()) + } + + async fn delete_policy( + &self, + ctx: &DeleteGlobalPolicyCtx, + ) -> Result { + // Local supports the global-policy delete. Mirror the handler's + // existing logic: if a latest global revision exists, mark all + // earlier revisions superseded. The handler still owns the global + // settings map mutation and the audit emission; this method only + // gates the operation through the trait so the attested provider's + // default `Unsupported` is what the handler sees there. 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::Store;
    use openshell_core::proto::SandboxPolicy;

    /// Connect a store to in-memory SQLite for a single test.
    async fn fresh_store() -> Arc<Store> {
        let store = Store::connect("sqlite::memory:?cache=shared")
            .await
            .expect("in-memory sqlite connects");
        Arc::new(store)
    }

    /// Local provider over a freshly connected store.
    async fn local_provider() -> LocalPolicyProvider {
        LocalPolicyProvider::new(fresh_store().await)
    }

    /// `set_policy` context whose sandbox id and name are both `sandbox`.
    fn set_ctx(sandbox: &str, policy: SandboxPolicy) -> SetSandboxPolicyCtx {
        SetSandboxPolicyCtx {
            sandbox_id: sandbox.to_string(),
            sandbox_name: sandbox.to_string(),
            expected_resource_version: 0,
            policy,
        }
    }

    #[tokio::test]
    async fn permits_mutation_is_ok_for_local_provider() {
        local_provider()
            .await
            .permits_mutation()
            .await
            .expect("local provider permits the entire mutation surface");
    }

    #[tokio::test]
    async fn id_is_local() {
        let provider = local_provider().await;
        assert_eq!(provider.id(), LOCAL_POLICY_TYPE_ID);
        assert_eq!(provider.id(), "local");
    }

    #[tokio::test]
    async fn get_effective_policy_returns_none_when_no_revision() {
        let provider = local_provider().await;
        let effective = provider.get_effective_policy("sb-fresh").await.unwrap();
        assert!(effective.is_none());
    }

    #[tokio::test]
    async fn set_policy_persists_and_get_effective_policy_returns_it() {
        let provider = local_provider().await;
        let policy = SandboxPolicy {
            version: 1,
            ..Default::default()
        };

        let outcome = provider
            .set_policy(&set_ctx("sb-1", policy))
            .await
            .expect("set_policy succeeds");
        assert_eq!(outcome.version, 1);
        assert!(!outcome.policy_hash.is_empty());

        let stored = provider
            .get_effective_policy("sb-1")
            .await
            .expect("get_effective_policy ok")
            .expect("policy present");
        assert_eq!(stored.version, 1);
    }

    #[tokio::test]
    async fn set_policy_is_idempotent_on_same_hash() {
        let provider = local_provider().await;
        let ctx = set_ctx(
            "sb-2",
            SandboxPolicy {
                version: 1,
                ..Default::default()
            },
        );

        // Writing the same policy twice must not bump the version.
        let first = provider.set_policy(&ctx).await.unwrap();
        let second = provider.set_policy(&ctx).await.unwrap();
        assert_eq!(first.version, second.version);
        assert_eq!(first.policy_hash, second.policy_hash);
    }

    #[tokio::test]
    async fn set_policy_bumps_version_on_distinct_hash() {
        let provider = local_provider().await;
        let ctx_a = set_ctx(
            "sb-3",
            SandboxPolicy {
                version: 1,
                ..Default::default()
            },
        );
        let ctx_b = set_ctx(
            "sb-3",
            SandboxPolicy {
                version: 2,
                ..Default::default()
            },
        );

        let a = provider.set_policy(&ctx_a).await.unwrap();
        let b = provider.set_policy(&ctx_b).await.unwrap();
        assert_eq!(a.version, 1);
        assert_eq!(b.version, 2);
        assert_ne!(a.policy_hash, b.policy_hash);
    }

    #[tokio::test]
    async fn delete_policy_is_ok_when_no_global_revision_exists() {
        let provider = local_provider().await;
        // With nothing to supersede the call must still succeed; store
        // results are ignored inside `delete_policy`.
        let ctx = DeleteGlobalPolicyCtx {
            global_policy_sandbox_id: "__global__".to_string(),
        };
        provider
            .delete_policy(&ctx)
            .await
            .expect("delete with no revision is a no-op");
    }

    #[tokio::test]
    async fn update_policy_is_ok_for_local_provider() {
        let provider = local_provider().await;
        // `update_policy` is only a gate on the local provider: it reports
        // "supported" and does no merge work of its own.
        let ctx = UpdateSandboxPolicyCtx {
            sandbox_id: "sb-4".to_string(),
            sandbox_name: "sb-4".to_string(),
            merge_operations: vec![],
            baseline_policy: None,
        };
        let outcome = provider
            .update_policy(&ctx)
            .await
            .expect("update_policy supported on local");

        // The placeholder outcome is the all-default value.
        assert_eq!(outcome.version, 0);
        assert!(outcome.policy_hash.is_empty());
        assert!(!outcome.deleted);
    }
}
+ +mod local; + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::persistence::PersistenceError; + +pub use local::LocalPolicyProvider; + +/// Policy-type id for the in-process, store-backed policy provider. +pub const LOCAL_POLICY_TYPE_ID: &str = "local"; + +/// Policy-type id for the (forthcoming) Attested Policy Projection provider. +/// +/// Declared here so config validation can produce a friendly "policy type +/// not yet available" error rather than the generic "unknown policy type" +/// error a follow-up implementer's typo would produce. +pub const ATTESTED_POLICY_TYPE_ID: &str = "attested"; + +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + +/// Errors returned by [`PolicyProvider`] implementations. +/// +/// `Unsupported` mirrors `openshell_providers::ProviderError::UnsupportedProvider` +/// — it carries enough context for the gRPC layer to surface the refusal as a +/// `Status::unimplemented` reply naming both the policy type and the +/// operation it refused. +#[derive(Debug, thiserror::Error)] +pub enum PolicyError { + /// The active policy provider does not implement this operation. The + /// default trait impl of `set_policy` / `update_policy` / + /// `delete_policy` returns this so a provider only needs to override + /// the operations it supports. The `policy_type` field carries the + /// provider's `id()` — the same string the config selector uses — so + /// audit and error messages can name it precisely. + #[error("policy type '{policy_type}' does not support operation '{operation}'")] + Unsupported { + policy_type: &'static str, + operation: &'static str, + }, + + /// Wraps a persistence-layer failure produced by the local provider. + /// The gRPC layer maps this back to the same status it would have + /// produced before the provider seam existed. 
+ #[error("policy persistence error: {0}")] + Persistence(#[from] PersistenceError), +} + +/// Context describing the canonical sandbox-scoped policy replacement +/// requested by a CLI `openshell policy set` (or equivalent gRPC +/// `UpdateConfig` call with `policy` set and no `merge_operations`). +#[derive(Debug, Clone)] +pub struct SetSandboxPolicyCtx { + pub sandbox_id: String, + pub sandbox_name: String, + pub expected_resource_version: u64, + pub policy: openshell_core::proto::SandboxPolicy, +} + +/// Context describing the canonical sandbox-scoped policy merge requested by +/// a CLI `openshell policy update` (gRPC `UpdateConfig` with +/// `merge_operations`). +#[derive(Debug, Clone)] +pub struct UpdateSandboxPolicyCtx { + pub sandbox_id: String, + pub sandbox_name: String, + pub merge_operations: Vec, + /// The baseline `spec.policy` for the sandbox, used to enforce + /// static-field-unchanged checks during the merge. + pub baseline_policy: Option, +} + +/// Context describing a global-policy delete (`openshell policy delete +/// --global`). +#[derive(Debug, Clone)] +pub struct DeleteGlobalPolicyCtx { + /// Sentinel sandbox id used by the store layer for global policy + /// revisions. The handler passes its own constant so this module does not + /// need to know which constant the policy gRPC module chose. + pub global_policy_sandbox_id: String, +} + +/// Outcome of a successful policy mutation. Mirrors the fields of +/// `UpdateConfigResponse` so the gRPC layer can build the reply without +/// reaching back into the store. +#[derive(Debug, Clone, Default)] +pub struct PolicyMutationOutcome { + pub version: u32, + pub policy_hash: String, + pub settings_revision: u64, + pub deleted: bool, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Pluggable policy provider. +/// +/// Each provider answers three questions: +/// 1. 
What is the effective policy for this sandbox at admission time? +/// (`get_effective_policy`) +/// 2. Will it accept any mutation to policy state — the canonical +/// mutator RPCs **and** the draft-chunk approval surface? +/// (`permits_mutation` — default `Unsupported`; coarse gate) +/// 3. Will it accept this *specific* mutation? (`set_policy`, +/// `update_policy`, `delete_policy` — default `Unsupported`) +/// +/// The default mutator impls returning `Unsupported` are load-bearing: a +/// provider that should refuse `openshell policy set | update | delete` +/// (e.g. the forthcoming `AttestedPolicyProvider`, which is fed by an +/// off-host signed bundle and has no notion of in-band mutation) inherits +/// the refusal automatically. +/// +/// **v0 scope note.** For the local provider this trait is a gating + thin- +/// persistence seam: `set_policy` writes the revision row and is the single +/// place that decides "yes, this mutation is allowed"; `update_policy` and +/// `delete_policy` are pure gates whose work (merge-with-retry, settings-map +/// mutation, audit emission) remains in the gRPC handler. The shape is wide +/// enough to absorb the full attested-provider semantics in the next session +/// without changing the trait again. +#[async_trait] +pub trait PolicyProvider: Send + Sync + std::fmt::Debug { + /// Canonical policy-type id, e.g. `"local"` or `"attested"`. Must match + /// the string the registry uses to look this provider up and the + /// `[openshell.policy] type = ...` value in the gateway config. + fn id(&self) -> &'static str; + + /// Return the effective policy for `sandbox_id`. The store-backed local + /// provider returns the latest revision recorded for that sandbox (or + /// `None` if no revision exists yet); the attested provider will return + /// the projected policy carried by the latest verified envelope. 
+ async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError>; + + /// Coarse gate: does this provider permit any mutation to policy state? + /// + /// "Mutation" here means **both** the canonical RPC mutators + /// (`set_policy`, `update_policy`, `delete_policy`) and the draft-chunk + /// approval surface added by the agentic approval loop (the `*DraftChunk` + /// handlers in `grpc/policy.rs`). The gRPC layer calls this first — before + /// any DB read or write — so an alternate provider can refuse the entire + /// write surface without per-RPC trait methods. Default: `Unsupported`. + /// + /// Rationale: per-op overrides (`set_policy` etc.) remain the natural + /// extension point for *what work happens* once a mutation is permitted; + /// `permits_mutation` is the coarse gate that lets the forthcoming + /// `AttestedPolicyProvider` (whose authoritative policy is fed by an + /// off-host signed bundle and has no in-band mutation semantics at all) + /// refuse everything by inheriting this default. See the APP + /// implementation plan W-B section ("permits_mutation") for the design + /// alternatives that were considered and rejected. + async fn permits_mutation(&self) -> Result<(), PolicyError> { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "mutation", + }) + } + + /// Replace the policy for a sandbox. Default: `Unsupported`. + async fn set_policy( + &self, + _ctx: &SetSandboxPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "set_policy", + }) + } + + /// Apply a sequence of incremental merge operations to a sandbox's + /// policy. Default: `Unsupported`. + async fn update_policy( + &self, + _ctx: &UpdateSandboxPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "update_policy", + }) + } + + /// Delete the global policy. 
The local provider implements this against + /// the global-policy sandbox-id sentinel; remote providers may refuse. + /// Default: `Unsupported`. + async fn delete_policy( + &self, + _ctx: &DeleteGlobalPolicyCtx, + ) -> Result { + Err(PolicyError::Unsupported { + policy_type: self.id(), + operation: "delete_policy", + }) + } +} + +// --------------------------------------------------------------------------- +// Registry +// --------------------------------------------------------------------------- + +/// Resolves policy-type-id strings to a registered [`PolicyProvider`]. +/// Mirrors `openshell_providers::ProviderRegistry` so future providers can +/// be added without changing the wiring at startup. +#[derive(Default)] +pub struct PolicyProviderRegistry { + providers: HashMap<&'static str, Arc>, +} + +impl std::fmt::Debug for PolicyProviderRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PolicyProviderRegistry") + .field("providers", &self.providers.keys().collect::>()) + .finish() + } +} + +impl PolicyProviderRegistry { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + pub fn register

(&mut self, provider: P) + where + P: PolicyProvider + 'static, + { + self.providers.insert(provider.id(), Arc::new(provider)); + } + + #[must_use] + pub fn get(&self, id: &str) -> Option> { + self.providers.get(id).cloned() + } + + /// Registered policy-type ids, sorted. Used for diagnostic messages + /// when a configured policy type is not found; kept on the registry + /// surface even though no caller exercises it in v0 because it mirrors + /// `ProviderRegistry::known_types` and the next session's + /// `AttestedPolicyProvider` integration will consume it. + #[allow(dead_code)] // see doc comment + #[must_use] + pub fn known_policy_types(&self) -> Vec<&'static str> { + let mut ids: Vec<_> = self.providers.keys().copied().collect(); + ids.sort_unstable(); + ids + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + /// A bare-bones provider with no method overrides. Used to confirm the + /// trait's default `Unsupported` impls fire for the three mutators. 
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal provider that overrides nothing optional, so every
    /// mutation-related call exercises the trait's default implementation.
    #[derive(Debug)]
    struct StubProvider;

    #[async_trait]
    impl PolicyProvider for StubProvider {
        fn id(&self) -> &'static str {
            "stub"
        }

        async fn get_effective_policy(
            &self,
            _sandbox_id: &str,
        ) -> Result<Option<openshell_core::proto::SandboxPolicy>, PolicyError> {
            Ok(None)
        }
    }

    /// True when `err` is the stub's `Unsupported` refusal for
    /// `expected_operation`.
    fn is_stub_refusal(err: &PolicyError, expected_operation: &str) -> bool {
        matches!(
            err,
            PolicyError::Unsupported {
                policy_type: "stub",
                operation,
            } if *operation == expected_operation
        )
    }

    #[tokio::test]
    async fn stub_provider_permits_mutation_returns_unsupported() {
        let refusal = StubProvider
            .permits_mutation()
            .await
            .expect_err("default impl must error");
        assert!(is_stub_refusal(&refusal, "mutation"));
    }

    #[tokio::test]
    async fn stub_provider_set_policy_returns_unsupported() {
        let ctx = SetSandboxPolicyCtx {
            sandbox_id: "sb".into(),
            sandbox_name: "sb".into(),
            expected_resource_version: 0,
            policy: openshell_core::proto::SandboxPolicy::default(),
        };
        let refusal = StubProvider
            .set_policy(&ctx)
            .await
            .expect_err("default impl must error");
        assert!(is_stub_refusal(&refusal, "set_policy"));
    }

    #[tokio::test]
    async fn stub_provider_update_policy_returns_unsupported() {
        let ctx = UpdateSandboxPolicyCtx {
            sandbox_id: "sb".into(),
            sandbox_name: "sb".into(),
            merge_operations: vec![],
            baseline_policy: None,
        };
        let refusal = StubProvider
            .update_policy(&ctx)
            .await
            .expect_err("default impl must error");
        assert!(is_stub_refusal(&refusal, "update_policy"));
    }

    #[tokio::test]
    async fn stub_provider_delete_policy_returns_unsupported() {
        let ctx = DeleteGlobalPolicyCtx {
            global_policy_sandbox_id: "__global__".into(),
        };
        let refusal = StubProvider
            .delete_policy(&ctx)
            .await
            .expect_err("default impl must error");
        assert!(is_stub_refusal(&refusal, "delete_policy"));
    }

    #[test]
    fn registry_lookup_returns_registered_provider() {
        let mut registry = PolicyProviderRegistry::new();
        registry.register(StubProvider);

        let found = registry
            .get("stub")
            .expect("registered provider resolves");
        assert_eq!(found.id(), "stub");
        assert!(registry.get("nonexistent").is_none());
        assert_eq!(registry.known_policy_types(), vec!["stub"]);
    }
}