diff --git a/.gitignore b/.gitignore index c9ec14c1b..00763a716 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ +/multi-block/target/ # These are backup files generated by rustfmt **/*.rs.bk diff --git a/multi-block/Cargo.toml b/multi-block/Cargo.toml new file mode 100644 index 000000000..fc69d854b --- /dev/null +++ b/multi-block/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "multi-block" +version = "0.1.0" +edition = "2021" + +[dependencies] +codec = { package = "parity-scale-codec", version = "3.0.0" } +scale-info = { package = "scale-info", version = "2.11.2" } +thiserror = "1.0" +tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "sync", "signal"] } +clap = { version = "4.5", features = ["derive", "env"] } +jsonrpsee = { version = "0.22.3", features = ["ws-client"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +serde = "1.0" +serde_json = "1.0" +futures = "0.3" +log = "0.4" +once_cell = "1.20" +hyper = { version = "0.14.30", features = ["server", "http1", "http2", "tcp"] } + +# subxt +scale-value = "0.16.3" +subxt = { version = "0.37.0", features = ["substrate-compat"] } + +# polkadot-sdk +#frame-election-provider-support = "37.0.0" +#pallet-election-provider-multi-block = "4.0.0-dev" # uncomment once pallet is published +#sp-npos-elections = "34.0.0" +#frame-support = "37.0.0" +#sp-runtime = "39.0.1" + +frame-election-provider-support = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/election-provider-support" } +pallet-election-provider-multi-block = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/election-provider-multi-block" } +sp-npos-elections = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/primitives/npos-elections" } +frame-support = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/support" } +sp-runtime = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/primitives/runtime" } diff --git a/multi-block/metadata.scale b/multi-block/metadata.scale new file mode 100644 index 000000000..6038920eb Binary files /dev/null and b/multi-block/metadata.scale differ diff --git a/multi-block/src/client.rs b/multi-block/src/client.rs new file mode 100644 index 000000000..f78131317 --- /dev/null +++ b/multi-block/src/client.rs @@ -0,0 +1,51 @@ +use crate::prelude::*; +use jsonrpsee::ws_client::WsClientBuilder; +use subxt::backend::rpc::RpcClient as RawRpcClient; + +/// Wraps the subxt interfaces to make it easy to use for the staking-miner. +#[derive(Clone, Debug)] +pub struct Client { + /// Access to typed rpc calls from subxt. + rpc: RpcClient, + /// Access to chain APIs such as storage, events etc. + chain_api: ChainClient, +} + +impl Client { + pub async fn new(uri: &str) -> Result { + log::debug!(target: LOG_TARGET, "attempting to connect to {:?}", uri); + + let rpc = loop { + match WsClientBuilder::default() + .max_request_size(u32::MAX) + .max_response_size(u32::MAX) + .request_timeout(std::time::Duration::from_secs(600)) + .build(&uri) + .await + { + Ok(rpc) => break RawRpcClient::new(rpc), + Err(e) => { + log::warn!( + target: LOG_TARGET, + "failed to connect to client due to {:?}, retrying soon..", + e, + ); + }, + }; + tokio::time::sleep(std::time::Duration::from_millis(2_500)).await; + }; + + let chain_api = ChainClient::from_rpc_client(rpc.clone()).await?; + Ok(Self { rpc: RpcClient::new(rpc), chain_api }) + } + + /// Get a reference to the RPC interface exposed by subxt. 
+ pub fn rpc(&self) -> &RpcClient { + &self.rpc + } + + /// Get a reference to the chain API. + pub fn chain_api(&self) -> &ChainClient { + &self.chain_api + } +} diff --git a/multi-block/src/commands/mod.rs b/multi-block/src/commands/mod.rs new file mode 100644 index 000000000..11d4889c8 --- /dev/null +++ b/multi-block/src/commands/mod.rs @@ -0,0 +1,3 @@ +pub mod monitor; + +pub use monitor::monitor_cmd; diff --git a/multi-block/src/commands/monitor.rs b/multi-block/src/commands/monitor.rs new file mode 100644 index 000000000..98d073283 --- /dev/null +++ b/multi-block/src/commands/monitor.rs @@ -0,0 +1,245 @@ +use crate::{client::Client, error::Error, prelude::*, signer::Signer}; + +use crate::{ + epm, helpers, runtime::runtime_types::pallet_election_provider_multi_block::types::*, + static_types, +}; + +use pallet_election_provider_multi_block::unsigned::miner; + +use clap::Parser; +use std::sync::Arc; +use subxt::backend::rpc::RpcSubscription; +use tokio::sync::Mutex; + +/// Represents the current task to be performed in the election round. +enum Task { + /// Do nothing. + Nothing, + /// A page of the target snapshot has been fetched, cache it locally. + CacheTargetSnapshot(TargetSnapshotPage), + /// A page of the voter snapshot has been fetched, cache it locally. + CacheVoterSnapshot(VoterSnapshotPage), + /// Next task is to compute the election. + ComputeElectionResult, +} + +#[derive(Debug, Clone, Parser, PartialEq)] +pub struct MonitorConfig { + #[clap(long)] + pub at: Option, + + #[clap(long, short, env = "SEED")] + pub seed_or_path: String, + + #[clap(long, value_enum, default_value_t = Listen::Head)] + pub listen: Listen, +} + +#[derive(clap::ValueEnum, Debug, Copy, Clone, PartialEq)] +pub enum Listen { + Finalized, + Head, +} + +/// Monitors the current on-chain phase and performs the relevant actions. +/// +/// Steps: +/// 1. In `Phase::Snapshot(page)`, fetch the pages of the target and voter snapshot. +/// 2. In `Phase::Signed`: +/// 2.1. Compute the solution +/// 2.2. Register the solution with the *full* election score of the submission +/// 2.3. Submit each page separately (1 per block). +/// +/// If by the time of computing the solution the snapshot pages have not been fetched and cached, +/// the snapshots will be fetched before computing the solution. +pub async fn monitor_cmd(client: Client, config: MonitorConfig) -> Result<(), Error> +where + T: miner::Config< + AccountId = AccountId, + MaxVotesPerVoter = static_types::MaxVotesPerVoter, + TargetSnapshotPerBlock = static_types::TargetSnapshotPerBlock, + VoterSnapshotPerBlock = static_types::VoterSnapshotPerBlock, + Pages = static_types::Pages, + > + Send + + Sync + + 'static, + T::Solution: Send, +{ + let signer = Signer::new(&config.seed_or_path)?; + let account_info = { + let addr = runtime::storage().system().account(signer.account_id()); + client + .chain_api() + .storage() + .at_latest() + .await? + .fetch(&addr) + .await? + .ok_or(Error::AccountDoesNotExists)? 
+ }; + + log::info!(target: LOG_TARGET, "Loaded account {} {{ nonce: {}, free_balance: {}, reserved_balance: {}, frozen_balance: {} }}", + signer, + account_info.nonce, + account_info.data.free, + account_info.data.reserved, + account_info.data.frozen, + ); + + let mut subscription = heads_subscription(client.rpc(), config.listen).await?; + let (_tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let _submit_lock = Arc::new(Mutex::new(())); + + let mut target_snapshot: TargetSnapshotPage = Default::default(); + let mut voter_snapshot_paged: Vec = Default::default(); + + let n_pages = static_types::Pages::get(); + let mut last_round_submitted = None; + + loop { + let at = tokio::select! { + maybe_rp = subscription.next() => { + match maybe_rp { + Some(Ok(r)) => r, + Some(Err(e)) => { + log::error!(target: LOG_TARGET, "subscription failed to decode Header {:?}, this is bug please file an issue", e); + return Err(e.into()); + } + // The subscription was dropped, should only happen if: + // - the connection was closed. + // - the subscription could not keep up with the server. + None => { + log::warn!(target: LOG_TARGET, "subscription to `{:?}` terminated. Retrying..", config.listen); + subscription = heads_subscription(client.rpc(), config.listen).await?; + continue + } + } + }, + maybe_err = rx.recv() => { + match maybe_err { + Some(err) => return Err(err), + None => unreachable!("at least one sender kept in the main loop should always return Some; qed"), + } + } + }; + + let storage = helpers::storage_at(config.at, client.chain_api()).await?; + let storage2 = storage.clone(); + let phase = storage + .fetch_or_default(&runtime::storage().election_provider_multi_block().current_phase()) + .await?; + let round = storage + .fetch_or_default(&runtime::storage().election_provider_multi_block().round()) + .await?; + + let result = tokio::spawn(async move { + match phase { + Phase::Off => { + log::trace!(target: LOG_TARGET, "Phase::Off, do nothing."); + Task::Nothing + }, + + Phase::Snapshot(page) => + if page == n_pages { + epm::target_snapshot(&storage) + .await + .map(|t| Task::CacheTargetSnapshot(t)) + .unwrap_or(Task::Nothing) + } else { + epm::paged_voter_snapshot(page, &storage) + .await + .map(|v| Task::CacheVoterSnapshot(v)) + .unwrap_or(Task::Nothing) + }, + Phase::Signed => { + log::trace!(target: LOG_TARGET, "Phase::Signed",); + Task::ComputeElectionResult + }, + _ => { + log::trace!(target: LOG_TARGET, "{:?}, do nothing.", phase); + Task::Nothing + }, + } + }); + + match result.await { + Ok(Task::CacheTargetSnapshot(s)) => { + target_snapshot = s; + }, + Ok(Task::CacheVoterSnapshot(p)) => { + // TODO: page from msp -> lsp, prepend p instead of push(). + voter_snapshot_paged.push(p); + }, + Ok(Task::ComputeElectionResult) => { + if last_round_submitted == Some(round) { + // skip minig again, everything submitted. + log::trace!( + target: LOG_TARGET, + "Solution successfully submitted for round {}, do nothing.", + round + ); + continue + } + + if !target_snapshot.is_empty() && voter_snapshot_paged.len() == n_pages as usize { + // all pages in cache, compute election. + match epm::mine_and_submit::( + &at, + &client, + signer.clone(), + config.listen, + &target_snapshot, + &voter_snapshot_paged, + n_pages, + round, + ) + .await + { + Ok(_) => last_round_submitted = Some(round), + Err(err) => { + log::error!("mine_and_submit: {:?}", err); + }, // continue trying. + } + } else { + // TODO: check if there are already *some* pageed cached and fetch only missing + // ones. 
+ match epm::fetch_mine_and_submit::( + &at, + &client, + signer.clone(), + config.listen, + &storage2, + n_pages, + round, + ) + .await + { + Ok(_) => last_round_submitted = Some(round), + Err(err) => { + log::error!("fetch_mine_and_submit: {:?}", err); + }, // continue trying. + } + log::trace!(target: LOG_TARGET, "not all snapshots in cache, fetch all and compute."); + } + }, + Ok(Task::Nothing) => { + // reset cached snapshot. + target_snapshot = Default::default(); + voter_snapshot_paged = Default::default(); + }, + Err(e) => log::error!(target: LOG_TARGET, "ERROR: {:?}", e), + } + } +} + +async fn heads_subscription( + rpc: &RpcClient, + listen: Listen, +) -> Result, Error> { + match listen { + Listen::Head => rpc.chain_subscribe_new_heads().await, + Listen::Finalized => rpc.chain_subscribe_finalized_heads().await, + } + .map_err(Into::into) +} diff --git a/multi-block/src/epm.rs b/multi-block/src/epm.rs new file mode 100644 index 000000000..2a34dd8c6 --- /dev/null +++ b/multi-block/src/epm.rs @@ -0,0 +1,384 @@ +use crate::{ + client::Client, commands::monitor::Listen, error::Error, helpers, prelude::*, signer::Signer, + static_types, +}; +use pallet_election_provider_multi_block::unsigned::miner; +use sp_npos_elections::ElectionScore; + +use codec::{Decode, Encode}; +use frame_support::BoundedVec; +use scale_info::PortableRegistry; +use scale_value::scale::decode_as_type; +use subxt::{ + config::{DefaultExtrinsicParamsBuilder, Header as _}, + dynamic::Value, + tx::DynamicPayload, +}; + +const EPM_PALLET_NAME: &str = "ElectionProviderMultiBlock"; +const EPM_SIGNED_PALLET_NAME: &str = "ElectionSignedPallet"; + +type TypeId = u32; +type PagedRawSolutionOf = pallet_election_provider_multi_block::types::PagedRawSolution; + +#[derive(Copy, Clone, Debug)] +struct EpmConstant { + epm: &'static str, + constant: &'static str, +} + +impl std::fmt::Display for EpmConstant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}::{}", self.epm, self.constant)) + } +} + +impl EpmConstant { + const fn new(constant: &'static str) -> Self { + Self { epm: EPM_PALLET_NAME, constant } + } + const fn to_parts(self) -> (&'static str, &'static str) { + (self.epm, self.constant) + } +} + +pub(crate) fn update_metadata_constants(api: &ChainClient) -> Result<(), Error> { + const PAGES: EpmConstant = EpmConstant::new("Pages"); + const TARGET_SNAPSHOT_PER_BLOCK: EpmConstant = EpmConstant::new("TargetSnapshotPerBlock"); + const VOTER_SNAPSHOT_PER_BLOCK: EpmConstant = EpmConstant::new("VoterSnapshotPerBlock"); + const MAX_VOTES_PER_VOTER: EpmConstant = EpmConstant::new("MinerMaxVotesPerVoter"); + const MAX_BACKERS_PER_WINNER: EpmConstant = EpmConstant::new("MinerMaxBackersPerWinner"); + const MAX_WINNERS_PER_PAGE: EpmConstant = EpmConstant::new("MinerMaxWinnersPerPage"); + + let pages: u32 = read_constant(api, PAGES)?; + let target_snapshot_per_block: u32 = read_constant(api, TARGET_SNAPSHOT_PER_BLOCK)?; + let voter_snapshot_per_block: u32 = read_constant(api, VOTER_SNAPSHOT_PER_BLOCK)?; + let max_votes_per_voter: u32 = read_constant(api, MAX_VOTES_PER_VOTER)?; + let max_backers_per_winner: u32 = read_constant(api, MAX_BACKERS_PER_WINNER)?; + let max_winners_per_page: u32 = read_constant(api, MAX_WINNERS_PER_PAGE)?; + + fn log_metadata(metadata: EpmConstant, val: impl std::fmt::Display) { + log::trace!(target: LOG_TARGET, "updating metadata constant `{metadata}`: {val}",); + } + + log_metadata(PAGES, pages); + log_metadata(TARGET_SNAPSHOT_PER_BLOCK, 
target_snapshot_per_block); + log_metadata(VOTER_SNAPSHOT_PER_BLOCK, voter_snapshot_per_block); + log_metadata(MAX_VOTES_PER_VOTER, max_votes_per_voter); + log_metadata(MAX_BACKERS_PER_WINNER, max_backers_per_winner); + log_metadata(MAX_WINNERS_PER_PAGE, max_winners_per_page); + + static_types::Pages::set(pages); + static_types::TargetSnapshotPerBlock::set(target_snapshot_per_block); + static_types::VoterSnapshotPerBlock::set(voter_snapshot_per_block); + static_types::MaxVotesPerVoter::set(max_votes_per_voter); + static_types::MaxBackersPerWinner::set(max_backers_per_winner); + static_types::MaxWinnersPerPage::set(max_winners_per_page); + + Ok(()) +} + +/// Fetches the target snapshot. +/// +/// Note: the target snapshot is single paged. +pub(crate) async fn target_snapshot(storage: &Storage) -> Result { + let page_idx = vec![Value::from(0u32)]; + let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "PagedTargetSnapshot", page_idx); + + match storage.fetch(&addr).await { + Ok(Some(val)) => { + let snapshot: TargetSnapshotPage = Decode::decode(&mut val.encoded())?; + log::trace!( + target: LOG_TARGET, + "Target snapshot with len {:?}", + snapshot.len() + ); + Ok(snapshot) + }, + Ok(None) => Err(Error::EmptySnapshot), + Err(err) => Err(err.into()), + } +} + +/// Fetches `page` of the voter snapshot. +pub(crate) async fn paged_voter_snapshot( + page: u32, + storage: &Storage, +) -> Result { + let page_idx = vec![Value::from(page)]; + let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "PagedVoterSnapshot", page_idx); + + match storage.fetch(&addr).await { + Ok(Some(val)) => match Decode::decode(&mut val.encoded()) { + Ok(s) => { + let snapshot: VoterSnapshotPage = s; + log::trace!( + target: LOG_TARGET, + "Voter snapshot page {:?} with len {:?}", + page, + snapshot.len() + ); + Ok(snapshot) + }, + Err(err) => Err(err.into()), + }, + Ok(None) => Err(Error::EmptySnapshot), + Err(err) => Err(err.into()), + } +} + +/// Fetches the full voter and target snapshots. +pub(crate) async fn fetch_full_snapshots( + n_pages: u32, + storage: &Storage, +) -> Result<(TargetSnapshotPage, Vec), Error> { + log::trace!(target: LOG_TARGET, "fetch_full_snapshots"); + + let mut voters = vec![]; + let targets = target_snapshot(storage).await?; + + for page in 0..n_pages { + let paged_voters = paged_voter_snapshot(page, storage).await?; + voters.push(paged_voters); + } + + Ok((targets, voters)) +} + +/// Mines, registers and submits a solution. +pub(crate) async fn mine_and_submit( + at: &Header, + client: &Client, + signer: Signer, + listen: Listen, + target_snapshot: &TargetSnapshotPageOf, + voter_snapshot_paged: &Vec>, + n_pages: u32, + round: u32, +) -> Result<(), Error> +where + T: miner::Config< + AccountId = AccountId, + MaxVotesPerVoter = static_types::MaxVotesPerVoter, + TargetSnapshotPerBlock = static_types::TargetSnapshotPerBlock, + VoterSnapshotPerBlock = static_types::VoterSnapshotPerBlock, + Pages = static_types::Pages, + > + Send + + Sync + + 'static, + T::Solution: Send, +{ + // TODO: get from runtime/configs. 
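	// (Hypothetical sketch of that TODO, not wired up here: `do_reduce` could become a
	// `#[clap(long)]` flag on `MonitorConfig`, and `desired_targets` could be read from the
	// runtime via a dynamic storage query on the election pallet instead of being hard-coded;
	// the exact storage item name is not verified here. For now both are fixed values:)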
+ let do_reduce = false; + let desired_targets = 400; + + log::trace!( + target: LOG_TARGET, + "Mine, submit: election target snap size: {:?}, voter snap size: {:?}", + target_snapshot.len(), + voter_snapshot_paged.len() + ); + + let targets: TargetSnapshotPageOf = target_snapshot.clone(); + let voters: BoundedVec, T::Pages> = + BoundedVec::truncate_from(voter_snapshot_paged.clone()); + + let (paged_raw_solution, _trimming_status): (PagedRawSolutionOf, _) = + miner::Miner::::mine_paged_solution_with_snapshot( + &voters, + &targets, + n_pages, + round, + desired_targets, + do_reduce, + ) + .map_err(|_| Error::Miner("mine paged solution.".to_string()))?; + + let solution_pages: Vec<(u32, ElectionScore, ::Solution)> = + paged_raw_solution + .solution_pages + .iter() + .enumerate() + .map(|(page, solution)| { + let partial_score = miner::Miner::::compute_partial_score( + &voters, + &targets, + solution, + desired_targets, + page as u32, + ) + .map_err(|_| Error::Miner("solution compute..".to_string()))?; + + Ok((page as u32, partial_score, solution.to_owned())) + }) + .collect::, Error>>()?; + + // register solution. + let register_tx = register_solution(paged_raw_solution.score)?; + submit_and_watch::(at, client, signer.clone(), listen, register_tx, "register").await?; + + // submit all solution pages. + for (page, _score, solution) in solution_pages { + let submit_tx = submit_page::(page, Some(solution))?; + submit_and_watch::(at, client, signer.clone(), listen, submit_tx, "submit page").await?; + } + + Ok(()) +} + +/// Performs the whole set of steps to submit a new solution: +/// 1. Fetches target and voter snapshots; +/// 2. Mines a solution; +/// 3. Registers new solution; +/// 4. Submits the paged solution. +pub(crate) async fn fetch_mine_and_submit( + at: &Header, + client: &Client, + signer: Signer, + listen: Listen, + storage: &Storage, + n_pages: u32, + round: u32, +) -> Result<(), Error> +where + T: miner::Config< + AccountId = AccountId, + MaxVotesPerVoter = static_types::MaxVotesPerVoter, + TargetSnapshotPerBlock = static_types::TargetSnapshotPerBlock, + VoterSnapshotPerBlock = static_types::VoterSnapshotPerBlock, + Pages = static_types::Pages, + > + Send + + Sync + + 'static, + T::Solution: Send, +{ + let (target_snapshot, voter_snapshot_paged) = fetch_full_snapshots(n_pages, storage).await?; + log::trace!( + target: LOG_TARGET, + "Fetched, full election target snapshot with {} targets, voter snapshot with {:?} pages.", + target_snapshot.len(), + voter_snapshot_paged.len() + ); + + mine_and_submit::( + at, + client, + signer, + listen, + &target_snapshot, + &voter_snapshot_paged, + n_pages, + round, + ) + .await +} + +/// Submits and watches a `DynamicPayload`, ie. an extrinsic. +async fn submit_and_watch( + at: &Header, + client: &Client, + signer: Signer, + listen: Listen, + tx: DynamicPayload, + reason: &'static str, +) -> Result<(), Error> { + let block_hash = at.hash(); + let nonce = client.rpc().system_account_next_index(signer.account_id()).await?; + + log::trace!(target: LOG_TARGET, "submit_and_watch for `{:?}` at {:?}", reason, block_hash); + + // TODO:set mortality. 
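	// (Possible shape for that TODO — a sketch only, assuming subxt's
	// `DefaultExtrinsicParamsBuilder::mortal` builder method:
	//     let xt_cfg = DefaultExtrinsicParamsBuilder::default().nonce(nonce).mortal(at, 32).build();
	// which would make the transaction valid for roughly 32 blocks from `at`.
	// Until then the extrinsic is submitted as immortal:)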
+ let xt_cfg = DefaultExtrinsicParamsBuilder::default().nonce(nonce).build(); + let xt = client.chain_api().tx().create_signed(&tx, &*signer, xt_cfg).await?; + + let tx_progress = xt.submit_and_watch().await.map_err(|e| { + log::error!(target: LOG_TARGET, "submit solution failed: {:?}", e); + e + })?; + + match listen { + Listen::Head => { + let in_block = helpers::wait_for_in_block(tx_progress).await?; + let _events = in_block.fetch_events().await.expect("events should exist"); + // TODO: finish + }, + Listen::Finalized => { + let finalized_block = tx_progress.wait_for_finalized().await?; + let _block_hash = finalized_block.block_hash(); + let _finalized_success = finalized_block.wait_for_success().await?; + // TODO: finish + }, + } + Ok(()) +} + +/// Helper to construct a register solution transaction. +fn register_solution(election_score: ElectionScore) -> Result { + let scale_score = to_scale_value(election_score).map_err(|err| { + Error::DynamicTransaction(format!("failed to encode `ElectionScore: {:?}", err)) + })?; + + Ok(subxt::dynamic::tx(EPM_SIGNED_PALLET_NAME, "register", vec![scale_score])) +} + +/// Helper to construct a submit page transaction. +fn submit_page( + page: u32, + maybe_solution: Option, +) -> Result { + let scale_page = to_scale_value(page) + .map_err(|err| Error::DynamicTransaction(format!("failed to encode Page: {:?}", err)))?; + + let scale_solution = to_scale_value(maybe_solution).map_err(|err| { + Error::DynamicTransaction(format!("failed to encode Solution: {:?}", err)) + })?; + + Ok(subxt::dynamic::tx(EPM_SIGNED_PALLET_NAME, "submit_page", vec![scale_page, scale_solution])) +} + +fn read_constant<'a, T: serde::Deserialize<'a>>( + api: &ChainClient, + constant: EpmConstant, +) -> Result { + let (epm_name, constant) = constant.to_parts(); + + let val = api + .constants() + .at(&subxt::dynamic::constant(epm_name, constant)) + .map_err(|e| invalid_metadata_error(constant.to_string(), e))? 
+ .to_value() + .map_err(|e| Error::Subxt(e.into()))?; + + scale_value::serde::from_value::<_, T>(val).map_err(|e| { + Error::InvalidMetadata(format!("Decoding `{}` failed {}", std::any::type_name::(), e)) + }) +} + +fn invalid_metadata_error(item: String, err: E) -> Error { + Error::InvalidMetadata(format!("{} failed: {}", item, err)) +} + +fn make_type() -> (TypeId, PortableRegistry) { + let m = scale_info::MetaType::new::(); + let mut types = scale_info::Registry::new(); + let id = types.register_type(&m); + let portable_registry: PortableRegistry = types.into(); + + (id.id, portable_registry) +} + +fn to_scale_value(val: T) -> Result { + let (ty_id, types) = make_type::(); + + let bytes = val.encode(); + + decode_as_type(&mut bytes.as_ref(), ty_id, &types) + .map(|v| v.remove_context()) + .map_err(|e| { + Error::DynamicTransaction(format!( + "Failed to decode {}: {:?}", + std::any::type_name::(), + e + )) + }) +} diff --git a/multi-block/src/error.rs b/multi-block/src/error.rs new file mode 100644 index 000000000..04066ea70 --- /dev/null +++ b/multi-block/src/error.rs @@ -0,0 +1,29 @@ +use crate::prelude::*; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Failed to parse log directive: `{0}ยด")] + LogParse(#[from] tracing_subscriber::filter::ParseError), + #[error("I/O error: `{0}`")] + Io(#[from] std::io::Error), + #[error("subxt error: `{0}`")] + Subxt(#[from] subxt::Error), + #[error("Invalid chain: `{0}`, staking-miner supports only polkadot, kusama and westend")] + InvalidChain(String), + #[error("Invalid metadata: {0}")] + InvalidMetadata(String), + #[error("Codec error: `{0}`")] + Codec(#[from] codec::Error), + #[error("The account does not exist")] + AccountDoesNotExists, + #[error("Crypto error: `{0:?}`")] + Crypto(sp_core::crypto::SecretStringError), + #[error("Empty snapshot")] + EmptySnapshot, + #[error("Miner error: `{0}`")] + Miner(String), + #[error("Dynamic transaction error: {0}")] + DynamicTransaction(String), + #[error("Other error: `{0}`")] + Other(String), +} diff --git a/multi-block/src/helpers.rs b/multi-block/src/helpers.rs new file mode 100644 index 000000000..c21bbd952 --- /dev/null +++ b/multi-block/src/helpers.rs @@ -0,0 +1,49 @@ +use subxt::{ + error::RpcError, + storage::Storage, + tx::{TxInBlock, TxProgress}, +}; + +use crate::{error::Error, prelude::*}; + +/// Helper to get storage at block. +pub async fn storage_at( + block: Option, + api: &ChainClient, +) -> Result, Error> { + if let Some(block_hash) = block { + Ok(api.storage().at(block_hash)) + } else { + api.storage().at_latest().await.map_err(Into::into) + } +} + +/// Wait for the transaction to be in a block. +/// +/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some +/// probability that the transaction will not make it into a block but there is no guarantee +/// that this is true. In those cases the stream is closed however, so you currently have no way to find +/// out if they finally made it into a block or not. +pub async fn wait_for_in_block( + mut tx: TxProgress, +) -> Result, subxt::Error> +where + T: subxt::Config + Clone, + C: subxt::client::OnlineClientT + std::fmt::Debug + Clone, +{ + use subxt::{error::TransactionError, tx::TxStatus}; + + while let Some(status) = tx.next().await { + match status? { + // Finalized or otherwise in a block! Return. + TxStatus::InBestBlock(s) | TxStatus::InFinalizedBlock(s) => return Ok(s), + // Error scenarios; return the error. 
+ TxStatus::Error { message } => return Err(TransactionError::Error(message).into()), + TxStatus::Invalid { message } => return Err(TransactionError::Invalid(message).into()), + TxStatus::Dropped { message } => return Err(TransactionError::Dropped(message).into()), + // Ignore anything else and wait for next status event: + _ => continue, + } + } + Err(RpcError::SubscriptionDropped.into()) +} diff --git a/multi-block/src/main.rs b/multi-block/src/main.rs new file mode 100644 index 000000000..1ce9739c0 --- /dev/null +++ b/multi-block/src/main.rs @@ -0,0 +1,155 @@ +mod client; +mod commands; +mod epm; +mod error; +mod helpers; +mod opts; +mod prelude; +mod signer; +mod static_types; + +use crate::{commands::*, opts::*}; +use client::Client; +use error::Error; +use prelude::*; + +use clap::Parser; +use futures::future::{BoxFuture, FutureExt}; +use std::str::FromStr; +use tokio::sync::oneshot; +use tracing_subscriber::EnvFilter; + +#[derive(Debug, Clone, Parser)] +#[cfg_attr(test, derive(PartialEq))] +#[clap(author, version, about)] +pub struct Opt { + /// The `ws` node to connect to. + #[clap(long, short, default_value = DEFAULT_URI, env = "URI")] + pub uri: String, + + #[clap(subcommand)] + pub command: Command, + + /// Sets a custom logging filter. Syntax is `=`, e.g. -lpolkadot-staking-miner=debug. + /// + /// Log levels (least to most verbose) are error, warn, info, debug, and trace. + /// By default, all targets log `info`. The global log level can be set with `-l`. + #[clap(long, short, default_value = "info")] + pub log: String, +} + +#[derive(Debug, Clone, Parser)] +#[cfg_attr(test, derive(PartialEq))] +pub enum Command { + /// Monitor for the phase being signed, then compute. + Monitor(monitor::MonitorConfig), +} + +// TODO: use any_runtime! macro to cover all runtimes. +use crate::static_types::staking_dev::MinerConfig; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let Opt { uri, command, log } = Opt::parse(); + let filter = EnvFilter::from_default_env().add_directive(log.parse()?); + tracing_subscriber::fmt().with_env_filter(filter).init(); + + let client = Client::new(&uri).await?; + let runtime_version: RuntimeVersion = + client.rpc().state_get_runtime_version(None).await?.into(); + let chain = Chain::from_str(&runtime_version.spec_name)?; + + log::info!(target: LOG_TARGET, "Connected to chain {:?}", chain); + epm::update_metadata_constants(client.chain_api())?; + + SHARED_CLIENT.set(client.clone()).expect("shared client only set once; qed"); + + // Start a new tokio task to perform the runtime updates in the background. + // if this fails then the miner will be stopped and has to be re-started. + let (tx_upgrade, rx_upgrade) = oneshot::channel::(); + tokio::spawn(runtime_upgrade_task(client.chain_api().clone(), tx_upgrade)); + + // TODO: any_runtime! + let fut = match command { + Command::Monitor(cfg) => commands::monitor_cmd::(client, cfg).boxed(), + // TODO: other commands + }; + + let res = run_command(fut, rx_upgrade).await; + log::debug!(target: LOG_TARGET, "round of execution finished. outcome = {:?}", res); + + res +} + +#[cfg(target_family = "unix")] +async fn run_command( + fut: BoxFuture<'_, Result<(), Error>>, + rx_upgrade: oneshot::Receiver, +) -> Result<(), Error> { + use tokio::signal::unix::{signal, SignalKind}; + + let mut stream_int = signal(SignalKind::interrupt()).map_err(Error::Io)?; + let mut stream_term = signal(SignalKind::terminate()).map_err(Error::Io)?; + + tokio::select! 
{ + _ = stream_int.recv() => { + Ok(()) + } + _ = stream_term.recv() => { + Ok(()) + } + res = rx_upgrade => { + match res { + Ok(err) => Err(err), + Err(_) => unreachable!("A message is sent before the upgrade task is closed; qed"), + } + }, + res = fut => res, + } +} + +/// Runs until the RPC connection fails or updating the metadata failed. +async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender) { + let updater = client.updater(); + + let mut update_stream = match updater.runtime_updates().await { + Ok(u) => u, + Err(e) => { + let _ = tx.send(e.into()); + return; + }, + }; + + loop { + // if the runtime upgrade subscription fails then try establish a new one and if it fails quit. + let update = match update_stream.next().await { + Some(Ok(update)) => update, + _ => { + log::warn!(target: LOG_TARGET, "Runtime upgrade subscription failed"); + update_stream = match updater.runtime_updates().await { + Ok(u) => u, + Err(e) => { + let _ = tx.send(e.into()); + return; + }, + }; + continue; + }, + }; + + let version = update.runtime_version().spec_version; + match updater.apply_update(update) { + Ok(()) => { + if let Err(e) = epm::update_metadata_constants(&client) { + let _ = tx.send(e); + return; + } + //prometheus::on_runtime_upgrade(); + log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version); + }, + Err(e) => { + log::debug!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e); + }, + } + } +} diff --git a/multi-block/src/opts.rs b/multi-block/src/opts.rs new file mode 100644 index 000000000..9ef5c593f --- /dev/null +++ b/multi-block/src/opts.rs @@ -0,0 +1,101 @@ +use crate::error::Error; + +use serde::{Deserialize, Serialize}; +use sp_runtime::DeserializeOwned; +use std::{collections::HashMap, fmt, str::FromStr}; + +use subxt::backend::legacy::rpc_methods as subxt_rpc; + +/// The chain being used. 
+#[derive(Debug, Copy, Clone)] +pub enum Chain { + Westend, + Kusama, + Polkadot, + Rococo, +} + +impl fmt::Display for Chain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let chain = match self { + Self::Polkadot => "polkadot", + Self::Kusama => "kusama", + Self::Westend => "westend", + Self::Rococo => "staking-dev", + }; + write!(f, "{}", chain) + } +} + +impl std::str::FromStr for Chain { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "polkadot" => Ok(Self::Polkadot), + "kusama" => Ok(Self::Kusama), + "westend" => Ok(Self::Westend), + "staking-dev" => Ok(Self::Rococo), + chain => Err(Error::InvalidChain(chain.to_string())), + } + } +} + +impl TryFrom for Chain { + type Error = Error; + + fn try_from(rv: subxt_rpc::RuntimeVersion) -> Result { + let json = rv + .other + .get("specName") + .expect("RuntimeVersion must have specName; qed") + .clone(); + let mut chain = + serde_json::from_value::(json).expect("specName must be String; qed"); + chain.make_ascii_lowercase(); + Chain::from_str(&chain) + } +} + +// This is infallible because all these field must exist on substrate-based chains +// and is based on +impl From for RuntimeVersion { + fn from(rv: subxt_rpc::RuntimeVersion) -> Self { + let mut spec_name: String = get_val_unchecked("specName", &rv.other); + let impl_name: String = get_val_unchecked("implName", &rv.other); + let impl_version: u32 = get_val_unchecked("implVersion", &rv.other); + let authoring_version: u32 = get_val_unchecked("authoringVersion", &rv.other); + let state_version: u8 = get_val_unchecked("stateVersion", &rv.other); + + let spec_version = rv.spec_version; + let transaction_version = rv.transaction_version; + + spec_name.make_ascii_lowercase(); + + Self { + spec_name, + impl_name, + impl_version, + spec_version, + transaction_version, + authoring_version, + state_version, + } + } +} + +#[derive(Deserialize, Serialize, PartialEq, Debug, Clone)] +pub struct RuntimeVersion { + pub spec_name: String, + pub impl_name: String, + pub spec_version: u32, + pub impl_version: u32, + pub authoring_version: u32, + pub transaction_version: u32, + pub state_version: u8, +} + +fn get_val_unchecked(val: &str, rv: &HashMap) -> T { + let json = rv.get(val).expect("`{val}` must exist; qed").clone(); + serde_json::from_value::(json).expect("T must be Deserialize; qed") +} diff --git a/multi-block/src/prelude.rs b/multi-block/src/prelude.rs new file mode 100644 index 000000000..6b1321fd2 --- /dev/null +++ b/multi-block/src/prelude.rs @@ -0,0 +1,51 @@ +use crate::static_types; + +use frame_election_provider_support::SequentialPhragmen; +use frame_support::BoundedVec; +use pallet_election_provider_multi_block::unsigned::miner; + +pub use subxt::{ext::sp_core, OnlineClient, PolkadotConfig}; + +pub type Config = subxt::PolkadotConfig; +pub type Hash = sp_core::H256; + +pub type RpcClient = subxt::backend::legacy::LegacyRpcMethods; +pub type ChainClient = subxt::OnlineClient; +pub type Storage = subxt::storage::Storage>; + +pub const DEFAULT_URI: &str = "ws://127.0.0.1:9944"; +pub const LOG_TARGET: &str = "polkadot-staking-miner-mb"; + +pub type AccountId = sp_runtime::AccountId32; + +pub type Solver = SequentialPhragmen; + +pub type TargetSnapshotPageOf = + BoundedVec::TargetSnapshotPerBlock>; +pub type VoterSnapshotPageOf = BoundedVec::VoterSnapshotPerBlock>; + +pub type Voter = frame_election_provider_support::Voter; + +pub type TargetSnapshotPage = BoundedVec; +pub type VoterSnapshotPage = BoundedVec; + +pub type Header = + 
subxt::config::substrate::SubstrateHeader; + +pub type Pair = sp_core::sr25519::Pair; + +// TODO: move under opts to expose to caller. +use sp_npos_elections::BalancingConfig; +frame_support::parameter_types! { + pub static BalanceIterations: usize = 10; + pub static Balancing: Option = Some( BalancingConfig { iterations: BalanceIterations::get(), tolerance: 0 }); +} + +#[subxt::subxt( + runtime_metadata_path = "metadata.scale", + derive_for_all_types = "Clone, Debug, Eq, PartialEq" +)] +pub mod runtime {} + +pub static SHARED_CLIENT: once_cell::sync::OnceCell = + once_cell::sync::OnceCell::new(); diff --git a/multi-block/src/signer.rs b/multi-block/src/signer.rs new file mode 100644 index 000000000..3033d8b1a --- /dev/null +++ b/multi-block/src/signer.rs @@ -0,0 +1,57 @@ +use crate::{error::Error, prelude::*}; +use sp_core::Pair as _; + +// A signer type, parameterized for using with `subxt`. +pub type PairSigner = subxt::tx::PairSigner; + +// Signer wrapper. +// +// NOTE: both `Pair` and `PairSigner` are stored here so it can be cloned +// which is hack around that PairSigner !Clone. +pub struct Signer { + pair: Pair, + signer: PairSigner, +} + +impl std::fmt::Display for Signer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.signer.account_id()) + } +} + +impl Clone for Signer { + fn clone(&self) -> Self { + Self { pair: self.pair.clone(), signer: PairSigner::new(self.pair.clone()) } + } +} + +impl Signer { + pub fn new(mut seed_or_path: &str) -> Result { + seed_or_path = seed_or_path.trim(); + + let seed = match std::fs::read(seed_or_path) { + Ok(s) => String::from_utf8(s).map_err(|e| Error::Other(e.to_string()))?, + Err(_) => seed_or_path.to_string(), + }; + + let seed = seed.trim(); + let pair = Pair::from_string(seed, None).map_err(Error::Crypto)?; + let signer = PairSigner::new(pair.clone()); + + Ok(Self { pair, signer }) + } +} + +impl std::ops::Deref for Signer { + type Target = PairSigner; + + fn deref(&self) -> &Self::Target { + &self.signer + } +} + +impl std::ops::DerefMut for Signer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.signer + } +} diff --git a/multi-block/src/static_types.rs b/multi-block/src/static_types.rs new file mode 100644 index 000000000..64b5f33a0 --- /dev/null +++ b/multi-block/src/static_types.rs @@ -0,0 +1,67 @@ +use frame_support::traits::ConstU32; + +macro_rules! 
impl_atomic_u32_parameter_types {
	($mod:ident, $name:ident) => {
		mod $mod {
			use std::sync::atomic::{AtomicU32, Ordering};

			static VAL: AtomicU32 = AtomicU32::new(0);

			pub struct $name;

			impl $name {
				pub fn get() -> u32 {
					VAL.load(Ordering::SeqCst)
				}
			}

			impl<I: From<u32>> frame_support::traits::Get<I> for $name {
				fn get() -> I {
					I::from(Self::get())
				}
			}

			impl $name {
				pub fn set(val: u32) {
					VAL.store(val, std::sync::atomic::Ordering::SeqCst);
				}
			}
		}
		pub use $mod::$name;
	};
}

impl_atomic_u32_parameter_types!(pages, Pages);
impl_atomic_u32_parameter_types!(max_votes_per_voter, MaxVotesPerVoter);
impl_atomic_u32_parameter_types!(target_snapshot_per_block, TargetSnapshotPerBlock);
impl_atomic_u32_parameter_types!(voter_snapshot_per_block, VoterSnapshotPerBlock);
impl_atomic_u32_parameter_types!(max_winners_per_page, MaxWinnersPerPage);
impl_atomic_u32_parameter_types!(max_backers_per_winner, MaxBackersPerWinner);

pub mod staking_dev {
	use super::*;
	use crate::prelude::*;

	frame_election_provider_support::generate_solution_type!(
		#[compact]
		pub struct NposSolution16::<
			VoterIndex = u32,
			TargetIndex = u16,
			Accuracy = sp_runtime::PerU16,
			MaxVoters = ConstU32::<22500> // TODO: fetch
		>(16)
	);

	#[derive(Debug)]
	pub struct MinerConfig;
	impl pallet_election_provider_multi_block::unsigned::miner::Config for MinerConfig {
		type AccountId = AccountId;
		type Solution = NposSolution16;
		type Solver = Solver;
		type Pages = Pages;
		type MaxVotesPerVoter = MaxVotesPerVoter;
		type MaxWinnersPerPage = MaxWinnersPerPage;
		type MaxBackersPerWinner = MaxBackersPerWinner;
		type VoterSnapshotPerBlock = VoterSnapshotPerBlock;
		type TargetSnapshotPerBlock = TargetSnapshotPerBlock;
		type MaxWeight = (); // TODO
		type MaxLength = (); // TODO
	}
}
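A small illustration (not part of the diff) of how the `impl_atomic_u32_parameter_types!` values are meant to flow: `epm::update_metadata_constants` writes the on-chain constants into these statics at startup, and the miner then reads them back through `frame_support::traits::Get`, either directly or via the `MinerConfig` associated types. A minimal sketch, assuming this crate's `static_types` module:

```rust
use frame_support::traits::Get;

fn example() {
    // Normally done once at startup by `epm::update_metadata_constants`,
    // using values read from chain metadata (numbers here are illustrative).
    crate::static_types::Pages::set(4);
    crate::static_types::VoterSnapshotPerBlock::set(22_500);

    // Read back directly as a plain u32...
    let pages: u32 = crate::static_types::Pages::get();

    // ...or through the blanket `Get<I>` impl, which is what the miner-side
    // bounds such as `T::Pages` and `T::VoterSnapshotPerBlock` rely on.
    let per_block: u64 = <crate::static_types::VoterSnapshotPerBlock as Get<u64>>::get();

    assert_eq!(pages, 4);
    assert_eq!(per_block, 22_500);
}
```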