Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rs/consensus/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ impl FinalizerMetrics {
self.canister_http_success_delivered
.with_label_values(&["non_replicated"])
.inc_by(batch_stats.canister_http.single_signature_responses as u64);
self.canister_http_success_delivered
.with_label_values(&["flexible"])
.inc_by(batch_stats.canister_http.flexible_ok_responses as u64);
self.canister_http_timeouts_delivered
.inc_by(batch_stats.canister_http.timeouts as u64);
self.canister_http_divergences_delivered
Expand Down
2 changes: 2 additions & 0 deletions rs/https_outcalls/consensus/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ DEPENDENCIES = [
"//rs/registry/helpers",
"//rs/registry/subnet_type",
"//rs/replicated_state",
"//rs/types/management_canister_types",
"//rs/types/types",
"//rs/utils",
"@crate_index//:candid",
"@crate_index//:hex",
"@crate_index//:prometheus",
"@crate_index//:rand",
Expand Down
2 changes: 2 additions & 0 deletions rs/https_outcalls/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ description.workspace = true
documentation.workspace = true

[dependencies]
candid = { workspace = true }
hex = { workspace = true }
ic-consensus-utils = { path = "../../consensus/utils" }
ic-error-types = { path = "../../../packages/ic-error-types" }
Expand All @@ -15,6 +16,7 @@ ic-interfaces-adapter-client = { path = "../../interfaces/adapter_client" }
ic-interfaces-registry = { path = "../../interfaces/registry" }
ic-interfaces-state-manager = { path = "../../interfaces/state_manager" }
ic-logger = { path = "../../monitoring/logger" }
ic-management-canister-types-private = { path = "../../types/management_canister_types" }
ic-metrics = { path = "../../monitoring/metrics" }
ic-protobuf = { path = "../../protobuf" }
ic-registry-client-helpers = { path = "../../registry/helpers" }
Expand Down
51 changes: 48 additions & 3 deletions rs/https_outcalls/consensus/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
},
};
use candid::{Decode, Encode};
use ic_consensus_utils::{
crypto::ConsensusCrypto, membership::Membership, registry_version_at_height,
};
Expand All @@ -27,13 +28,17 @@ use ic_interfaces::{
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{ReplicaLogger, warn};
use ic_management_canister_types_private::{
CanisterHttpResponsePayload, FlexibleHttpRequestResult,
};
use ic_metrics::MetricsRegistry;
use ic_registry_client_helpers::subnet::SubnetRegistry;
use ic_replicated_state::ReplicatedState;
use ic_types::{
CountBytes, Height, NodeId, NumBytes, RegistryVersion, SubnetId,
batch::{
CanisterHttpPayload, ConsensusResponse, MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext,
CanisterHttpPayload, ConsensusResponse, FlexibleCanisterHttpResponses,
MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext,
},
canister_http::{
CANISTER_HTTP_MAX_RESPONSES_PER_BLOCK, CANISTER_HTTP_TIMEOUT_INTERVAL,
Expand Down Expand Up @@ -69,6 +74,7 @@ pub struct CanisterHttpBatchStats {
pub timeouts: usize,
pub divergence_responses: usize,
pub single_signature_responses: usize,
pub flexible_ok_responses: usize,
pub payload_bytes: usize,
}

Expand Down Expand Up @@ -850,21 +856,60 @@ impl IntoMessages<(Vec<ConsensusResponse>, CanisterHttpBatchStats)>
)
});

let divergece_responses = messages
let divergence_responses = messages
.divergence_responses
.iter()
.filter_map(divergence_response_into_reject)
.inspect(|_| stats.divergence_responses += 1);

let flexible_ok_responses = messages
.flexible_responses
.into_iter()
.filter_map(flexible_ok_responses_into_consensus_response)
.inspect(|_| stats.flexible_ok_responses += 1);

let responses = responses
.chain(timeouts)
.chain(divergece_responses)
.chain(divergence_responses)
.chain(flexible_ok_responses)
.collect();

