Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Traffic Control] Improved error metrics and benchmarking #20134

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ pub struct ValidatorServiceMetrics {
forwarded_header_parse_error: IntCounter,
forwarded_header_invalid: IntCounter,
forwarded_header_not_included: IntCounter,
client_id_source_config_mismatch: IntCounter,
}

impl ValidatorServiceMetrics {
Expand Down Expand Up @@ -329,6 +330,12 @@ impl ValidatorServiceMetrics {
registry,
)
.unwrap(),
client_id_source_config_mismatch: register_int_counter_with_registry!(
"validator_service_client_id_source_config_mismatch",
"Number of times detected that client id source config doesn't agree with x-forwarded-for header",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -1225,6 +1232,19 @@ impl ValidatorService {
return None;
}
let contents_len = header_contents.len();
if contents_len < *num_hops {
error!(
"x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
`client-id-source` in the node config.",
header_contents,
contents_len,
num_hops,
contents_len,
);
self.metrics.client_id_source_config_mismatch.inc();
return None;
}
let Some(client_ip) = header_contents.get(contents_len - num_hops)
else {
error!(
Expand Down Expand Up @@ -1296,7 +1316,11 @@ impl ValidatorService {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
Expand All @@ -1320,8 +1344,10 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
SuiError::UserInputError {
error: UserInputError::IncorrectUserSignature { .. },
} => Weight::one(),
SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
| SuiError::SignerSignatureNumberMismatch { .. }
| SuiError::IncorrectSigner { .. }
Expand Down
12 changes: 10 additions & 2 deletions crates/sui-core/src/traffic_controller/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};

#[derive(Clone)]
Expand All @@ -18,6 +18,7 @@ pub struct TrafficControllerMetrics {
pub num_dry_run_blocked_requests: IntCounter,
pub tally_handled: IntCounter,
pub error_tally_handled: IntCounter,
pub tally_error_types: IntCounterVec,
pub deadmans_switch_enabled: IntGauge,
pub highest_direct_spam_rate: IntGauge,
pub highest_proxied_spam_rate: IntGauge,
Expand Down Expand Up @@ -90,6 +91,13 @@ impl TrafficControllerMetrics {
registry
)
.unwrap(),
tally_error_types: register_int_counter_vec_with_registry!(
"traffic_control_tally_error_types",
"Number of tally errors, grouped by error type",
&["error_type"],
registry
)
.unwrap(),
deadmans_switch_enabled: register_int_gauge_with_registry!(
"deadmans_switch_enabled",
"If 1, the deadman's switch is enabled and all traffic control
Expand Down
28 changes: 21 additions & 7 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,13 @@ async fn run_tally_loop(
metrics
.highest_direct_spam_rate
.set(highest_direct_rate.0 as i64);
trace!("Recent highest direct spam rate: {:?}", highest_direct_rate);
debug!("Recent highest direct spam rate: {:?}", highest_direct_rate);
}
if let Some(highest_proxied_rate) = spam_policy.highest_proxied_rate() {
metrics
.highest_proxied_spam_rate
.set(highest_proxied_rate.0 as i64);
trace!(
debug!(
"Recent highest proxied spam rate: {:?}",
highest_proxied_rate
);
Expand All @@ -395,7 +395,7 @@ async fn run_tally_loop(
metrics
.highest_direct_error_rate
.set(highest_direct_rate.0 as i64);
trace!(
debug!(
"Recent highest direct error rate: {:?}",
highest_direct_rate
);
Expand All @@ -404,7 +404,7 @@ async fn run_tally_loop(
metrics
.highest_proxied_error_rate
.set(highest_proxied_rate.0 as i64);
trace!(
debug!(
"Recent highest proxied error rate: {:?}",
highest_proxied_rate
);
Expand All @@ -425,10 +425,22 @@ async fn handle_error_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !tally.error_weight.is_sampled() {
let Some((error_weight, error_type)) = tally.clone().error_info else {
return Ok(());
};
if !error_weight.is_sampled() {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
trace!(
"Handling error_type {:?} from client {:?}",
error_type,
tally.direct,
);
metrics
.tally_error_types
.with_label_values(&[error_type.as_str()])
.inc();
let resp = policy.handle_tally(tally);
metrics.error_tally_handled.inc();
if let Some(fw_config) = fw_config {
if fw_config.delegate_error_blocking && !mem_drainfile_present {
Expand Down Expand Up @@ -509,6 +521,7 @@ async fn handle_policy_response(
{
// Only increment the metric if the client was not already blocked
debug!("Blocking client: {:?}", client);
metrics.requests_blocked_at_protocol.inc();
metrics.connection_ip_blocklist_len.inc();
}
}
Expand All @@ -523,6 +536,7 @@ async fn handle_policy_response(
{
// Only increment the metric if the client was not already blocked
debug!("Blocking proxied client: {:?}", client);
metrics.requests_blocked_at_protocol.inc();
metrics.proxy_ip_blocklist_len.inc();
}
}
Expand Down Expand Up @@ -745,7 +759,7 @@ impl TrafficSim {
// TODO add proxy IP for testing
None,
// TODO add weight adjustments
Weight::one(),
None,
Weight::one(),
));
} else {
Expand Down
23 changes: 15 additions & 8 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::hash::Hash;
use std::time::Duration;
use std::time::{Instant, SystemTime};
use sui_types::traffic_control::{FreqThresholdConfig, PolicyConfig, PolicyType, Weight};
use tracing::info;
use tracing::{info, trace};

const HIGHEST_RATES_CAPACITY: usize = 20;

Expand Down Expand Up @@ -222,7 +222,7 @@ impl TrafficSketch {
pub struct TrafficTally {
pub direct: Option<IpAddr>,
pub through_fullnode: Option<IpAddr>,
pub error_weight: Weight,
pub error_info: Option<(Weight, String)>,
pub spam_weight: Weight,
pub timestamp: SystemTime,
}
Expand All @@ -231,13 +231,13 @@ impl TrafficTally {
pub fn new(
direct: Option<IpAddr>,
through_fullnode: Option<IpAddr>,
error_weight: Weight,
error_info: Option<(Weight, String)>,
spam_weight: Weight,
) -> Self {
Self {
direct,
through_fullnode,
error_weight,
error_info,
spam_weight,
timestamp: SystemTime::now(),
}
Expand Down Expand Up @@ -360,7 +360,14 @@ impl FreqThresholdPolicy {
let block_client = if let Some(source) = tally.direct {
let key = SketchKey(source, ClientType::Direct);
self.sketch.increment_count(&key);
if self.sketch.get_request_rate(&key) >= self.client_threshold as f64 {
let req_rate = self.sketch.get_request_rate(&key);
trace!(
"FreqThresholdPolicy handling tally -- req_rate: {:?}, client_threshold: {:?}, client: {:?}",
req_rate,
self.client_threshold,
source,
);
if req_rate >= self.client_threshold as f64 {
Some(source)
} else {
None
Expand Down Expand Up @@ -515,21 +522,21 @@ mod tests {
let alice = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_weight: Weight::zero(),
error_info: None,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
Expand Down
34 changes: 19 additions & 15 deletions crates/sui-e2e-tests/tests/traffic_control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use sui_network::default_mysten_network_config;
use sui_swarm_config::network_config_builder::ConfigBuilder;
use sui_test_transaction_builder::batch_make_transfer_transactions;
use sui_types::{
crypto::Ed25519SuiSignature,
quorum_driver_types::ExecuteTransactionRequestType,
signature::GenericSignature,
traffic_control::{
FreqThresholdConfig, PolicyConfig, PolicyType, RemoteFirewallConfig, Weight,
},
Expand Down Expand Up @@ -225,7 +227,7 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
.with_policy_config(Some(policy_config))
.build();
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
let test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
Expand All @@ -235,12 +237,13 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
);
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);
let mut txns = batch_make_transfer_transactions(&test_cluster.wallet, n as usize).await;
let mut tx = txns.swap_remove(0);
let signatures = tx.tx_signatures_mut_for_testing();
signatures.pop();
signatures.push(GenericSignature::Signature(
sui_types::crypto::Signature::Ed25519SuiSignature(Ed25519SuiSignature::default()),
));

// it should take no more than 4 requests to be added to the blocklist
for _ in 0..n {
Expand All @@ -251,7 +254,7 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
}
}
}
panic!("Expected spam policy to trigger within {n} requests");
panic!("Expected error policy to trigger within {n} requests");
}

#[tokio::test]
Expand Down Expand Up @@ -406,7 +409,7 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::
.with_firewall_config(Some(firewall_config))
.build();
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
let test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
Expand All @@ -416,12 +419,13 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::
);
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);
let mut txns = batch_make_transfer_transactions(&test_cluster.wallet, n as usize).await;
let mut tx = txns.swap_remove(0);
let signatures = tx.tx_signatures_mut_for_testing();
signatures.pop();
signatures.push(GenericSignature::Signature(
sui_types::crypto::Signature::Ed25519SuiSignature(Ed25519SuiSignature::default()),
));

// start test firewall server
let mut server = NodeFwTestServer::new();
Expand Down
6 changes: 5 additions & 1 deletion crates/sui-json-rpc/src/axum_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ fn handle_traffic_resp(
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
error_info: error.map(|e| {
let error_type = e.to_string();
let error_weight = normalize(e);
(error_weight, error_type)
}),
// For now, count everything as spam with equal weight
// on the rpc node side, including gas-charging endpoints
// such as `sui_executeTransactionBlock`, as this can enable
Expand Down
Loading