diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock
index 1e1ea89d1..20fc13dc4 100644
--- a/Cargo.Bazel.lock
+++ b/Cargo.Bazel.lock
@@ -1,5 +1,5 @@
 {
-  "checksum": "d9bc8b02db59266969caa52b402af82e441437a6119cc3bcb932eae6acb3fc52",
+  "checksum": "abee9a1e183890980b7417228d49c17fa7e3379b15e022a305a18ccf1a343194",
   "crates": {
     "actix-codec 0.5.2": {
       "name": "actix-codec",
@@ -10447,6 +10447,10 @@
           "id": "serde_json 1.0.128",
           "target": "serde_json"
         },
+        {
+          "id": "serde_yaml 0.9.34+deprecated",
+          "target": "serde_yaml"
+        },
         {
           "id": "sha2 0.10.8",
           "target": "sha2"
diff --git a/Cargo.lock b/Cargo.lock
index 72ab4f089..457fec6d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2080,6 +2080,7 @@ dependencies = [
  "self_update",
  "serde",
  "serde_json",
+ "serde_yaml",
  "sha2 0.10.8",
  "shlex",
  "spinners",
diff --git a/cordoned_features.yaml b/cordoned_features.yaml
new file mode 100644
index 000000000..410f35422
--- /dev/null
+++ b/cordoned_features.yaml
@@ -0,0 +1,8 @@
+# This file is used for cordoning certain node features
+# which are used while managing subnets.
+# The full list of features can be found here:
+# https://github.com/dfinity/dre/blob/main/rs/ic-management-types/src/lib.rs#L317-L324
+features:
+  # Example entry
+  # - feature: data_center
+  #   value: mu1
diff --git a/rs/cli/Cargo.toml b/rs/cli/Cargo.toml
index 8bad18b74..32a38a313 100644
--- a/rs/cli/Cargo.toml
+++ b/rs/cli/Cargo.toml
@@ -77,6 +77,7 @@ comfy-table = { workspace = true }
 human_bytes = { workspace = true }
 mockall.workspace = true
 clio = { workspace = true }
+serde_yaml.workspace = true
 
 [dev-dependencies]
 actix-rt = { workspace = true }
diff --git a/rs/cli/src/commands/mod.rs b/rs/cli/src/commands/mod.rs
index 06f54b18f..2abba36de 100644
--- a/rs/cli/src/commands/mod.rs
+++ b/rs/cli/src/commands/mod.rs
@@ -201,6 +201,10 @@ The argument is mandatory for testnets, and is optional for mainnet and staging"
     /// Link to the related forum post, where proposal details can be discussed
     #[clap(long, global = true, visible_aliases = &["forum-link", "forum"])]
     pub forum_post_link: Option<String>,
+
+    /// Path to the file that contains cordoned features
+    #[clap(long, global = true, visible_aliases = &["cf-file", "cfff"])]
+    pub cordon_feature_fallback_file: Option<PathBuf>,
 }
 
 // Do not use outside of DRE CLI.
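A short sketch of how a populated cordoned_features.yaml flows through the parser introduced below in rs/cli/src/cordoned_feature_fetcher.rs. Feature names must match NodeFeature variants; the concrete values (fr1, example-np) are made-up placeholders, and calling the private parse helper only works in-crate, mirroring the unit tests in that file rather than a public API:

// In-crate sketch (e.g. inside a unit test); values are illustrative only.
let contents = br#"
features:
  - feature: data_center
    value: fr1
  - feature: node_provider
    value: example-np"#;

// Offline, with no fallback file: only the parsing path is exercised here.
let fetcher = CordonedFeatureFetcherImpl::new(true, None).unwrap();
let pairs = fetcher.parse(contents).unwrap();
assert_eq!(pairs.len(), 2);
assert_eq!(pairs[0].value, "fr1");

At the CLI level the same file would be supplied via the new global flag, e.g. `dre --cordon-feature-fallback-file ./cordoned_features.yaml subnet replace ...` (illustrative invocation).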
diff --git a/rs/cli/src/commands/subnet/replace.rs b/rs/cli/src/commands/subnet/replace.rs
index 563fd7814..d3a2e1d39 100644
--- a/rs/cli/src/commands/subnet/replace.rs
+++ b/rs/cli/src/commands/subnet/replace.rs
@@ -52,7 +52,7 @@ impl ExecutableCommand for Replace {
             _ => SubnetTarget::FromNodesIds(self.nodes.clone()),
         };
 
-        let subnet_manager = ctx.subnet_manager().await;
+        let subnet_manager = ctx.subnet_manager().await?;
         let subnet_change_response = subnet_manager
             .with_target(subnet_target)
             .membership_replace(
diff --git a/rs/cli/src/commands/subnet/resize.rs b/rs/cli/src/commands/subnet/resize.rs
index 3b4fae4a2..38c878058 100644
--- a/rs/cli/src/commands/subnet/resize.rs
+++ b/rs/cli/src/commands/subnet/resize.rs
@@ -44,7 +44,7 @@ impl ExecutableCommand for Resize {
     async fn execute(&self, ctx: crate::ctx::DreContext) -> anyhow::Result<()> {
         let runner = ctx.runner().await?;
 
-        let subnet_manager = ctx.subnet_manager().await;
+        let subnet_manager = ctx.subnet_manager().await?;
 
         let subnet_change_response = subnet_manager
             .subnet_resize(
diff --git a/rs/cli/src/cordoned_feature_fetcher.rs b/rs/cli/src/cordoned_feature_fetcher.rs
new file mode 100644
index 000000000..7f38fc5d6
--- /dev/null
+++ b/rs/cli/src/cordoned_feature_fetcher.rs
@@ -0,0 +1,174 @@
+use std::{path::PathBuf, time::Duration};
+
+use decentralization::network::NodeFeaturePair;
+use futures::future::BoxFuture;
+use ic_management_types::NodeFeature;
+use itertools::Itertools;
+use log::warn;
+use mockall::automock;
+use reqwest::{Client, ClientBuilder};
+use strum::VariantNames;
+
+#[automock]
+pub trait CordonedFeatureFetcher: Sync + Send {
+    fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>>;
+}
+
+pub struct CordonedFeatureFetcherImpl {
+    client: Client,
+    fallback_file: Option<PathBuf>,
+    offline: bool,
+}
+
+const CORDONED_FEATURES_FILE_URL: &str = "https://raw.githubusercontent.com/dfinity/dre/refs/heads/main/cordoned_features.yaml";
+
+impl CordonedFeatureFetcherImpl {
+    pub fn new(offline: bool, fallback_file: Option<PathBuf>) -> anyhow::Result<Self> {
+        let client = ClientBuilder::new().timeout(Duration::from_secs(10)).build()?;
+
+        Ok(Self {
+            client,
+            fallback_file,
+            offline,
+        })
+    }
+
+    async fn fetch_from_git(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
+        let bytes = self
+            .client
+            .get(CORDONED_FEATURES_FILE_URL)
+            .send()
+            .await?
+            .error_for_status()?
+            .bytes()
+            .await?;
+
+        self.parse(&bytes)
+    }
+
+    fn fetch_from_file(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
+        let contents = std::fs::read(self.fallback_file.as_ref().unwrap())?;
+
+        self.parse(&contents)
+    }
+
+    // Tests for the parser are in the `tests` module below
+    fn parse(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>> {
+        let valid_yaml = serde_yaml::from_slice::<serde_yaml::Value>(contents)?;
+
+        let features = match valid_yaml.get("features") {
+            Some(serde_yaml::Value::Sequence(features)) => features.clone(),
+            Some(serde_yaml::Value::Null) => vec![],
+            n => anyhow::bail!(
+                "Failed to parse contents. Expected to have top-level key `features` with an array of node features. Got: \n{:?}",
+                n
+            ),
+        };
+
+        let mut valid_features = vec![];
+        for feature in features {
+            valid_features.push(NodeFeaturePair {
+                feature: feature
+                    .get("feature")
+                    .map(|value| {
+                        serde_yaml::from_value(value.clone()).map_err(|_| {
+                            anyhow::anyhow!(
+                                "Failed to parse feature `{}`. Expected one of: [{}]",
+                                serde_yaml::to_string(value).unwrap(),
+                                NodeFeature::VARIANTS.iter().join(",")
+                            )
+                        })
+                    })
+                    .ok_or(anyhow::anyhow!("Expected `feature` key to be present. Got: \n{:?}", feature))??,
+                value: feature
+                    .get("value")
+                    .map(|value| {
+                        value
+                            .as_str()
+                            .ok_or(anyhow::anyhow!(
+                                "Failed to parse value `{}`. Expected string",
+                                serde_yaml::to_string(value).unwrap(),
+                            ))
+                            .map(|s| s.to_string())
+                    })
+                    .ok_or(anyhow::anyhow!("Expected `value` key to be present. Got: \n{:?}", feature))??,
+            });
+        }
+
+        Ok(valid_features)
+    }
+}
+
+impl CordonedFeatureFetcher for CordonedFeatureFetcherImpl {
+    fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>> {
+        Box::pin(async {
+            match (self.offline, self.fallback_file.is_some()) {
+                (true, true) => self.fetch_from_file(),
+                (true, false) => Err(anyhow::anyhow!("Cannot fetch cordoned features offline without a fallback file")),
+                (false, true) => match self.fetch_from_git().await {
+                    Ok(from_git) => Ok(from_git),
+                    Err(e_from_git) => {
+                        warn!("Failed to fetch cordoned features from git: {:?}", e_from_git);
+                        warn!("Falling back to fetching from file");
+                        match self.fetch_from_file() {
+                            Ok(from_file) => Ok(from_file),
+                            Err(e_from_file) => Err(anyhow::anyhow!(
+                                "Failed to fetch cordoned features both from file and from git.\nError from git: {:?}\nError from file: {:?}",
+                                e_from_git,
+                                e_from_file
+                            )),
+                        }
+                    }
+                },
+                (false, false) => self.fetch_from_git().await,
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn valid_parsing() {
+        let contents = br#"
+features:
+  - feature: data_center
+    value: mu1
+  - feature: node_provider
+    value: some-np
+  - feature: data_center_owner
+    value: some-dco
+  - feature: city
+    value: some-city
+  - feature: city
+    value: another-city
+  - feature: country
+    value: some-country
+  - feature: continent
+    value: some-continent"#;
+
+        let fetcher = CordonedFeatureFetcherImpl::new(true, None).unwrap();
+
+        let maybe_parsed = fetcher.parse(contents);
+        assert!(maybe_parsed.is_ok());
+        let parsed = maybe_parsed.unwrap();
+
+        assert_eq!(parsed.len(), 7)
+    }
+
+    #[test]
+    fn valid_empty_file() {
+        let contents = br#"
+features:"#;
+
+        let fetcher = CordonedFeatureFetcherImpl::new(true, None).unwrap();
+
+        let maybe_parsed = fetcher.parse(contents);
+        assert!(maybe_parsed.is_ok());
+        let parsed = maybe_parsed.unwrap();
+
+        assert_eq!(parsed.len(), 0)
+    }
+}
diff --git a/rs/cli/src/ctx.rs b/rs/cli/src/ctx.rs
index 9fd50df16..dc6dba4aa 100644
--- a/rs/cli/src/ctx.rs
+++ b/rs/cli/src/ctx.rs
@@ -7,6 +7,7 @@ use std::{
 use ic_canisters::{governance::governance_canister_version, IcAgentCanisterClient};
 use ic_management_backend::{
+    health::{self, HealthStatusQuerier},
     lazy_git::LazyGit,
     lazy_registry::{LazyRegistry, LazyRegistryImpl},
     proposal::{ProposalAgent, ProposalAgentImpl},
@@ -21,6 +22,7 @@ use crate::{
     artifact_downloader::{ArtifactDownloader, ArtifactDownloaderImpl},
     auth::Neuron,
     commands::{Args, AuthOpts, AuthRequirement, ExecutableCommand, IcAdminVersion},
+    cordoned_feature_fetcher::{CordonedFeatureFetcher, CordonedFeatureFetcherImpl},
     ic_admin::{download_ic_admin, should_update_ic_admin, IcAdmin, IcAdminImpl, FALLBACK_IC_ADMIN_VERSION},
     runner::Runner,
     subnet_manager::SubnetManager,
@@ -42,8 +44,11 @@ pub struct DreContext {
     neuron: Neuron,
     proceed_without_confirmation: bool,
     version: IcAdminVersion,
+    cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+    health_client: Arc<dyn HealthStatusQuerier>,
 }
 
+#[allow(clippy::too_many_arguments)]
 impl DreContext {
     pub async fn new(
         network: String,
@@ -57,6 +62,8 @@
         auth_requirement: AuthRequirement,
         forum_post_link: Option<String>,
         ic_admin_version: IcAdminVersion,
+        cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+        health_client: Arc<dyn HealthStatusQuerier>,
     ) -> anyhow::Result<Self> {
         let network = match offline {
             false => ic_management_types::Network::new(network.clone(), &nns_urls)
@@ -93,6 +100,8 @@
             neuron,
             proceed_without_confirmation: yes,
             version: ic_admin_version,
+            cordoned_features_fetcher,
+            health_client,
         })
     }
 
@@ -109,6 +118,10 @@
             args.subcommands.require_auth(),
             args.forum_post_link.clone(),
             args.ic_admin_version.clone(),
+            Arc::new(CordonedFeatureFetcherImpl::new(args.offline, args.cordon_feature_fallback_file.clone())?) as Arc<dyn CordonedFeatureFetcher>,
+            Arc::new(health::HealthClient::new(
+                ic_management_types::Network::new(args.network.clone(), &args.nns_urls).await?,
+            )),
         )
         .await
     }
@@ -212,10 +225,14 @@
         ))
     }
 
-    pub async fn subnet_manager(&self) -> SubnetManager {
+    pub async fn subnet_manager(&self) -> anyhow::Result<SubnetManager> {
         let registry = self.registry().await;
 
-        SubnetManager::new(registry, self.network().clone())
+        Ok(SubnetManager::new(
+            registry,
+            self.cordoned_features_fetcher.clone(),
+            self.health_client.clone(),
+        ))
     }
 
     pub fn proposals_agent(&self) -> Arc<dyn ProposalAgent> {
@@ -235,6 +252,8 @@
             self.verbose_runner,
             self.ic_repo.clone(),
             self.artifact_downloader.clone(),
+            self.cordoned_features_fetcher.clone(),
+            self.health_client.clone(),
         ));
         *self.runner.borrow_mut() = Some(runner.clone());
         Ok(runner)
@@ -250,10 +269,10 @@
 pub mod tests {
     use std::{cell::RefCell, sync::Arc};
 
-    use ic_management_backend::{lazy_git::LazyGit, lazy_registry::LazyRegistry, proposal::ProposalAgent};
+    use ic_management_backend::{health::HealthStatusQuerier, lazy_git::LazyGit, lazy_registry::LazyRegistry, proposal::ProposalAgent};
     use ic_management_types::Network;
 
-    use crate::{artifact_downloader::ArtifactDownloader, auth::Neuron, ic_admin::IcAdmin};
+    use crate::{artifact_downloader::ArtifactDownloader, auth::Neuron, cordoned_feature_fetcher::CordonedFeatureFetcher, ic_admin::IcAdmin};
 
     use super::DreContext;
 
@@ -265,6 +284,8 @@
         git: Arc<dyn LazyGit>,
         proposal_agent: Arc<dyn ProposalAgent>,
         artifact_downloader: Arc<dyn ArtifactDownloader>,
+        cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+        health_client: Arc<dyn HealthStatusQuerier>,
     ) -> DreContext {
         DreContext {
             network,
@@ -281,6 +302,8 @@
             neuron,
             proceed_without_confirmation: true,
             version: crate::commands::IcAdminVersion::Strict("Shouldn't reach this because of mock".to_string()),
+            cordoned_features_fetcher,
+            health_client,
         }
     }
 }
diff --git a/rs/cli/src/lib.rs b/rs/cli/src/lib.rs
index f9e5a9735..0c227d26d 100644
--- a/rs/cli/src/lib.rs
+++ b/rs/cli/src/lib.rs
@@ -2,6 +2,7 @@ pub mod artifact_downloader;
 pub mod auth;
 pub mod commands;
+mod cordoned_feature_fetcher;
 pub mod ctx;
 mod desktop_notify;
 pub mod ic_admin;
diff --git a/rs/cli/src/main.rs b/rs/cli/src/main.rs
index e842f8bcf..56b033e7d 100644
--- a/rs/cli/src/main.rs
+++ b/rs/cli/src/main.rs
@@ -10,6 +10,7 @@ use log::{info, warn};
 mod artifact_downloader;
 mod auth;
 mod commands;
+mod cordoned_feature_fetcher;
 mod ctx;
 mod desktop_notify;
 mod ic_admin;
diff --git a/rs/cli/src/operations/hostos_rollout.rs b/rs/cli/src/operations/hostos_rollout.rs
index 88001f4a4..be31a8a1c 100644
--- a/rs/cli/src/operations/hostos_rollout.rs
+++ b/rs/cli/src/operations/hostos_rollout.rs
@@ -2,9 +2,9 @@ use anyhow::anyhow;
 use async_recursion::async_recursion;
 use futures_util::future::try_join;
 use ic_base_types::{NodeId, PrincipalId};
-use ic_management_backend::health::{self, HealthStatusQuerier};
+use ic_management_backend::health::HealthStatusQuerier;
 use ic_management_backend::proposal::ProposalAgent;
-use ic_management_types::{HealthStatus, Network, Node, Subnet, UpdateNodesHostosVersionsProposal};
+use ic_management_types::{HealthStatus, Node, Subnet, UpdateNodesHostosVersionsProposal};
 use indexmap::IndexMap;
 use log::{debug, info, warn};
 use std::sync::Arc;
@@ -132,21 +132,21 @@ pub struct HostosRollout {
     nodes_all: Vec<Node>,
     pub grouped_nodes: IndexMap<NodeGroup, Vec<Node>>,
     pub subnets: Arc<IndexMap<PrincipalId, Subnet>>,
-    pub network: Network,
     pub proposal_agent: Arc<dyn ProposalAgent>,
     pub only_filter: Vec<String>,
     pub exclude_filter: Vec<String>,
     pub version: String,
+    health_client: Arc<dyn HealthStatusQuerier>,
 }
 impl HostosRollout {
     pub fn new(
         nodes: Arc<IndexMap<PrincipalId, Node>>,
         subnets: Arc<IndexMap<PrincipalId, Subnet>>,
-        network: &Network,
         proposal_agent: Arc<dyn ProposalAgent>,
         rollout_version: &str,
         only_filter: &[String],
         exclude_filter: &[String],
+        health_client: Arc<dyn HealthStatusQuerier>,
     ) -> Self {
         let grouped_nodes: IndexMap<NodeGroup, Vec<Node>> = nodes
             .values()
@@ -179,11 +179,11 @@
             nodes_all: nodes.values().cloned().collect(),
             grouped_nodes,
             subnets,
-            network: network.clone(),
             proposal_agent,
             only_filter: only_filter.to_vec(),
             exclude_filter: exclude_filter.to_vec(),
             version: rollout_version.to_string(),
+            health_client,
         }
     }
 
@@ -490,7 +490,7 @@
     /// Execute the host-os rollout operation, on the provided group of nodes.
     pub async fn execute(&self, update_group: NodeGroupUpdate) -> anyhow::Result<HostosRolloutResponse> {
         let (nodes_health, nodes_with_open_proposals) = try_join(
-            health::HealthClient::new(self.network.clone()).nodes(),
+            self.health_client.nodes(),
             self.proposal_agent.list_open_update_nodes_hostos_versions_proposals(),
         )
         .await?;
@@ -534,6 +534,7 @@
 pub mod test {
     use crate::operations::hostos_rollout::NodeAssignment::{Assigned, Unassigned};
     use crate::operations::hostos_rollout::NodeOwner::{Dfinity, Others};
+    use ic_management_backend::health::MockHealthStatusQuerier;
     use ic_management_backend::proposal::ProposalAgentImpl;
     use ic_management_types::{Network, Node, Operator, Provider, Subnet};
     use std::collections::BTreeMap;
@@ -586,11 +587,11 @@
         let hostos_rollout = HostosRollout::new(
             Arc::new(union.clone()),
             Arc::new(subnet.clone()),
-            &network,
             Arc::new(ProposalAgentImpl::new(nns_urls)) as Arc<dyn ProposalAgent>,
             version_one.clone().as_str(),
             &[],
             &[],
+            Arc::new(MockHealthStatusQuerier::new()),
         );
 
         let results = hostos_rollout
@@ -625,11 +626,11 @@
         let hostos_rollout = HostosRollout::new(
             Arc::new(union.clone()),
             Arc::new(subnet.clone()),
-            &network,
             Arc::new(ProposalAgentImpl::new(nns_urls)) as Arc<dyn ProposalAgent>,
             version_one.clone().as_str(),
             &[],
             &nodes_to_exclude,
+            Arc::new(MockHealthStatusQuerier::new()),
         );
 
         let results = hostos_rollout
@@ -653,11 +654,11 @@
         let hostos_rollout = HostosRollout::new(
             Arc::new(union.clone()),
             Arc::new(subnet.clone()),
-            &network,
             Arc::new(ProposalAgentImpl::new(nns_urls)) as Arc<dyn ProposalAgent>,
             version_two.clone().as_str(),
             &[],
             &[],
+            Arc::new(MockHealthStatusQuerier::new()),
         );
 
         let results = hostos_rollout
diff --git a/rs/cli/src/runner.rs b/rs/cli/src/runner.rs
index 92ce78cf1..be8b01be8 100644
--- a/rs/cli/src/runner.rs
+++ b/rs/cli/src/runner.rs
@@ -10,7 +10,6 @@ use decentralization::subnets::NodesRemover;
 use decentralization::SubnetChangeResponse;
 use futures::TryFutureExt;
 use futures_util::future::try_join;
-use ic_management_backend::health;
 use ic_management_backend::health::HealthStatusQuerier;
 use ic_management_backend::lazy_git::LazyGit;
 use ic_management_backend::lazy_git::LazyGitImpl;
@@ -37,6 +36,7 @@
 use tabled::builder::Builder;
 use tabled::settings::Style;
 
 use crate::artifact_downloader::ArtifactDownloader;
+use crate::cordoned_feature_fetcher::CordonedFeatureFetcher;
 use crate::ic_admin::{self, IcAdmin};
 use crate::ic_admin::{ProposeCommand, ProposeOptions};
 use crate::operations::hostos_rollout::HostosRollout;
@@ -51,6 +51,8 @@ pub struct Runner {
     proposal_agent: Arc<dyn ProposalAgent>,
     verbose: bool,
     artifact_downloader: Arc<dyn ArtifactDownloader>,
+    cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+    health_client: Arc<dyn HealthStatusQuerier>,
 }
 
 impl Runner {
@@ -62,6 +64,8 @@
         verbose: bool,
         ic_repo: RefCell<Option<Arc<dyn LazyGit>>>,
         artifact_downloader: Arc<dyn ArtifactDownloader>,
+        cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+        health_client: Arc<dyn HealthStatusQuerier>,
     ) -> Self {
         Self {
             ic_admin,
@@ -71,6 +75,8 @@
             proposal_agent: agent,
             verbose,
             artifact_downloader,
+            cordoned_features_fetcher,
+            health_client,
         }
     }
 
@@ -120,8 +126,7 @@
     }
 
     pub async fn health_of_nodes(&self) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
-        let health_client = health::HealthClient::new(self.network.clone());
-        health_client.nodes().await
+        self.health_client.nodes().await
     }
 
     pub async fn subnet_create(
@@ -149,6 +154,7 @@
                 request.exclude.clone().unwrap_or_default(),
                 request.only.clone().unwrap_or_default(),
                 &health_of_nodes,
+                self.cordoned_features_fetcher.fetch().await?,
             )
             .await?;
         let subnet_creation_data = SubnetChangeResponse::from(&subnet_creation_data).with_health_of_nodes(health_of_nodes.clone());
@@ -346,11 +352,11 @@
         let hostos_rollout = HostosRollout::new(
             self.registry.nodes().await?,
             self.registry.subnets().await?,
-            &self.network,
             self.proposal_agent.clone(),
             version,
             only,
             exclude,
+            self.health_client.clone(),
         );
 
         match hostos_rollout.execute(node_group).await? {
@@ -455,8 +461,7 @@
     }
 
     pub async fn remove_nodes(&self, nodes_remover: NodesRemover) -> anyhow::Result<()> {
-        let health_client = health::HealthClient::new(self.network.clone());
-        let (healths, nodes_with_proposals) = try_join(health_client.nodes(), self.registry.nodes_with_proposals()).await?;
+        let (healths, nodes_with_proposals) = try_join(self.health_client.nodes(), self.registry.nodes_with_proposals()).await?;
         let (mut node_removals, motivation) = nodes_remover.remove_nodes(healths, nodes_with_proposals);
         node_removals.sort_by_key(|nr| nr.reason.message());
@@ -508,7 +513,6 @@
     }
 
     pub async fn network_heal(&self, forum_post_link: Option<String>) -> anyhow::Result<()> {
-        let health_client = health::HealthClient::new(self.network.clone());
         let mut errors = vec![];
 
         // Get the list of subnets, and the list of open proposal for each subnet, if any
@@ -525,10 +529,10 @@
             .map(|(id, subnet)| (*id, subnet.clone()))
             .collect::<IndexMap<_, _>>();
         let (available_nodes, health_of_nodes) =
-            try_join(self.registry.available_nodes().map_err(anyhow::Error::from), health_client.nodes()).await?;
+            try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await?;
 
         let subnets_change_response = NetworkHealRequest::new(subnets_without_proposals)
-            .heal_and_optimize(available_nodes, &health_of_nodes)
+            .heal_and_optimize(available_nodes, &health_of_nodes, self.cordoned_features_fetcher.fetch().await?)
             .await?;
 
         for change in &subnets_change_response {
@@ -608,7 +612,8 @@
 
         let health_of_nodes = self.health_of_nodes().await?;
 
-        let change = SubnetChangeResponse::from(&change_request.rescue(&health_of_nodes)?).with_health_of_nodes(health_of_nodes);
+        let change = SubnetChangeResponse::from(&change_request.rescue(&health_of_nodes, self.cordoned_features_fetcher.fetch().await?)?)
+            .with_health_of_nodes(health_of_nodes);
 
         if change.added_with_desc.is_empty() && change.removed_with_desc.is_empty() {
             return Ok(());
diff --git a/rs/cli/src/subnet_manager.rs b/rs/cli/src/subnet_manager.rs
index 1f49f2438..eecfb1b65 100644
--- a/rs/cli/src/subnet_manager.rs
+++ b/rs/cli/src/subnet_manager.rs
@@ -8,14 +8,15 @@ use decentralization::{
     network::{DecentralizedSubnet, Node as DecentralizedNode, SubnetQueryBy},
     SubnetChangeResponse,
 };
-use ic_management_backend::health::{self, HealthStatusQuerier};
+use ic_management_backend::health::HealthStatusQuerier;
 use ic_management_backend::lazy_registry::LazyRegistry;
 use ic_management_types::HealthStatus;
-use ic_management_types::Network;
 use ic_types::PrincipalId;
 use indexmap::IndexMap;
 use log::{info, warn};
 
+use crate::cordoned_feature_fetcher::CordonedFeatureFetcher;
+
 #[derive(Clone)]
 pub enum SubnetTarget {
     FromId(PrincipalId),
@@ -39,15 +40,21 @@ impl fmt::Display for SubnetManagerError {
 pub struct SubnetManager {
     subnet_target: Option<SubnetTarget>,
     registry_instance: Arc<dyn LazyRegistry>,
-    network: Network,
+    cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+    health_client: Arc<dyn HealthStatusQuerier>,
 }
 
 impl SubnetManager {
-    pub fn new(registry_instance: Arc<dyn LazyRegistry>, network: Network) -> Self {
+    pub fn new(
+        registry_instance: Arc<dyn LazyRegistry>,
+        cordoned_features_fetcher: Arc<dyn CordonedFeatureFetcher>,
+        health_client: Arc<dyn HealthStatusQuerier>,
+    ) -> Self {
         Self {
             subnet_target: None,
             registry_instance,
-            network,
+            cordoned_features_fetcher,
+            health_client,
         }
     }
 
@@ -65,8 +72,7 @@
     }
 
     async fn unhealthy_nodes(&self, subnet: DecentralizedSubnet) -> anyhow::Result<Vec<(DecentralizedNode, HealthStatus)>> {
-        let health_client = health::HealthClient::new(self.network.clone());
-        let subnet_health = health_client.subnet(subnet.id).await?;
+        let subnet_health = self.health_client.subnet(subnet.id).await?;
 
         let unhealthy = subnet
             .nodes
@@ -153,10 +159,14 @@
             to_be_replaced.extend(subnet_unhealthy_without_included);
         }
 
-        let health_client = health::HealthClient::new(self.network.clone());
-        let health_of_nodes = health_client.nodes().await?;
+        let health_of_nodes = self.health_client.nodes().await?;
 
-        let change = subnet_change_request.optimize(optimize.unwrap_or(0), &to_be_replaced, &health_of_nodes)?;
+        let change = subnet_change_request.optimize(
+            optimize.unwrap_or(0),
+            &to_be_replaced,
+            &health_of_nodes,
+            self.cordoned_features_fetcher.fetch().await?,
+        )?;
 
         for (n, _) in change.removed().iter().filter(|(n, _)| !node_ids_unhealthy.contains(&n.id)) {
             motivations.push(format!(
@@ -196,7 +206,13 @@
             .excluding_from_available(request.exclude.clone().unwrap_or_default())
             .including_from_available(request.only.clone().unwrap_or_default())
             .including_from_available(request.include.clone().unwrap_or_default())
-            .resize(request.add, request.remove, 0, health_of_nodes)?;
+            .resize(
+                request.add,
+                request.remove,
+                0,
+                health_of_nodes,
+                self.cordoned_features_fetcher.fetch().await?,
+            )?;
 
         for (n, _) in change.removed().iter() {
             motivations.push(format!("removing {} as per user request", n.id));
diff --git a/rs/cli/src/unit_tests/ctx_init.rs b/rs/cli/src/unit_tests/ctx_init.rs
index 35ce57bc6..590c58e72 100644
--- a/rs/cli/src/unit_tests/ctx_init.rs
+++ b/rs/cli/src/unit_tests/ctx_init.rs
@@ -1,11 +1,13 @@
-use std::{path::PathBuf, str::FromStr};
+use std::{path::PathBuf, str::FromStr, sync::Arc};
 
 use crate::{
     auth::{Auth, Neuron, STAGING_KEY_PATH_FROM_HOME, STAGING_NEURON_ID},
     commands::{AuthOpts, AuthRequirement, HsmOpts},
+    cordoned_feature_fetcher::MockCordonedFeatureFetcher,
 };
 use clio::{ClioPath, InputPath};
 use ic_canisters::governance::governance_canister_version;
+use ic_management_backend::health::MockHealthStatusQuerier;
 use ic_management_types::Network;
 use itertools::Itertools;
@@ -45,6 +47,8 @@ async fn get_context(network: &Network, version: IcAdminVersion) -> anyhow::Resu
         crate::commands::AuthRequirement::Anonymous,
         None,
         version,
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     )
     .await
 }
@@ -180,6 +184,8 @@ async fn get_ctx_for_neuron_test(
         requirement,
         None,
         IcAdminVersion::Strict("Shouldn't get to here".to_string()),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     )
     .await
 }
diff --git a/rs/cli/src/unit_tests/mod.rs b/rs/cli/src/unit_tests/mod.rs
index 754210a4a..f4898bcec 100644
--- a/rs/cli/src/unit_tests/mod.rs
+++ b/rs/cli/src/unit_tests/mod.rs
@@ -1,3 +1,4 @@
 mod ctx_init;
+mod replace;
 mod update_unassigned_nodes;
 mod version;
diff --git a/rs/cli/src/unit_tests/replace.rs b/rs/cli/src/unit_tests/replace.rs
new file mode 100644
index 000000000..4b8c5e92c
--- /dev/null
+++ b/rs/cli/src/unit_tests/replace.rs
@@ -0,0 +1,386 @@
+use std::sync::Arc;
+
+use decentralization::{
+    nakamoto::NodeFeatures,
+    network::{DecentralizedSubnet, Node, NodeFeaturePair},
+    SubnetChangeResponse,
+};
+use ic_management_backend::{health::MockHealthStatusQuerier, lazy_registry::MockLazyRegistry};
+use ic_management_types::NodeFeature;
+use ic_types::PrincipalId;
+use indexmap::{Equivalent, IndexMap};
+use itertools::Itertools;
+
+use crate::{cordoned_feature_fetcher::MockCordonedFeatureFetcher, subnet_manager::SubnetManager};
+
+fn node_principal(id: u64) -> PrincipalId {
+    PrincipalId::new_node_test_id(id)
+}
+
+fn user_principal(id: u64) -> String {
+    PrincipalId::new_user_test_id(id).to_string()
+}
+
+fn node(id: u64, dfinity_owned: bool, features: &[(NodeFeature, &str)]) -> Node {
+    Node {
+        id: node_principal(id),
+        dfinity_owned,
+        features: NodeFeatures {
+            feature_map: {
+                let mut map = IndexMap::new();
+
+                features.iter().for_each(|(feature, value)| {
+                    map.insert(feature.clone(), value.to_string());
+                });
+
+                // Insert mandatory features
+                for feature in &[
+                    NodeFeature::NodeProvider,
+                    NodeFeature::DataCenter,
+                    NodeFeature::DataCenterOwner,
+                    NodeFeature::Country,
+                ] {
+                    if !map.contains_key(feature) {
+                        map.insert(feature.clone(), "Some value".to_string());
+                    }
+                }
+
+                map
+            },
+        },
+    }
+}
+
+fn subnet(id: u64, nodes: &[Node]) -> DecentralizedSubnet {
+    DecentralizedSubnet {
+        id: PrincipalId::new_subnet_test_id(id),
+        nodes: nodes.to_vec(),
+        added_nodes_desc: vec![],
+        removed_nodes_desc: vec![],
+        comment: None,
+        run_log: vec![],
+    }
+}
+
+fn cordoned_feature(feature: NodeFeature, value: &str) -> NodeFeaturePair {
+    NodeFeaturePair {
+        feature,
+        value: value.to_string(),
+    }
+}
+
+fn test_pretty_format_response(response: &Result<SubnetChangeResponse, anyhow::Error>) -> String {
+    match response {
+        Ok(r) => format!(
+            r#"Response was OK!
+    Added nodes:
+{},
+    Removed nodes:
+{},
+    Feature diff:
+{}
+    "#,
+            r.added_with_desc
+                .iter()
+                .map(|(id, desc)| format!("\t\t- principal: {}\n\t\t desc: {}", id, desc))
+                .join("\n"),
+            r.removed_with_desc
+                .iter()
+                .map(|(id, desc)| format!("\t\t- principal: {}\n\t\t desc: {}", id, desc))
+                .join("\n"),
+            r.feature_diff
+                .iter()
+                .map(|(feature, diff)| format!(
+                    "\t\t- feature: {}\n{}",
+                    feature,
+                    diff.iter()
+                        .map(|(value, (in_nodes, out_nodes))| format!("\t\t\t- value: {}, In: {}, Out: {}", value, in_nodes, out_nodes))
+                        .join("\n")
+                ))
+                .join("\n")
+        ),
+        Err(r) => format!("Response was ERR: {}", r),
+    }
+}
+
+fn pretty_print_node(node: &Node, num_indent: usize) -> String {
+    format!(
+        "{}- principal: {}\n{} dfinity_owned: {}\n{} features: [{}]",
+        "\t".repeat(num_indent),
+        node.id,
+        "\t".repeat(num_indent),
+        node.dfinity_owned,
+        "\t".repeat(num_indent),
+        node.features
+            .feature_map
+            .iter()
+            .map(|(feature, value)| format!("({}, {})", feature, value))
+            .join(", ")
+    )
+}
+
+fn pretty_print_world(available_nodes: &[Node], subnet: &DecentralizedSubnet) -> String {
+    format!(
+        r#"Available nodes:
+{}
+Observed subnet:
+  - id: {}
+    nodes:
+{}"#,
+        available_nodes.iter().map(|node| pretty_print_node(node, 1)).join("\n"),
+        subnet.id,
+        subnet.nodes.iter().map(|node| pretty_print_node(node, 2)).join("\n")
+    )
+}
+
+#[test]
+fn should_skip_cordoned_nodes() {
+    // World setup
+    let available_nodes = vec![
+        node(
+            1,
+            true,
+            &[(NodeFeature::DataCenter, "DC 1"), (NodeFeature::DataCenterOwner, &user_principal(1))],
+        ),
+        node(
+            2,
+            true,
+            &[(NodeFeature::DataCenter, "DC 2"), (NodeFeature::DataCenterOwner, &user_principal(1))],
+        ),
+        node(
+            3,
+            true,
+            &[(NodeFeature::DataCenter, "DC 3"), (NodeFeature::DataCenterOwner, &user_principal(2))],
+        ),
+        node(
+            4,
+            true,
+            &[(NodeFeature::DataCenter, "DC 4"), (NodeFeature::DataCenterOwner, &user_principal(2))],
+        ),
+    ];
+
+    let subnet = subnet(
+        1,
+        &[
+            node(
+                5,
+                true,
+                &[(NodeFeature::DataCenter, "DC 1"), (NodeFeature::DataCenterOwner, &user_principal(1))],
+            ),
+            node(
+                6,
+                true,
+                &[(NodeFeature::DataCenter, "DC 2"), (NodeFeature::DataCenterOwner, &user_principal(2))],
+            ),
+        ],
+    );
+
+    // Services setup
+    let runtime = tokio::runtime::Runtime::new().unwrap();
+    let mut registry = MockLazyRegistry::new();
+    let available_nodes_clone = available_nodes.clone();
+    registry.expect_available_nodes().returning(move || {
+        Box::pin({
+            let local_clone = available_nodes_clone.clone();
+            async move { Ok(local_clone) }
+        })
+    });
+    let subnet_clone = subnet.clone();
+    registry.expect_get_nodes().returning(move |_| {
+        Box::pin({
+            let nodes = subnet_clone.nodes.clone();
+            async { Ok(nodes) }
+        })
+    });
+    let subnet_clone = subnet.clone();
+    registry.expect_subnet().returning(move |_| {
+        Box::pin({
+            let local_clone = subnet_clone.clone();
+            async { Ok(local_clone) }
+        })
+    });
+
+    let mut health_client = MockHealthStatusQuerier::new();
+    // All nodes in the world are healthy for this test
+    let nodes_health = available_nodes
+        .iter()
+        .map(|n| n.id)
+        .chain(subnet.nodes.iter().map(|n| n.id))
+        .map(|node_id| (node_id, ic_management_types::HealthStatus::Healthy))
+        .collect::<IndexMap<_, _>>();
+    health_client.expect_nodes().returning(move || {
+        Box::pin({
+            let local_nodes_health = nodes_health.clone();
+            async move { Ok(local_nodes_health) }
+        })
+    });
+
+    // Scenarios
+    let scenarios = vec![
+        (
+            // No available nodes contain cordoned features.
+            // All of them should be suitable for replacements.
+            vec![
+                cordoned_feature(NodeFeature::DataCenter, "Random new dc"),
+                cordoned_feature(NodeFeature::DataCenterOwner, &user_principal(42)),
+            ],
+            true,
+        ),
+        (
+            // First two nodes from the available pool must not
+            // be selected for replacement. Also node 5 could
+            // be replaced if it increases decentralization.
+            vec![cordoned_feature(NodeFeature::DataCenterOwner, &user_principal(1))],
+            true,
+        ),
+        (
+            // Second and third nodes from the available pool must
+            // not be selected for replacement. Also the node with
+            // id 6 could be replaced if it increases decentralization.
+            vec![
+                cordoned_feature(NodeFeature::DataCenter, "DC 2"),
+                cordoned_feature(NodeFeature::DataCenter, "DC 3"),
+            ],
+            true,
+        ),
+        (
+            // All nodes in the available pool are cordoned
+            vec![
+                cordoned_feature(NodeFeature::DataCenterOwner, &user_principal(1)),
+                cordoned_feature(NodeFeature::DataCenterOwner, &user_principal(2)),
+            ],
+            false,
+        ),
+    ];
+
+    let mut failed_scenarios = vec![];
+
+    let registry = Arc::new(registry);
+    let health_client = Arc::new(health_client);
+    for (cordoned_features, should_succeed) in scenarios {
+        let cordoned_features_clone = cordoned_features.clone();
+        let mut cordoned_feature_fetcher = MockCordonedFeatureFetcher::new();
+        cordoned_feature_fetcher.expect_fetch().returning(move || {
+            Box::pin({
+                let local_clone = cordoned_features_clone.to_vec();
+                async move { Ok(local_clone) }
+            })
+        });
+        let subnet_manager = SubnetManager::new(registry.clone(), Arc::new(cordoned_feature_fetcher), health_client.clone());
+
+        // Act
+        let response = runtime.block_on(
+            subnet_manager
+                .with_target(crate::subnet_manager::SubnetTarget::FromNodesIds(vec![subnet.nodes.first().unwrap().id]))
+                .membership_replace(false, None, None, None, vec![], None),
+        );
+
+        // Assert
+        if !should_succeed {
+            if response.is_ok() {
+                failed_scenarios.push((response, cordoned_features, "Expected outcome to have an error".to_string()));
+            }
+            // If it failed, don't check the exact error and
+            // assume it is the correct one. ATM this
+            // is not ideal, but since we use anyhow it's
+            // hard to test exact expected errors
+            continue;
+        }
+
+        // Here we know it should have succeeded
+        if response.is_err() {
+            failed_scenarios.push((response, cordoned_features, "Expected outcome to be successful".to_string()));
+            continue;
+        }
+
+        let response = response.unwrap();
+        if response.removed_with_desc.is_empty() {
+            failed_scenarios.push((Ok(response), cordoned_features, "Expected nodes to be removed".to_string()));
+            continue;
+        }
+
+        if response.added_with_desc.is_empty() {
+            failed_scenarios.push((Ok(response), cordoned_features, "Expected nodes to be added".to_string()));
+            continue;
+        }
+
+        let mut failed_features = vec![];
+        for (feature, value_diff) in response.feature_diff.iter() {
+            let cordoned_values_for_feature = cordoned_features
+                .iter()
+                .filter(|c_feature_pair| c_feature_pair.feature.equivalent(feature))
+                .map(|c_feature_pair| c_feature_pair.value.clone())
+                .collect_vec();
+
+            if cordoned_values_for_feature.is_empty() {
+                // This feature is not cordoned so it doesn't have to
+                // be validated
+                continue;
+            }
+
+            let diff_for_cordoned_features = value_diff
+                .iter()
+                .filter(|(value, _)| cordoned_values_for_feature.contains(value))
+                .collect_vec();
+
+            if diff_for_cordoned_features.is_empty() {
+                // Feature is cordoned but for a different value.
+                // Example cordoned: `NodeFeature::City`, value: `City 1`
+                // Found for this replace: `NodeFeature::City`, value: `City 2`
+                continue;
+            }
+
+            let mut failed_for_feature = vec![];
+            for (value, (_, in_nodes)) in diff_for_cordoned_features {
+                if in_nodes.gt(&0) {
+                    failed_for_feature.push((value, in_nodes));
+                    continue;
+                }
+            }
+
+            if !failed_for_feature.is_empty() {
+                failed_features.push(format!(
+                    "Feature {} has cordoned values but still found some nodes added:\n{}",
+                    feature,
+                    failed_for_feature
+                        .iter()
+                        .map(|(value, in_nodes)| format!("\tValue: {}, New nodes with that value: {}", value, in_nodes))
+                        .join("\n")
+                ));
+            }
+        }
+
+        if !failed_features.is_empty() {
+            failed_scenarios.push((
+                Ok(response),
+                cordoned_features,
+                format!("All failed features:\n{}", failed_features.iter().join("\n")),
+            ));
+        }
+    }
+
+    assert!(
+        failed_scenarios.is_empty(),
+        r#"World state:
+{}
+Failed scenarios:
+{}"#,
+        pretty_print_world(&available_nodes, &subnet),
+        failed_scenarios
+            .iter()
+            .map(|(outcome, cordoned_features, explanation)| format!(
+                r#"Reason why it failed:
+    {}
+Cordoned features:
+    [{}]
+Test output:
+{}"#,
+                explanation,
+                cordoned_features
+                    .iter()
+                    .map(|pair| format!("({}, {})", pair.feature, pair.value))
+                    .join(", "),
+                test_pretty_format_response(outcome)
+            ))
+            .join("\n############")
+    )
+}
diff --git a/rs/cli/src/unit_tests/update_unassigned_nodes.rs b/rs/cli/src/unit_tests/update_unassigned_nodes.rs
index 10bdae562..99c5b2e5c 100644
--- a/rs/cli/src/unit_tests/update_unassigned_nodes.rs
+++ b/rs/cli/src/unit_tests/update_unassigned_nodes.rs
@@ -3,7 +3,9 @@ use std::{str::FromStr, sync::Arc};
 use crate::artifact_downloader::MockArtifactDownloader;
 use crate::auth::Neuron;
 use crate::commands::{update_unassigned_nodes::UpdateUnassignedNodes, ExecutableCommand};
+use crate::cordoned_feature_fetcher::MockCordonedFeatureFetcher;
 use crate::ic_admin::MockIcAdmin;
+use ic_management_backend::health::MockHealthStatusQuerier;
 use ic_management_backend::{lazy_git::MockLazyGit, lazy_registry::MockLazyRegistry, proposal::MockProposalAgent};
 use ic_management_types::{Network, Subnet};
 use ic_types::PrincipalId;
@@ -49,6 +51,8 @@ async fn should_skip_update_same_version_nns_not_provided() {
         Arc::new(MockLazyGit::new()),
         Arc::new(MockProposalAgent::new()),
         Arc::new(MockArtifactDownloader::new()),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     );
 
     let cmd = UpdateUnassignedNodes { nns_subnet_id: None };
@@ -83,6 +87,8 @@ async fn should_skip_update_same_version_nns_provided() {
         Arc::new(MockLazyGit::new()),
         Arc::new(MockProposalAgent::new()),
         Arc::new(MockArtifactDownloader::new()),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     );
 
     let cmd = UpdateUnassignedNodes {
@@ -122,6 +128,8 @@ async fn should_update_unassigned_nodes() {
         Arc::new(MockLazyGit::new()),
         Arc::new(MockProposalAgent::new()),
         Arc::new(MockArtifactDownloader::new()),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     );
 
     let cmd = UpdateUnassignedNodes {
@@ -159,6 +167,8 @@ async fn should_fail_nns_not_found() {
         Arc::new(MockLazyGit::new()),
         Arc::new(MockProposalAgent::new()),
         Arc::new(MockArtifactDownloader::new()),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     );
 
     let cmd = UpdateUnassignedNodes {
diff --git a/rs/cli/src/unit_tests/version.rs b/rs/cli/src/unit_tests/version.rs
index 80588566b..0360081d2 100644
--- a/rs/cli/src/unit_tests/version.rs
+++ b/rs/cli/src/unit_tests/version.rs
@@ -2,7 +2,7 @@ use indexmap::IndexMap;
 use std::sync::{Arc, RwLock};
 
 use futures::future::ok;
-use ic_management_backend::{lazy_git::MockLazyGit, lazy_registry::MockLazyRegistry, proposal::MockProposalAgent};
+use ic_management_backend::{health::MockHealthStatusQuerier, lazy_git::MockLazyGit, lazy_registry::MockLazyRegistry, proposal::MockProposalAgent};
 use ic_management_types::{Artifact, ArtifactReleases, Network};
 use itertools::Itertools;
 
@@ -10,6 +10,7 @@ use crate::{
     artifact_downloader::MockArtifactDownloader,
     auth::Neuron,
     commands::ExecutableCommand,
+    cordoned_feature_fetcher::MockCordonedFeatureFetcher,
     ctx::tests::get_mocked_ctx,
     ic_admin::{MockIcAdmin, ProposeCommand, ProposeOptions},
     runner::{format_regular_version_upgrade_summary, format_security_hotfix},
@@ -69,6 +70,8 @@ async fn guest_os_elect_version_tests() {
         Arc::new(git),
         Arc::new(proposal_agent),
         Arc::new(artifact_downloader),
+        Arc::new(MockCordonedFeatureFetcher::new()),
+        Arc::new(MockHealthStatusQuerier::new()),
     );
 
     for (name, expected_title, cmd) in [
diff --git a/rs/decentralization/src/nakamoto/mod.rs b/rs/decentralization/src/nakamoto/mod.rs
index 2220b95be..f08eb5ddd 100644
--- a/rs/decentralization/src/nakamoto/mod.rs
+++ b/rs/decentralization/src/nakamoto/mod.rs
@@ -785,7 +785,7 @@ mod tests {
         );
 
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
         for log in subnet_change.after().run_log.iter() {
             println!("{}", log);
         }
@@ -842,7 +842,7 @@ mod tests {
         );
 
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
         println!("Replacement run log:");
         for line in subnet_change.after().run_log.iter() {
             println!("{}", line);
@@ -900,7 +900,7 @@ mod tests {
         );
 
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
         println!("Replacement run log:");
         for line in subnet_change.after().run_log.iter() {
@@ -1105,7 +1105,7 @@ mod tests {
         important.insert(subnet.principal, subnet);
 
         let network_heal_response = NetworkHealRequest::new(important.clone())
-            .heal_and_optimize(nodes_available.clone(), &health_of_nodes)
+            .heal_and_optimize(nodes_available.clone(), &health_of_nodes, vec![])
             .await
             .unwrap();
         let result = network_heal_response.first().unwrap().clone();
@@ -1131,7 +1131,7 @@
         let with_keeping_features = change_initial
             .clone()
             .keeping_from_used(vec!["CH".to_string()])
-            .rescue(&health_of_nodes)
+            .rescue(&health_of_nodes, vec![])
             .unwrap();
 
         assert_eq!(with_keeping_features.added().len(), 6);
@@ -1149,7 +1149,7 @@
         let with_keeping_principals = change_initial
             .clone()
             .keeping_from_used(vec!["CH".to_string()])
-            .rescue(&health_of_nodes)
+            .rescue(&health_of_nodes, vec![])
             .unwrap();
 
         assert_eq!(with_keeping_principals.added().len(), 6);
@@ -1163,7 +1163,7 @@
             1
         );
 
-        let rescue_all = change_initial.clone().rescue(&health_of_nodes).unwrap();
+        let rescue_all = change_initial.clone().rescue(&health_of_nodes, vec![]).unwrap();
 
         assert_eq!(rescue_all.added().len(), 7);
         assert_eq!(rescue_all.removed().len(), 7);
diff --git a/rs/decentralization/src/network.rs b/rs/decentralization/src/network.rs
index 7d515edc3..6ca2ab14a 100644
--- a/rs/decentralization/src/network.rs
+++ b/rs/decentralization/src/network.rs
@@ -837,6 +837,7 @@ pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier + Sync {
         exclude_nodes: Vec<String>,
         only_nodes: Vec<String>,
         health_of_nodes: &'a IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
     ) -> BoxFuture<'a, Result<SubnetChange, NetworkError>> {
         Box::pin(async move {
             SubnetChangeRequest {
@@ -846,7 +847,7 @@
             .including_from_available(include_nodes.clone())
             .excluding_from_available(exclude_nodes.clone())
             .including_from_available(only_nodes.clone())
-            .resize(size, 0, 0, health_of_nodes)
+            .resize(size, 0, 0, health_of_nodes, cordoned_features)
         })
     }
 }
@@ -883,6 +884,12 @@ impl> MatchAnyNode for std::slice::Iter<'_, T> {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct NodeFeaturePair {
+    pub feature: NodeFeature,
+    pub value: String,
+}
+
 #[derive(Default, Clone, Debug)]
 pub struct SubnetChangeRequest {
     subnet: DecentralizedSubnet,
@@ -975,6 +982,7 @@
         optimize_count: usize,
         replacements_unhealthy_with_desc: &[(Node, String)],
         health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
     ) -> Result<SubnetChange, NetworkError> {
         let old_nodes = self.subnet.nodes.clone();
         self.subnet = self.subnet.without_nodes(replacements_unhealthy_with_desc.to_owned())?;
 
         let result = self.resize(
             optimize_count,
             replacements_unhealthy_with_desc.len(),
             health_of_nodes,
+            cordoned_features,
         )?;
         Ok(SubnetChange { old_nodes, ..result })
     }
 
-    pub fn rescue(mut self, health_of_nodes: &IndexMap<PrincipalId, HealthStatus>) -> Result<SubnetChange, NetworkError> {
+    pub fn rescue(
+        mut self,
+        health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
+    ) -> Result<SubnetChange, NetworkError> {
         let old_nodes = self.subnet.nodes.clone();
         let nodes_to_remove = self
             .subnet
@@ -1009,6 +1022,7 @@
             0,
             self.subnet.removed_nodes_desc.len(),
             health_of_nodes,
+            cordoned_features,
         )?;
         Ok(SubnetChange { old_nodes, ..result })
     }
@@ -1020,10 +1034,11 @@ impl SubnetChangeRequest {
         how_many_nodes_to_remove: usize,
         how_many_nodes_unhealthy: usize,
         health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
     ) -> Result<SubnetChange, NetworkError> {
         let old_nodes = self.subnet.nodes.clone();
 
-        let available_nodes = self
+        let all_healthy_nodes = self
             .available_nodes
             .clone()
             .into_iter()
@@ -1031,13 +1046,36 @@
             .filter(|n| health_of_nodes.get(&n.id).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy)
             .collect::<Vec<_>>();
 
+        let all_healthy_nodes_count = all_healthy_nodes.len();
+        let available_nodes = all_healthy_nodes
+            .into_iter()
+            .filter(|n| {
+                for cordoned_feature in &cordoned_features {
+                    if let Some(node_feature) = n.features.get(&cordoned_feature.feature) {
+                        if PartialEq::eq(&node_feature, &cordoned_feature.value) {
+                            // Node contains a cordoned feature;
+                            // exclude it from the available pool
+                            return false;
+                        }
+                    }
+                }
+                // Node doesn't contain any cordoned features;
+                // include it in the available pool
+                true
+            })
+            .collect_vec();
+
         info!(
-            "Resizing subnet {} by removing {} (+{} unhealthy) nodes and adding {} nodes. Available {} healthy nodes.",
+            "Resizing subnet {} by adding {} nodes and removing {} (+{} unhealthy) nodes. Available {} healthy nodes{}.",
             self.subnet.id,
             how_many_nodes_to_add,
             how_many_nodes_to_remove,
             how_many_nodes_unhealthy,
-            available_nodes.len()
+            available_nodes.len(),
+            match all_healthy_nodes_count - available_nodes.len() {
+                0 => "".to_string(),
+                cordoned_nodes => format!(" (There are {} cordoned healthy nodes)", cordoned_nodes),
+            },
         );
 
         let resized_subnet = if how_many_nodes_to_remove > 0 {
@@ -1054,6 +1092,20 @@
             .cloned()
             .chain(resized_subnet.removed_nodes_desc.iter().map(|(n, _)| n.clone()))
             .filter(|n| health_of_nodes.get(&n.id).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy)
+            .filter(|n| {
+                for cordoned_feature in &cordoned_features {
+                    if let Some(node_feature) = n.features.get(&cordoned_feature.feature) {
+                        if PartialEq::eq(&node_feature, &cordoned_feature.value) {
+                            // Node contains a cordoned feature;
+                            // exclude it from the available pool
+                            return false;
+                        }
+                    }
+                }
+                // Node doesn't contain any cordoned features;
+                // include it in the available pool
+                true
+            })
             .collect::<Vec<_>>();
         let resized_subnet = resized_subnet
             .with_nodes(
@@ -1084,8 +1136,12 @@
     /// Evaluates the subnet change request to simulate the requested topology
     /// change. Command returns all the information about the subnet before
     /// and after the change.
-    pub fn evaluate(self, health_of_nodes: &IndexMap<PrincipalId, HealthStatus>) -> Result<SubnetChange, NetworkError> {
-        self.resize(0, 0, 0, health_of_nodes)
+    pub fn evaluate(
+        self,
+        health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
+    ) -> Result<SubnetChange, NetworkError> {
+        self.resize(0, 0, 0, health_of_nodes, cordoned_features)
     }
 }
@@ -1200,10 +1256,10 @@
         &self,
         mut available_nodes: Vec<Node>,
         health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
+        cordoned_features: Vec<NodeFeaturePair>,
     ) -> Result<Vec<SubnetChangeResponse>, NetworkError> {
         let mut subnets_changed = Vec::new();
         let subnets_to_heal = unhealthy_with_nodes(&self.subnets, health_of_nodes)
-            .await
             .iter()
             .flat_map(|(subnet_id, unhealthy_nodes)| {
                 let unhealthy_nodes = unhealthy_nodes.iter().map(Node::from).collect::<Vec<_>>();
@@ -1272,7 +1328,12 @@
             .filter_map(|num_nodes_to_optimize| {
                 change_req
                     .clone()
-                    .optimize(num_nodes_to_optimize, unhealthy_nodes_with_desc, health_of_nodes)
+                    .optimize(
+                        num_nodes_to_optimize,
+                        unhealthy_nodes_with_desc,
+                        health_of_nodes,
+                        cordoned_features.clone(),
+                    )
                     .map_err(|e| warn!("{}", e))
                     .ok()
             })
diff --git a/rs/decentralization/src/subnets.rs b/rs/decentralization/src/subnets.rs
index 0400b748b..0c546d3f9 100644
--- a/rs/decentralization/src/subnets.rs
+++ b/rs/decentralization/src/subnets.rs
@@ -8,7 +8,7 @@ use indexmap::IndexMap;
 use itertools::Itertools;
 use std::sync::Arc;
 
-pub async fn unhealthy_with_nodes(
+pub fn unhealthy_with_nodes(
     subnets: &IndexMap<PrincipalId, Subnet>,
     nodes_health: &IndexMap<PrincipalId, HealthStatus>,
 ) -> IndexMap<PrincipalId, Vec<Node>> {
diff --git a/rs/ic-management-backend/src/health.rs b/rs/ic-management-backend/src/health.rs
index 894ea93cc..55989ac62 100644
--- a/rs/ic-management-backend/src/health.rs
+++ b/rs/ic-management-backend/src/health.rs
@@ -1,4 +1,6 @@
+use futures::future::BoxFuture;
 use indexmap::IndexMap;
+use mockall::automock;
 use std::{collections::HashSet, str::FromStr};
 
 use ic_base_types::PrincipalId;
@@ -24,17 +26,17 @@ impl HealthClient {
 }
 
 impl HealthStatusQuerier for HealthClient {
-    async fn subnet(&self, subnet: PrincipalId) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
+    fn subnet(&self, subnet: PrincipalId) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
         match &self.implementation {
-            HealthStatusQuerierImplementations::Dashboard(c) => c.subnet(subnet).await,
-            HealthStatusQuerierImplementations::Prometheus(c) => c.subnet(subnet).await,
+            HealthStatusQuerierImplementations::Dashboard(c) => c.subnet(subnet),
+            HealthStatusQuerierImplementations::Prometheus(c) => c.subnet(subnet),
         }
     }
 
-    async fn nodes(&self) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
+    fn nodes(&self) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
         match &self.implementation {
-            HealthStatusQuerierImplementations::Dashboard(c) => c.nodes().await,
-            HealthStatusQuerierImplementations::Prometheus(c) => c.nodes().await,
+            HealthStatusQuerierImplementations::Dashboard(c) => c.nodes(),
+            HealthStatusQuerierImplementations::Prometheus(c) => c.nodes(),
        }
     }
 }
@@ -54,10 +56,11 @@
     }
 }
 
+#[automock]
 #[allow(dead_code)]
-pub trait HealthStatusQuerier {
-    fn subnet(&self, subnet: PrincipalId) -> impl std::future::Future<Output = anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> + Send;
-    fn nodes(&self) -> impl std::future::Future<Output = anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> + Send;
+pub trait HealthStatusQuerier: Send + Sync {
+    fn subnet(&self, subnet: PrincipalId) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>>;
+    fn nodes(&self) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>>;
 }
 
 pub struct PublicDashboardHealthClient {
@@ -186,21 +189,23 @@ struct ShortNodeInfo {
 }
 
 impl HealthStatusQuerier for PublicDashboardHealthClient {
-    async fn subnet(&self, subnet: PrincipalId) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
-        Ok(self
-            .get_all_nodes()
-            .await?
-            .into_iter()
-            .filter(|n| match n.subnet_id {
-                None => false,
-                Some(p) => p.eq(&subnet),
-            })
-            .map(|n| (n.node_id, n.status))
-            .collect())
+    fn subnet(&self, subnet: PrincipalId) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
+        Box::pin(async move {
+            Ok(self
+                .get_all_nodes()
+                .await?
+                .into_iter()
+                .filter(|n| match n.subnet_id {
+                    None => false,
+                    Some(p) => p.eq(&subnet),
+                })
+                .map(|n| (n.node_id, n.status))
+                .collect())
+        })
     }
 
-    async fn nodes(&self) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
-        Ok(self.get_all_nodes().await?.into_iter().map(|n| (n.node_id, n.status)).collect())
+    fn nodes(&self) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
+        Box::pin(async { Ok(self.get_all_nodes().await?.into_iter().map(|n| (n.node_id, n.status)).collect()) })
     }
 }
@@ -219,65 +224,69 @@ impl PrometheusHealthClient {
 }
 
 impl HealthStatusQuerier for PrometheusHealthClient {
-    async fn subnet(&self, subnet: PrincipalId) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
-        let ic_name = self.network.legacy_name();
-        let subnet_name = subnet.to_string();
-        let query_up = Selector::new()
-            .metric("up")
-            .eq("ic", ic_name.as_str())
-            .eq("job", "replica")
-            .eq("ic_subnet", subnet_name.as_str());
-
-        let response_up = self.client.query(query_up).get().await?;
-        let instant_up = response_up.data().as_vector().expect("Expected instant vector");
-
-        // Alerts are synthetic time series and cannot be queries as regular metrics
-        // https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#inspecting-alerts-during-runtime
-        let query_alert = format!(
-            "ALERTS{{ic=\"{}\", job=\"replica\", ic_subnet=\"{}\", alertstate=\"firing\"}}",
-            self.network.legacy_name(),
-            subnet
-        );
-        let response_alert = self.client.query(query_alert).get().await?;
-        let instant_alert = response_alert.data().as_vector().expect("Expected instant vector");
-        let node_ids_with_alerts: HashSet<PrincipalId> = instant_alert
-            .iter()
-            .filter_map(|r| r.metric().get("ic_node").and_then(|id| PrincipalId::from_str(id).ok()))
-            .collect();
-
-        Ok(instant_up
-            .iter()
-            .filter_map(|r| {
-                r.metric().get("ic_node").and_then(|id| PrincipalId::from_str(id).ok()).map(|id| {
-                    let status = if r.sample().value() == 1.0 {
-                        if node_ids_with_alerts.contains(&id) {
-                            HealthStatus::Degraded
-                        } else {
-                            HealthStatus::Healthy
-                        }
-                    } else {
-                        HealthStatus::Dead
-                    };
-                    (id, status)
-                })
-            })
-            .collect())
+    fn subnet(&self, subnet: PrincipalId) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
+        Box::pin(async move {
+            let ic_name = self.network.legacy_name();
+            let subnet_name = subnet.to_string();
+            let query_up = Selector::new()
+                .metric("up")
+                .eq("ic", ic_name.as_str())
+                .eq("job", "replica")
+                .eq("ic_subnet", subnet_name.as_str());
+
+            let response_up = self.client.query(query_up).get().await?;
+            let instant_up = response_up.data().as_vector().expect("Expected instant vector");
+
+            // Alerts are synthetic time series and cannot be queried as regular metrics
+            // https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#inspecting-alerts-during-runtime
+            let query_alert = format!(
+                "ALERTS{{ic=\"{}\", job=\"replica\", ic_subnet=\"{}\", alertstate=\"firing\"}}",
+                self.network.legacy_name(),
+                subnet
+            );
+            let response_alert = self.client.query(query_alert).get().await?;
+            let instant_alert = response_alert.data().as_vector().expect("Expected instant vector");
+            let node_ids_with_alerts: HashSet<PrincipalId> = instant_alert
+                .iter()
+                .filter_map(|r| r.metric().get("ic_node").and_then(|id| PrincipalId::from_str(id).ok()))
+                .collect();
+
+            Ok(instant_up
+                .iter()
+                .filter_map(|r| {
+                    r.metric().get("ic_node").and_then(|id| PrincipalId::from_str(id).ok()).map(|id| {
+                        let status = if r.sample().value() == 1.0 {
+                            if node_ids_with_alerts.contains(&id) {
+                                HealthStatus::Degraded
+                            } else {
+                                HealthStatus::Healthy
+                            }
+                        } else {
+                            HealthStatus::Dead
+                        };
+                        (id, status)
+                    })
+                })
+                .collect())
+        })
    }
 
-    async fn nodes(&self) -> anyhow::Result<IndexMap<PrincipalId, HealthStatus>> {
-        let query = format!(
-            r#"ic_replica_orchestrator:health_state:bottomk_1{{ic="{network}"}}"#,
-            network = self.network.legacy_name(),
-        );
-        let response = self.client.query(query).get().await?;
-        let results = response.data().as_vector().expect("Expected instant vector");
-        Ok(results
-            .iter()
-            .filter_map(|r| {
-                let status = HealthStatus::from_str(r.metric().get("state").expect("all vectors should have a state label"))
-                    .expect("all vectors should have a valid label");
-                r.metric().get("ic_node").map(|id| (PrincipalId::from_str(id).unwrap(), status))
-            })
-            .collect())
+    fn nodes(&self) -> BoxFuture<'_, anyhow::Result<IndexMap<PrincipalId, HealthStatus>>> {
+        Box::pin(async {
+            let query = format!(
+                r#"ic_replica_orchestrator:health_state:bottomk_1{{ic="{network}"}}"#,
+                network = self.network.legacy_name(),
+            );
+            let response = self.client.query(query).get().await?;
+            let results = response.data().as_vector().expect("Expected instant vector");
+            Ok(results
+                .iter()
+                .filter_map(|r| {
+                    let status = HealthStatus::from_str(r.metric().get("state").expect("all vectors should have a state label"))
+                        .expect("all vectors should have a valid label");
+                    r.metric().get("ic_node").map(|id| (PrincipalId::from_str(id).unwrap(), status))
+                })
+                .collect())
+        })
    }
 }
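Taken together, the data flow introduced by this change is: DreContext builds one CordonedFeatureFetcherImpl (remote YAML with an optional file fallback) plus one HealthStatusQuerier, and every topology-changing path passes cordoned_features_fetcher.fetch().await? into resize/optimize/rescue, where cordoned nodes are dropped from the candidate pool. A minimal standalone restatement of that pool filter, under the assumption that NodeFeatures::get returns an owned String as the diff's PartialEq::eq usage suggests (the helper name filter_cordoned is illustrative, not part of the diff):

// Sketch: keep a node only if none of its features match a cordoned (feature, value) pair.
use decentralization::network::{Node, NodeFeaturePair};

fn filter_cordoned(nodes: Vec<Node>, cordoned: &[NodeFeaturePair]) -> Vec<Node> {
    nodes
        .into_iter()
        .filter(|n| {
            !cordoned.iter().any(|pair| {
                n.features
                    .get(&pair.feature)
                    // A missing feature can never match a cordoned value
                    .map(|value| value == pair.value)
                    .unwrap_or(false)
            })
        })
        .collect()
}

Because fetch() now returns a BoxFuture and HealthStatusQuerier is object-safe, both can be held as Arc<dyn ...> and swapped for the #[automock]-generated MockCordonedFeatureFetcher and MockHealthStatusQuerier in tests, which is exactly what the new unit tests above do.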