Skip to content

Commit

Permalink
fix: Record 'dropped' status for silently dropped best-effort respons…
Browse files Browse the repository at this point in the history
…es. (#3162)

The assumption that `push_input()` returning `Ok(())` implies that the
message was successfully inducted is correct for guaranteed responses
(otherwise it's a critical error) but not for best-effort responses
since those may be dropped silently. It is therefore necessary to
distinguish between these two cases by returning `Ok(true/false)`
instead to record successful induction in the metrics only for actual
inductions and 'dropped' otherwise.
  • Loading branch information
stiegerc authored Dec 16, 2024
1 parent 8081c04 commit 5f9c28b
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 242 deletions.
1 change: 1 addition & 0 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,7 @@ impl SchedulerImpl {
state.metadata.own_subnet_type,
InputQueueType::LocalSubnet,
)
.map(|_| ())
.map_err(|(err, msg)| {
error!(
self.log,
Expand Down
1 change: 1 addition & 0 deletions rs/messaging/src/routing/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl StreamBuilderImpl {
// Arbitrary large amount, pushing a response always returns memory.
&mut (i64::MAX / 2),
)
.map(|_| ())
.unwrap_or_else(|(err, response)| {
// Local request, we should never get a `CanisterNotFound`, `CanisterStopped` or
// `NonMatchingResponse` error.
Expand Down
4 changes: 2 additions & 2 deletions rs/messaging/src/routing/stream_builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,14 +1038,14 @@ fn consume_output_queues(state: &ReplicatedState) -> ReplicatedState {
/// Pushes the message into the given canister's corresponding input queue.
fn push_input(canister_state: &mut CanisterState, msg: RequestOrResponse) {
let mut subnet_available_memory = 1 << 30;
canister_state
assert!(canister_state
.push_input(
msg,
&mut subnet_available_memory,
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap()
.unwrap());
}

/// Asserts that the values of the `METRIC_ROUTED_MESSAGES` metric
Expand Down
8 changes: 7 additions & 1 deletion rs/messaging/src/routing/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const METRIC_XNET_MESSAGE_BACKLOG: &str = "mr_xnet_message_backlog";

const LABEL_STATUS: &str = "status";
const LABEL_VALUE_SUCCESS: &str = "success";
const LABEL_VALUE_DROPPED: &str = "dropped";
const LABEL_VALUE_SENDER_SUBNET_MISMATCH: &str = "SenderSubnetMismatch";
const LABEL_VALUE_RECEIVER_SUBNET_MISMATCH: &str = "ReceiverSubnetMismatch";
const LABEL_VALUE_REQUEST_MISROUTED: &str = "RequestMisrouted";
Expand Down Expand Up @@ -809,11 +810,16 @@ impl StreamHandlerImpl {
Some(host_subnet) if host_subnet == self.subnet_id => {
match state.push_input(msg, available_guaranteed_response_memory) {
// Message successfully inducted, all done.
Ok(()) => {
Ok(true) => {
self.observe_inducted_message_status(msg_type, LABEL_VALUE_SUCCESS);
self.observe_inducted_payload_size(payload_size);
}

// Message silently dropped, all done.
Ok(false) => {
self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED);
}

// Message not inducted.
Err((err, msg)) => {
self.observe_inducted_message_status(msg_type, err.to_label_value());
Expand Down
117 changes: 87 additions & 30 deletions rs/messaging/src/routing/stream_handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ic_test_utilities_types::ids::{user_test_id, SUBNET_12, SUBNET_23, SUBNET_27
use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder};
use ic_test_utilities_types::xnet::StreamHeaderBuilder;
use ic_types::messages::{CallbackId, Payload, MAX_RESPONSE_COUNT_BYTES, NO_DEADLINE};
use ic_types::time::UNIX_EPOCH;
use ic_types::time::{CoarseTime, UNIX_EPOCH};
use ic_types::xnet::{RejectReason, RejectSignal, StreamFlags, StreamIndexedQueue};
use ic_types::{CanisterId, CountBytes, Cycles};
use lazy_static::lazy_static;
Expand Down Expand Up @@ -1945,8 +1945,7 @@ fn check_stream_handler_generated_reject_signal_canister_stopped() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
state
.canister_states
.get_mut(&LOCAL_CANISTER)
.canister_state_mut(&LOCAL_CANISTER)
.unwrap()
.system_state
.set_status(CanisterStatus::Stopped);
Expand All @@ -1961,8 +1960,7 @@ fn check_stream_handler_generated_reject_signal_canister_stopping() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
state
.canister_states
.get_mut(&LOCAL_CANISTER)
.canister_state_mut(&LOCAL_CANISTER)
.unwrap()
.system_state
.set_status(CanisterStatus::Stopping {
Expand All @@ -1980,11 +1978,14 @@ fn check_stream_handler_generated_reject_signal_queue_full() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
let mut callback_id = 2;
while let Ok(()) = state.push_input(
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
.build_with(CallbackId::new(callback_id), 0),
&mut (i64::MAX / 2),
) {
while state
.push_input(
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
.build_with(CallbackId::new(callback_id), 0),
&mut (i64::MAX / 2),
)
.is_ok()
{
callback_id += 1;
}
},
Expand Down Expand Up @@ -2017,6 +2018,49 @@ fn check_stream_handler_generated_reject_signal_canister_migrating() {
);
}

#[test]
fn duplicate_best_effort_response_is_dropped() {
with_local_test_setup(
btreemap![LOCAL_SUBNET => StreamConfig {
begin: 21,
messages: vec![BestEffortResponse(*LOCAL_CANISTER, *LOCAL_CANISTER, CoarseTime::from_secs_since_unix_epoch(123))],
signals_end: 21,
..StreamConfig::default()
}],
|stream_handler, mut state, metrics| {
let response = message_in_stream(state.get_stream(&LOCAL_SUBNET), 21).clone();

let mut expected_state = state.clone();
// The expected state has the response inducted...
push_input(&mut expected_state, response.clone());
// ...and an empty loopback stream with begin advanced.
let loopback_stream = stream_from_config(StreamConfig {
begin: 23,
signals_end: 23,
..StreamConfig::default()
});
expected_state.with_streams(btreemap![LOCAL_SUBNET => loopback_stream]);

// Push the clone of the best effort response onto the loopback stream.
state.modify_streams(|streams| streams.get_mut(&LOCAL_SUBNET).unwrap().push(response));

let inducted_state = stream_handler.induct_loopback_stream(state, &mut (i64::MAX / 2));
assert_eq!(inducted_state, expected_state);

// No critical errors raised.
metrics.assert_eq_critical_errors(CriticalErrorCounts::default());
// Only one response was recorded by the metrics.
metrics.assert_inducted_xnet_messages_eq(&[
// Response @21 is inducted successfully.
(LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_SUCCESS, 1),
// Duplicate Response @22 is dropped.
(LABEL_VALUE_TYPE_RESPONSE, LABEL_VALUE_DROPPED, 1),
]);
assert_eq!(1, metrics.fetch_inducted_payload_sizes_stats().count);
},
);
}

// TODO: Remove legacy tests once certification versions < V19 can be phased out safely.
/// Common implementation for tests checking reject responses generated by the `StreamHandler`
/// directly.
Expand Down Expand Up @@ -2073,15 +2117,10 @@ fn legacy_check_stream_handler_generated_reject_response_canister_stopped() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
state
.canister_states
.get_mut(&LOCAL_CANISTER)
.canister_state_mut(&LOCAL_CANISTER)
.unwrap()
.system_state = SystemState::new_stopped_for_testing(
*LOCAL_CANISTER,
PrincipalId::default(),
Cycles::new(u128::MAX / 2),
NumSeconds::from(0),
);
.system_state
.set_status(CanisterStatus::Stopped);
},
RejectReason::CanisterStopped,
);
Expand All @@ -2094,8 +2133,7 @@ fn legacy_check_stream_handler_generated_reject_response_canister_stopping() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
state
.canister_states
.get_mut(&LOCAL_CANISTER)
.canister_state_mut(&LOCAL_CANISTER)
.unwrap()
.system_state = SystemState::new_stopping_for_testing(
*LOCAL_CANISTER,
Expand All @@ -2115,11 +2153,14 @@ fn legacy_check_stream_handler_generated_reject_response_queue_full() {
i64::MAX / 2, // `available_guaranteed_response_memory`
&|state| {
let mut callback_id = 2;
while let Ok(()) = state.push_input(
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
.build_with(CallbackId::new(callback_id), 0),
&mut (i64::MAX / 2),
) {
while state
.push_input(
Request(*LOCAL_CANISTER, *LOCAL_CANISTER)
.build_with(CallbackId::new(callback_id), 0),
&mut (i64::MAX / 2),
)
.is_ok()
{
callback_id += 1;
}
},
Expand Down Expand Up @@ -3896,10 +3937,15 @@ fn with_test_setup_and_config(
.into_iter()
.enumerate()
.map(|(payload_size_bytes, builder)| {
let (respondent, originator) = match builder {
Request(sender, receiver) => (receiver, sender),
Response(respondent, originator) => (respondent, originator),
RejectResponse(respondent, originator, _) => (respondent, originator),
let (respondent, originator, deadline) = match builder {
Request(sender, receiver) => (receiver, sender, NO_DEADLINE),
Response(respondent, originator) => (respondent, originator, NO_DEADLINE),
BestEffortResponse(respondent, originator, deadline) => {
(respondent, originator, deadline)
}
RejectResponse(respondent, originator, _) => {
(respondent, originator, NO_DEADLINE)
}
};

// Register a callback and make an input queue reservation if `msg_config`
Expand All @@ -3910,7 +3956,7 @@ fn with_test_setup_and_config(
&mut canister_state,
originator,
respondent,
NO_DEADLINE,
deadline,
);

// Make an input queue reservation.
Expand All @@ -3920,6 +3966,7 @@ fn with_test_setup_and_config(
.sender(originator)
.receiver(respondent)
.sender_reply_callback(callback_id)
.deadline(deadline)
.build()
.into(),
UNIX_EPOCH,
Expand Down Expand Up @@ -4247,6 +4294,8 @@ enum MessageBuilder {
Request(CanisterId, CanisterId),
// `(respondent, originator)`
Response(CanisterId, CanisterId),
// `(respondent, originator, deadline)`
BestEffortResponse(CanisterId, CanisterId, CoarseTime),
// `(respondent, originator, reason)`
RejectResponse(CanisterId, CanisterId, RejectReason),
}
Expand All @@ -4268,6 +4317,14 @@ impl MessageBuilder {
.response_payload(Payload::Data(vec![0_u8; payload_size_bytes]))
.build()
.into(),
Self::BestEffortResponse(respondent, originator, deadline) => ResponseBuilder::new()
.respondent(respondent)
.originator(originator)
.originator_reply_callback(callback_id)
.response_payload(Payload::Data(vec![0_u8; payload_size_bytes]))
.deadline(deadline)
.build()
.into(),
Self::RejectResponse(respondent, originator, reason) => generate_reject_response_for(
reason,
&RequestBuilder::new()
Expand Down
2 changes: 1 addition & 1 deletion rs/replicated_state/src/canister_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl CanisterState {
subnet_available_memory: &mut i64,
own_subnet_type: SubnetType,
input_queue_type: InputQueueType,
) -> Result<(), (StateError, RequestOrResponse)> {
) -> Result<bool, (StateError, RequestOrResponse)> {
self.system_state.push_input(
msg,
subnet_available_memory,
Expand Down
26 changes: 15 additions & 11 deletions rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,20 +694,23 @@ impl CanisterQueues {

/// Enqueues a canister-to-canister message into the induction pool.
///
/// If the message is a `Request` and is enqueued successfully, this will also
/// reserve a slot in the corresponding output queue for the eventual response.
/// If the message is a `Request` and it is enqueued successfully `Ok(true)` is
/// returned; and a slot is reserved in the corresponding output queue for the
/// eventual response.
///
/// If the message is a `Response`, `SystemState` will have already checked for
/// a matching callback:
///
/// * If this is a guaranteed `Response`, the protocol should have reserved a
/// slot for it, so the push should not fail for lack of one (although an
/// error may be returned in case of a bug in the upper layers).
/// error may be returned in case of a bug in the upper layers) and `Ok(true)`
/// is returned.
/// * If this is a best-effort `Response`, a slot is available and no duplicate
/// (time out) response is already enqueued, it is enqueued.
/// (time out) response is already enqueued, it is enqueued and `Ok(true)` is
/// returned.
/// * If this is a best-effort `Response` and a duplicate (time out) response
/// is already enqueued (which is implicitly true when no slot is available),
/// the response is silently dropped and `Ok(())` is returned.
/// the response is silently dropped and `Ok(false)` is returned.
///
/// If the message was enqueued, adds the sender to the appropriate input
/// schedule (local or remote), if not already there.
Expand All @@ -726,7 +729,7 @@ impl CanisterQueues {
&mut self,
msg: RequestOrResponse,
input_queue_type: InputQueueType,
) -> Result<(), (StateError, RequestOrResponse)> {
) -> Result<bool, (StateError, RequestOrResponse)> {
let sender = msg.sender();
let input_queue = match msg {
RequestOrResponse::Request(_) => {
Expand Down Expand Up @@ -762,7 +765,7 @@ impl CanisterQueues {
));
} else {
// But it's OK for a best-effort response. Silently drop it.
return Ok(());
return Ok(false);
}
}
queue
Expand All @@ -785,7 +788,7 @@ impl CanisterQueues {
debug_assert!(self
.callbacks_with_enqueued_response
.contains(&response.originator_reply_callback));
return Ok(());
return Ok(false);
}
}
}
Expand All @@ -807,7 +810,7 @@ impl CanisterQueues {

debug_assert_eq!(Ok(()), self.test_invariants());
debug_assert_eq!(Ok(()), self.schedules_ok(&|_| InputQueueType::RemoteSubnet));
Ok(())
Ok(true)
}

/// Enqueues a "deadline expired" compact response for the given best-effort
Expand Down Expand Up @@ -1124,6 +1127,7 @@ impl CanisterQueues {
deadline: request.deadline,
}));
self.push_input(response, InputQueueType::LocalSubnet)
.map(|_| ())
.map_err(|(e, _msg)| e)
}

Expand Down Expand Up @@ -2056,7 +2060,7 @@ pub mod testing {
&mut self,
msg: RequestOrResponse,
input_queue_type: InputQueueType,
) -> Result<(), (StateError, RequestOrResponse)>;
) -> Result<bool, (StateError, RequestOrResponse)>;

/// Publicly exposes the local sender input_schedule.
fn local_sender_schedule(&self) -> &VecDeque<CanisterId>;
Expand Down Expand Up @@ -2089,7 +2093,7 @@ pub mod testing {
&mut self,
msg: RequestOrResponse,
input_queue_type: InputQueueType,
) -> Result<(), (StateError, RequestOrResponse)> {
) -> Result<bool, (StateError, RequestOrResponse)> {
self.push_input(msg, input_queue_type)
}

Expand Down
Loading

0 comments on commit 5f9c28b

Please sign in to comment.