Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: monitor command for the multi-block staking miner #910

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
/multi-block/target/

# These are backup files generated by rustfmt
**/*.rs.bk
Expand Down
36 changes: 36 additions & 0 deletions multi-block/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "multi-block"
version = "0.1.0"
edition = "2021"

[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
scale-info = { package = "scale-info", version = "2.11.2" }
thiserror = "1.0"
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "sync", "signal"] }
clap = { version = "4.5", features = ["derive", "env"] }
jsonrpsee = { version = "0.22.3", features = ["ws-client"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
serde = "1.0"
serde_json = "1.0"
futures = "0.3"
log = "0.4"
once_cell = "1.20"
hyper = { version = "0.14.30", features = ["server", "http1", "http2", "tcp"] }

# subxt
scale-value = "0.16.3"
subxt = { version = "0.37.0", features = ["substrate-compat"] }

# polkadot-sdk
#frame-election-provider-support = "37.0.0"
#pallet-election-provider-multi-block = "4.0.0-dev" # uncomment once pallet is published
#sp-npos-elections = "34.0.0"
#frame-support = "37.0.0"
#sp-runtime = "39.0.1"

# FIXME: absolute developer-machine paths — this only builds on one machine.
# Replace with the published crate versions (commented out above), a
# workspace-relative path, or a git dependency before merging.
frame-election-provider-support = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/election-provider-support" }
pallet-election-provider-multi-block = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/election-provider-multi-block" }
sp-npos-elections = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/primitives/npos-elections" }
frame-support = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/frame/support" }
sp-runtime = { path = "/Users/gpestana/parity/polkadot-sdk/substrate/primitives/runtime" }
Binary file added multi-block/metadata.scale
Binary file not shown.
51 changes: 51 additions & 0 deletions multi-block/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::prelude::*;
use jsonrpsee::ws_client::WsClientBuilder;
use subxt::backend::rpc::RpcClient as RawRpcClient;

/// Wraps the subxt interfaces to make it easy to use for the staking-miner.
///
/// Holds two handles onto the same underlying RPC connection: a raw typed-RPC
/// view and a chain-API view (storage, events, tx submission).
#[derive(Clone, Debug)]
pub struct Client {
	/// Access to typed rpc calls from subxt.
	rpc: RpcClient,
	/// Access to chain APIs such as storage, events etc.
	chain_api: ChainClient,
}

impl Client {
	/// Connect to the node at `uri`, retrying every 2.5 seconds until the
	/// websocket connection is established.
	///
	/// Returns an error only if building the chain API on top of an
	/// already-established connection fails.
	pub async fn new(uri: &str) -> Result<Self, subxt::Error> {
		log::debug!(target: LOG_TARGET, "attempting to connect to {:?}", uri);

		let raw_rpc = loop {
			// One connection attempt; generous limits since election data can
			// be large and slow to fetch.
			let attempt = WsClientBuilder::default()
				.max_request_size(u32::MAX)
				.max_response_size(u32::MAX)
				.request_timeout(std::time::Duration::from_secs(600))
				.build(&uri)
				.await;

			match attempt {
				Ok(ws) => break RawRpcClient::new(ws),
				Err(e) => {
					log::warn!(
						target: LOG_TARGET,
						"failed to connect to client due to {:?}, retrying soon..",
						e,
					);
				},
			}

			// Back off before the next attempt (only reached on failure).
			tokio::time::sleep(std::time::Duration::from_millis(2_500)).await;
		};

		let chain_api = ChainClient::from_rpc_client(raw_rpc.clone()).await?;
		Ok(Self { rpc: RpcClient::new(raw_rpc), chain_api })
	}

	/// Get a reference to the RPC interface exposed by subxt.
	pub fn rpc(&self) -> &RpcClient {
		&self.rpc
	}

	/// Get a reference to the chain API.
	pub fn chain_api(&self) -> &ChainClient {
		&self.chain_api
	}
}
3 changes: 3 additions & 0 deletions multi-block/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Subcommands of the multi-block staking miner.

pub mod monitor;

pub use monitor::monitor_cmd;
245 changes: 245 additions & 0 deletions multi-block/src/commands/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
use crate::{client::Client, error::Error, prelude::*, signer::Signer};

use crate::{
epm, helpers, runtime::runtime_types::pallet_election_provider_multi_block::types::*,
static_types,
};

use pallet_election_provider_multi_block::unsigned::miner;

use clap::Parser;
use std::sync::Arc;
use subxt::backend::rpc::RpcSubscription;
use tokio::sync::Mutex;

/// Represents the current task to be performed in the election round.
///
/// One `Task` is derived per imported block, based on the on-chain phase at
/// that block.
enum Task {
	/// Do nothing.
	Nothing,
	/// A page of the target snapshot has been fetched, cache it locally.
	CacheTargetSnapshot(TargetSnapshotPage),
	/// A page of the voter snapshot has been fetched, cache it locally.
	CacheVoterSnapshot(VoterSnapshotPage),
	/// Next task is to compute the election.
	ComputeElectionResult,
}

// CLI configuration for the `monitor` command.
//
// NOTE: plain `//` comments are used on purpose — `///` doc comments on clap
// derive fields would change the generated `--help` output.
#[derive(Debug, Clone, Parser, PartialEq)]
pub struct MonitorConfig {
	// Optional block hash at which storage is read (see `storage_at` usage in
	// `monitor_cmd`); latest state is used when absent.
	#[clap(long)]
	pub at: Option<Hash>,

	// Seed phrase (or path to it) for the submitting account; may also be
	// supplied via the `SEED` environment variable.
	#[clap(long, short, env = "SEED")]
	pub seed_or_path: String,

	// Which chain of headers to subscribe to; defaults to best heads.
	#[clap(long, value_enum, default_value_t = Listen::Head)]
	pub listen: Listen,
}

// Which head subscription to follow. `//` comments used so clap's `--help`
// text is unchanged.
#[derive(clap::ValueEnum, Debug, Copy, Clone, PartialEq)]
pub enum Listen {
	// Only finalized heads.
	Finalized,
	// New best heads.
	Head,
}

/// Monitors the current on-chain phase and performs the relevant actions.
///
/// Steps:
/// 1. In `Phase::Snapshot(page)`, fetch the pages of the target and voter snapshot.
/// 2. In `Phase::Signed`:
///  2.1. Compute the solution
///  2.2. Register the solution with the *full* election score of the submission
///  2.3. Submit each page separately (1 per block).
///
/// If by the time of computing the solution the snapshot pages have not been fetched and cached,
/// the snapshots will be fetched before computing the solution.
pub async fn monitor_cmd<T>(client: Client, config: MonitorConfig) -> Result<(), Error>
where
	T: miner::Config<
			AccountId = AccountId,
			MaxVotesPerVoter = static_types::MaxVotesPerVoter,
			TargetSnapshotPerBlock = static_types::TargetSnapshotPerBlock,
			VoterSnapshotPerBlock = static_types::VoterSnapshotPerBlock,
			Pages = static_types::Pages,
		> + Send
		+ Sync
		+ 'static,
	T::Solution: Send,
{
	let signer = Signer::new(&config.seed_or_path)?;

	// Fetch the signer's account up-front; bail out immediately if it does
	// not exist on-chain, since no submission could ever succeed.
	let account_info = {
		let addr = runtime::storage().system().account(signer.account_id());
		client
			.chain_api()
			.storage()
			.at_latest()
			.await?
			.fetch(&addr)
			.await?
			.ok_or(Error::AccountDoesNotExists)?
	};

	log::info!(target: LOG_TARGET, "Loaded account {} {{ nonce: {}, free_balance: {}, reserved_balance: {}, frozen_balance: {} }}",
		signer,
		account_info.nonce,
		account_info.data.free,
		account_info.data.reserved,
		account_info.data.frozen,
	);

	let mut subscription = heads_subscription(client.rpc(), config.listen).await?;
	// NOTE(review): `_tx` is never cloned into any task and `_submit_lock` is
	// never acquired — both look like WIP scaffolding for concurrent
	// submission tasks; as written, `rx.recv()` can only ever yield `None`
	// once `_tx` is dropped.
	let (_tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Error>();
	let _submit_lock = Arc::new(Mutex::new(()));

	// Local caches of the target snapshot and the paged voter snapshot,
	// filled page-by-page while the chain is in `Phase::Snapshot`.
	let mut target_snapshot: TargetSnapshotPage = Default::default();
	let mut voter_snapshot_paged: Vec<VoterSnapshotPage> = Default::default();

	let n_pages = static_types::Pages::get();
	// Round number of the last successful submission; used to avoid mining
	// twice for the same round.
	let mut last_round_submitted = None;

	loop {
		// Wait for either a new head from the subscription or an error
		// reported over the channel.
		let at = tokio::select! {
			maybe_rp = subscription.next() => {
				match maybe_rp {
					Some(Ok(r)) => r,
					Some(Err(e)) => {
						log::error!(target: LOG_TARGET, "subscription failed to decode Header {:?}, this is bug please file an issue", e);
						return Err(e.into());
					}
					// The subscription was dropped, should only happen if:
					//	- the connection was closed.
					//	- the subscription could not keep up with the server.
					None => {
						log::warn!(target: LOG_TARGET, "subscription to `{:?}` terminated. Retrying..", config.listen);
						subscription = heads_subscription(client.rpc(), config.listen).await?;
						continue
					}
				}
			},
			maybe_err = rx.recv() => {
				match maybe_err {
					Some(err) => return Err(err),
					None => unreachable!("at least one sender kept in the main loop should always return Some; qed"),
				}
			}
		};

		// NOTE(review): storage is anchored at `config.at` (or the latest
		// state when `None`), NOT at the header `at` just received from the
		// subscription — confirm this is intended.
		let storage = helpers::storage_at(config.at, client.chain_api()).await?;
		let storage2 = storage.clone();
		let phase = storage
			.fetch_or_default(&runtime::storage().election_provider_multi_block().current_phase())
			.await?;
		let round = storage
			.fetch_or_default(&runtime::storage().election_provider_multi_block().round())
			.await?;

		// Decide what to do for this block based on the on-chain phase.
		let result = tokio::spawn(async move {
			match phase {
				Phase::Off => {
					log::trace!(target: LOG_TARGET, "Phase::Off, do nothing.");
					Task::Nothing
				},

				// The snapshot phase counts down from `n_pages`: the target
				// snapshot is taken on the first snapshot block, the voter
				// pages on the following ones.
				Phase::Snapshot(page) =>
					if page == n_pages {
						epm::target_snapshot(&storage)
							.await
							.map(|t| Task::CacheTargetSnapshot(t))
							.unwrap_or(Task::Nothing)
					} else {
						epm::paged_voter_snapshot(page, &storage)
							.await
							.map(|v| Task::CacheVoterSnapshot(v))
							.unwrap_or(Task::Nothing)
					},
				Phase::Signed => {
					log::trace!(target: LOG_TARGET, "Phase::Signed",);
					Task::ComputeElectionResult
				},
				_ => {
					log::trace!(target: LOG_TARGET, "{:?}, do nothing.", phase);
					Task::Nothing
				},
			}
		});

		match result.await {
			Ok(Task::CacheTargetSnapshot(s)) => {
				target_snapshot = s;
			},
			Ok(Task::CacheVoterSnapshot(p)) => {
				// TODO: page from msp -> lsp, prepend p instead of push().
				voter_snapshot_paged.push(p);
			},
			Ok(Task::ComputeElectionResult) => {
				if last_round_submitted == Some(round) {
					// skip mining again, everything submitted.
					log::trace!(
						target: LOG_TARGET,
						"Solution successfully submitted for round {}, do nothing.",
						round
					);
					continue
				}

				if !target_snapshot.is_empty() && voter_snapshot_paged.len() == n_pages as usize {
					// all pages in cache, compute election.
					match epm::mine_and_submit::<T>(
						&at,
						&client,
						signer.clone(),
						config.listen,
						&target_snapshot,
						&voter_snapshot_paged,
						n_pages,
						round,
					)
					.await
					{
						Ok(_) => last_round_submitted = Some(round),
						Err(err) => {
							log::error!("mine_and_submit: {:?}", err);
						}, // continue trying.
					}
				} else {
					// TODO: check if there are already *some* paged cached and fetch only missing
					// ones.
					match epm::fetch_mine_and_submit::<T>(
						&at,
						&client,
						signer.clone(),
						config.listen,
						&storage2,
						n_pages,
						round,
					)
					.await
					{
						Ok(_) => last_round_submitted = Some(round),
						Err(err) => {
							log::error!("fetch_mine_and_submit: {:?}", err);
						}, // continue trying.
					}
					// NOTE(review): this trace is emitted *after* the
					// fetch+mine attempt completes, not before it starts.
					log::trace!(target: LOG_TARGET, "not all snapshots in cache, fetch all and compute.");
				}
			},
			Ok(Task::Nothing) => {
				// reset cached snapshot.
				target_snapshot = Default::default();
				voter_snapshot_paged = Default::default();
			},
			Err(e) => log::error!(target: LOG_TARGET, "ERROR: {:?}", e),
		}
	}
}

/// Opens a subscription to block headers: best ("new") heads or finalized
/// heads, depending on `listen`.
async fn heads_subscription(
	rpc: &RpcClient,
	listen: Listen,
) -> Result<RpcSubscription<Header>, Error> {
	let subscription = match listen {
		Listen::Head => rpc.chain_subscribe_new_heads().await,
		Listen::Finalized => rpc.chain_subscribe_finalized_heads().await,
	};
	subscription.map_err(Into::into)
}
Loading
Loading