(responses, stats)
}
}

/// Converts a [`FlexibleCanisterHttpResponses`] into a [`ConsensusResponse`].
///
/// Returns `None` if Candid decoding/encoding fails, which leads to skipping
/// the delivery of this response. This should never occur, but if it does,
/// eventually a timeout will gracefully end the outstanding callback.
Comment thread
fspreiss marked this conversation as resolved.
fn flexible_ok_responses_into_consensus_response(
response_group: FlexibleCanisterHttpResponses,
) -> Option<ConsensusResponse> {
let payloads: Vec<_> = response_group
.responses
.into_iter()
.filter_map(|entry| match entry.response.content {
CanisterHttpResponseContent::Success(data) => {
Some(Decode!(&data, CanisterHttpResponsePayload).ok())
}
CanisterHttpResponseContent::Reject(_) => {
// Unreachable: payload building/validation ensure
// that there are no rejects in the ok-responses.
None
}
})
// Decoding errors short-circuit the collection and None is returned.
.collect::<Option<_>>()?;

let bytes = Encode!(&FlexibleHttpRequestResult::Ok(payloads)).ok()?;
Comment thread
eichhorl marked this conversation as resolved.

Some(ConsensusResponse::new(
response_group.callback_id,
Payload::Data(bytes),
))
}

/// Turns a [`CanisterHttpResponseDivergence`] into a [`ConsensusResponse`] containing a rejection.
///
/// This function generates a detailed error message.
Expand Down
150 changes: 149 additions & 1 deletion rs/https_outcalls/consensus/src/payload_builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::payload_builder::{
parse::{bytes_to_payload, payload_to_bytes},
};
use assert_matches::assert_matches;
use candid::{Decode, Encode};
use ic_artifact_pool::canister_http_pool::CanisterHttpPoolImpl;
use ic_consensus_mocks::{Dependencies, dependencies_with_subnet_params};
use ic_error_types::RejectCode;
use ic_interfaces::{
batch_payload::{BatchPayloadBuilder, PastPayload, ProposalContext},
batch_payload::{BatchPayloadBuilder, IntoMessages, PastPayload, ProposalContext},
canister_http::{
CanisterHttpChangeAction, CanisterHttpChangeSet, CanisterHttpPayloadValidationFailure,
InvalidCanisterHttpPayloadReason,
Expand All @@ -24,6 +25,9 @@ use ic_interfaces::{
};
use ic_interfaces_mocks::crypto::MockCrypto;
use ic_logger::replica_logger::no_op_logger;
use ic_management_canister_types_private::{
CanisterHttpResponsePayload, FlexibleHttpRequestResult, HttpHeader,
};
use ic_metrics::MetricsRegistry;
use ic_registry_subnet_features::SubnetFeatures;
use ic_test_utilities::state_manager::RefMockStateManager;
Expand Down Expand Up @@ -2562,6 +2566,150 @@ fn flexible_invalid_signature_error() {
);
}

#[test]
fn flexible_ok_responses_into_messages_success_round_trip() {
let callback_id = CallbackId::from(42);

let payload_a = CanisterHttpResponsePayload {
status: 200,
headers: vec![HttpHeader {
name: "content-type".to_string(),
value: "text/plain".to_string(),
}],
body: b"hello from node A".to_vec(),
};
let payload_b = CanisterHttpResponsePayload {
status: 201,
headers: vec![],
body: b"hello from node B".to_vec(),
};

let entry_a = flexible_response(42, 0, &Encode!(&payload_a).unwrap());
let entry_b = flexible_response(42, 1, &Encode!(&payload_b).unwrap());

let payload = flexible_payload(vec![FlexibleCanisterHttpResponses {
callback_id,
responses: vec![entry_a, entry_b],
}]);
let bytes = payload_to_bytes_max_4mb(payload);

let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes);

assert_eq!(responses.len(), 1);
assert_eq!(responses[0].callback, callback_id);
let Payload::Data(ref data) = responses[0].payload else {
panic!("Expected Payload::Data, got {:?}", responses[0].payload);
};
let result = Decode!(data, FlexibleHttpRequestResult).unwrap();
let FlexibleHttpRequestResult::Ok(payloads) = result else {
panic!("Expected Ok variant, got {result:?}");
};
assert_eq!(payloads.len(), 2);
assert_eq!(payloads[0], payload_a);
assert_eq!(payloads[1], payload_b);
assert_eq!(stats.flexible_ok_responses, 1);
}

