diff --git a/Cargo.lock b/Cargo.lock index 5fdc8e22e3f5..b82231ce04f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10160,6 +10160,7 @@ name = "ic-https-outcalls-consensus" version = "0.9.0" dependencies = [ "assert_matches", + "candid", "hex", "ic-artifact-pool", "ic-consensus-mocks", @@ -10171,6 +10172,7 @@ dependencies = [ "ic-interfaces-registry", "ic-interfaces-state-manager", "ic-logger", + "ic-management-canister-types-private", "ic-metrics", "ic-protobuf", "ic-registry-client-helpers", diff --git a/rs/consensus/src/consensus/metrics.rs b/rs/consensus/src/consensus/metrics.rs index 5df0ff7dec96..10129ada5828 100644 --- a/rs/consensus/src/consensus/metrics.rs +++ b/rs/consensus/src/consensus/metrics.rs @@ -304,6 +304,9 @@ impl FinalizerMetrics { self.canister_http_success_delivered .with_label_values(&["non_replicated"]) .inc_by(batch_stats.canister_http.single_signature_responses as u64); + self.canister_http_success_delivered + .with_label_values(&["flexible"]) + .inc_by(batch_stats.canister_http.flexible_ok_responses as u64); self.canister_http_timeouts_delivered .inc_by(batch_stats.canister_http.timeouts as u64); self.canister_http_divergences_delivered diff --git a/rs/https_outcalls/consensus/BUILD.bazel b/rs/https_outcalls/consensus/BUILD.bazel index 51533f899f8a..edf6aa6e271c 100644 --- a/rs/https_outcalls/consensus/BUILD.bazel +++ b/rs/https_outcalls/consensus/BUILD.bazel @@ -16,8 +16,10 @@ DEPENDENCIES = [ "//rs/registry/helpers", "//rs/registry/subnet_type", "//rs/replicated_state", + "//rs/types/management_canister_types", "//rs/types/types", "//rs/utils", + "@crate_index//:candid", "@crate_index//:hex", "@crate_index//:prometheus", "@crate_index//:rand", diff --git a/rs/https_outcalls/consensus/Cargo.toml b/rs/https_outcalls/consensus/Cargo.toml index 2ec74ed63239..20b031f3535e 100644 --- a/rs/https_outcalls/consensus/Cargo.toml +++ b/rs/https_outcalls/consensus/Cargo.toml @@ -7,6 +7,7 @@ description.workspace = true documentation.workspace = true [dependencies] +candid = { workspace = true } hex = { workspace = true } ic-consensus-utils = { path = "../../consensus/utils" } ic-error-types = { path = "../../../packages/ic-error-types" } @@ -15,6 +16,7 @@ ic-interfaces-adapter-client = { path = "../../interfaces/adapter_client" } ic-interfaces-registry = { path = "../../interfaces/registry" } ic-interfaces-state-manager = { path = "../../interfaces/state_manager" } ic-logger = { path = "../../monitoring/logger" } +ic-management-canister-types-private = { path = "../../types/management_canister_types" } ic-metrics = { path = "../../monitoring/metrics" } ic-protobuf = { path = "../../protobuf" } ic-registry-client-helpers = { path = "../../registry/helpers" } diff --git a/rs/https_outcalls/consensus/src/payload_builder.rs b/rs/https_outcalls/consensus/src/payload_builder.rs index 61d5b4bed932..d2f33ed247f8 100644 --- a/rs/https_outcalls/consensus/src/payload_builder.rs +++ b/rs/https_outcalls/consensus/src/payload_builder.rs @@ -10,6 +10,7 @@ use crate::{ }, }, }; +use candid::{Decode, Encode}; use ic_consensus_utils::{ crypto::ConsensusCrypto, membership::Membership, registry_version_at_height, }; @@ -27,13 +28,17 @@ use ic_interfaces::{ use ic_interfaces_registry::RegistryClient; use ic_interfaces_state_manager::StateReader; use ic_logger::{ReplicaLogger, warn}; +use ic_management_canister_types_private::{ + CanisterHttpResponsePayload, FlexibleHttpRequestResult, +}; use ic_metrics::MetricsRegistry; use ic_registry_client_helpers::subnet::SubnetRegistry; use ic_replicated_state::ReplicatedState; use ic_types::{ CountBytes, Height, NodeId, NumBytes, RegistryVersion, SubnetId, batch::{ - CanisterHttpPayload, ConsensusResponse, MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext, + CanisterHttpPayload, ConsensusResponse, FlexibleCanisterHttpResponses, + MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext, }, canister_http::{ CANISTER_HTTP_MAX_RESPONSES_PER_BLOCK, CANISTER_HTTP_TIMEOUT_INTERVAL, @@ -69,6 +74,7 @@ pub struct CanisterHttpBatchStats { pub timeouts: usize, pub divergence_responses: usize, pub single_signature_responses: usize, + pub flexible_ok_responses: usize, pub payload_bytes: usize, } @@ -850,21 +856,60 @@ impl IntoMessages<(Vec, CanisterHttpBatchStats)> ) }); - let divergece_responses = messages + let divergence_responses = messages .divergence_responses .iter() .filter_map(divergence_response_into_reject) .inspect(|_| stats.divergence_responses += 1); + let flexible_ok_responses = messages + .flexible_responses + .into_iter() + .filter_map(flexible_ok_responses_into_consensus_response) + .inspect(|_| stats.flexible_ok_responses += 1); + let responses = responses .chain(timeouts) - .chain(divergece_responses) + .chain(divergence_responses) + .chain(flexible_ok_responses) .collect(); (responses, stats) } } +/// Converts a [`FlexibleCanisterHttpResponses`] into a [`ConsensusResponse`]. +/// +/// Returns `None` if Candid decoding/encoding fails, which leads to skipping +/// the delivery of this response. This should never occur, but if it does, +/// eventually a timeout will gracefully end the outstanding callback. +fn flexible_ok_responses_into_consensus_response( + response_group: FlexibleCanisterHttpResponses, +) -> Option { + let payloads: Vec<_> = response_group + .responses + .into_iter() + .filter_map(|entry| match entry.response.content { + CanisterHttpResponseContent::Success(data) => { + Some(Decode!(&data, CanisterHttpResponsePayload).ok()) + } + CanisterHttpResponseContent::Reject(_) => { + // Unreachable: payload building/validation ensure + // that there are no rejects in the ok-responses. + None + } + }) + // Decoding errors short-circuit the collection and None is returned. + .collect::>()?; + + let bytes = Encode!(&FlexibleHttpRequestResult::Ok(payloads)).ok()?; + + Some(ConsensusResponse::new( + response_group.callback_id, + Payload::Data(bytes), + )) +} + /// Turns a [`CanisterHttpResponseDivergence`] into a [`ConsensusResponse`] containing a rejection. /// /// This function generates a detailed error message. diff --git a/rs/https_outcalls/consensus/src/payload_builder/tests.rs b/rs/https_outcalls/consensus/src/payload_builder/tests.rs index 7ca409997211..60da62805e14 100644 --- a/rs/https_outcalls/consensus/src/payload_builder/tests.rs +++ b/rs/https_outcalls/consensus/src/payload_builder/tests.rs @@ -9,11 +9,12 @@ use crate::payload_builder::{ parse::{bytes_to_payload, payload_to_bytes}, }; use assert_matches::assert_matches; +use candid::{Decode, Encode}; use ic_artifact_pool::canister_http_pool::CanisterHttpPoolImpl; use ic_consensus_mocks::{Dependencies, dependencies_with_subnet_params}; use ic_error_types::RejectCode; use ic_interfaces::{ - batch_payload::{BatchPayloadBuilder, PastPayload, ProposalContext}, + batch_payload::{BatchPayloadBuilder, IntoMessages, PastPayload, ProposalContext}, canister_http::{ CanisterHttpChangeAction, CanisterHttpChangeSet, CanisterHttpPayloadValidationFailure, InvalidCanisterHttpPayloadReason, @@ -24,6 +25,9 @@ use ic_interfaces::{ }; use ic_interfaces_mocks::crypto::MockCrypto; use ic_logger::replica_logger::no_op_logger; +use ic_management_canister_types_private::{ + CanisterHttpResponsePayload, FlexibleHttpRequestResult, HttpHeader, +}; use ic_metrics::MetricsRegistry; use ic_registry_subnet_features::SubnetFeatures; use ic_test_utilities::state_manager::RefMockStateManager; @@ -2562,6 +2566,150 @@ fn flexible_invalid_signature_error() { ); } +#[test] +fn flexible_ok_responses_into_messages_success_round_trip() { + let callback_id = CallbackId::from(42); + + let payload_a = CanisterHttpResponsePayload { + status: 200, + headers: vec![HttpHeader { + name: "content-type".to_string(), + value: "text/plain".to_string(), + }], + body: b"hello from node A".to_vec(), + }; + let payload_b = CanisterHttpResponsePayload { + status: 201, + headers: vec![], + body: b"hello from node B".to_vec(), + }; + + let entry_a = flexible_response(42, 0, &Encode!(&payload_a).unwrap()); + let entry_b = flexible_response(42, 1, &Encode!(&payload_b).unwrap()); + + let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { + callback_id, + responses: vec![entry_a, entry_b], + }]); + let bytes = payload_to_bytes_max_4mb(payload); + + let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + + assert_eq!(responses.len(), 1); + assert_eq!(responses[0].callback, callback_id); + let Payload::Data(ref data) = responses[0].payload else { + panic!("Expected Payload::Data, got {:?}", responses[0].payload); + }; + let result = Decode!(data, FlexibleHttpRequestResult).unwrap(); + let FlexibleHttpRequestResult::Ok(payloads) = result else { + panic!("Expected Ok variant, got {result:?}"); + }; + assert_eq!(payloads.len(), 2); + assert_eq!(payloads[0], payload_a); + assert_eq!(payloads[1], payload_b); + assert_eq!(stats.flexible_ok_responses, 1); +} + +#[test] +fn flexible_ok_responses_into_messages_skips_reject_entries() { + let callback_id = CallbackId::from(99); + + let good_payload = CanisterHttpResponsePayload { + status: 200, + headers: vec![], + body: b"ok".to_vec(), + }; + let success_entry = flexible_response(99, 0, &Encode!(&good_payload).unwrap()); + + let (reject_response, reject_metadata) = test_response_and_metadata_with_content( + 99, + CanisterHttpResponseContent::Reject(ic_types::canister_http::CanisterHttpReject { + reject_code: RejectCode::SysTransient, + message: "adapter error".to_string(), + }), + ); + let reject_entry = FlexibleCanisterHttpResponseWithProof { + response: reject_response, + proof: metadata_to_share(1, &reject_metadata), + }; + + let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { + callback_id, + responses: vec![success_entry, reject_entry], + }]); + let bytes = payload_to_bytes_max_4mb(payload); + + let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + + assert_eq!(responses.len(), 1); + let Payload::Data(ref data) = responses[0].payload else { + panic!("Expected Payload::Data"); + }; + let result = Decode!(data, FlexibleHttpRequestResult).unwrap(); + let FlexibleHttpRequestResult::Ok(payloads) = result else { + panic!("Expected Ok variant, got {result:?}"); + }; + assert_eq!(payloads.len(), 1, "Reject entry should be filtered out"); + assert_eq!(payloads[0], good_payload); + assert_eq!(stats.flexible_ok_responses, 1); +} + +#[test] +fn flexible_ok_responses_into_messages_stats_count_multiple_groups() { + let payload_data = Encode!(&CanisterHttpResponsePayload { + status: 200, + headers: vec![], + body: vec![], + }) + .unwrap(); + + let group_a = FlexibleCanisterHttpResponses { + callback_id: CallbackId::from(1), + responses: vec![flexible_response(1, 0, &payload_data)], + }; + let group_b = FlexibleCanisterHttpResponses { + callback_id: CallbackId::from(2), + responses: vec![flexible_response(2, 1, &payload_data)], + }; + let group_c = FlexibleCanisterHttpResponses { + callback_id: CallbackId::from(3), + responses: vec![flexible_response(3, 2, &payload_data)], + }; + + let payload = flexible_payload(vec![group_a, group_b, group_c]); + let bytes = payload_to_bytes_max_4mb(payload); + + let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + + assert_eq!(responses.len(), 3); + assert_eq!(stats.flexible_ok_responses, 3); +} + +#[test] +fn flexible_ok_responses_into_messages_decode_failure_is_skipped() { + let callback_id = CallbackId::from(42); + + let valid_data = Encode!(&CanisterHttpResponsePayload { + status: 200, + headers: vec![], + body: vec![], + }) + .unwrap(); + let valid_entry = flexible_response(42, 0, &valid_data); + let invalid_entry = flexible_response(42, 1, b"this is invalid candid"); + + let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { + callback_id, + responses: vec![valid_entry, invalid_entry], + }]); + let bytes = payload_to_bytes_max_4mb(payload); + + let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + + assert_eq!(responses.len(), 0); + assert_eq!(stats.flexible_ok_responses, 0); +} + fn setup_test_with_contexts( num_nodes: usize, contexts: Vec<(CallbackId, CanisterHttpRequestContext)>,