#[test]
fn flexible_ok_responses_into_messages_skips_reject_entries() {
let callback_id = CallbackId::from(99);

let good_payload = CanisterHttpResponsePayload {
status: 200,
headers: vec![],
body: b"ok".to_vec(),
};
let success_entry = flexible_response(99, 0, &Encode!(&good_payload).unwrap());

let (reject_response, reject_metadata) = test_response_and_metadata_with_content(
99,
CanisterHttpResponseContent::Reject(ic_types::canister_http::CanisterHttpReject {
reject_code: RejectCode::SysTransient,
message: "adapter error".to_string(),
}),
);
let reject_entry = FlexibleCanisterHttpResponseWithProof {
response: reject_response,
proof: metadata_to_share(1, &reject_metadata),
};

let payload = flexible_payload(vec![FlexibleCanisterHttpResponses {
callback_id,
responses: vec![success_entry, reject_entry],
}]);
let bytes = payload_to_bytes_max_4mb(payload);

let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes);

assert_eq!(responses.len(), 1);
let Payload::Data(ref data) = responses[0].payload else {
panic!("Expected Payload::Data");
};
let result = Decode!(data, FlexibleHttpRequestResult).unwrap();
let FlexibleHttpRequestResult::Ok(payloads) = result else {
panic!("Expected Ok variant, got {result:?}");
};
assert_eq!(payloads.len(), 1, "Reject entry should be filtered out");
assert_eq!(payloads[0], good_payload);
assert_eq!(stats.flexible_ok_responses, 1);
}

#[test]
fn flexible_ok_responses_into_messages_stats_count_multiple_groups() {
let payload_data = Encode!(&CanisterHttpResponsePayload {
status: 200,
headers: vec![],
body: vec![],
})
.unwrap();

let group_a = FlexibleCanisterHttpResponses {
callback_id: CallbackId::from(1),
responses: vec![flexible_response(1, 0, &payload_data)],
};
let group_b = FlexibleCanisterHttpResponses {
callback_id: CallbackId::from(2),
responses: vec![flexible_response(2, 1, &payload_data)],
};
let group_c = FlexibleCanisterHttpResponses {
callback_id: CallbackId::from(3),
responses: vec![flexible_response(3, 2, &payload_data)],
};

let payload = flexible_payload(vec![group_a, group_b, group_c]);
let bytes = payload_to_bytes_max_4mb(payload);

let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes);

assert_eq!(responses.len(), 3);
assert_eq!(stats.flexible_ok_responses, 3);
}

#[test]
fn flexible_ok_responses_into_messages_decode_failure_is_skipped() {
let callback_id = CallbackId::from(42);

let valid_data = Encode!(&CanisterHttpResponsePayload {
status: 200,
headers: vec![],
body: vec![],
})
.unwrap();
let valid_entry = flexible_response(42, 0, &valid_data);
let invalid_entry = flexible_response(42, 1, b"this is invalid candid");

let payload = flexible_payload(vec![FlexibleCanisterHttpResponses {
callback_id,
responses: vec![valid_entry, invalid_entry],
}]);
let bytes = payload_to_bytes_max_4mb(payload);

let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes);

assert_eq!(responses.len(), 0);
assert_eq!(stats.flexible_ok_responses, 0);
}

fn setup_test_with_contexts(
num_nodes: usize,
contexts: Vec<(CallbackId, CanisterHttpRequestContext)>,
Expand Down
Loading