Skip to content

Commit

Permalink
feat(c-bridge): polish drs status metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
guidiaz committed Sep 23, 2024
1 parent cd2440c commit 7004d11
Showing 1 changed file with 69 additions and 46 deletions.
115 changes: 69 additions & 46 deletions bridges/centralized-ethereum/src/actors/watch_dog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use web3::{
contract::Contract,
transports::Http,
types::H160,
};
use web3::{contract::Contract, transports::Http, types::H160};
use witnet_net::client::tcp::{jsonrpc, JsonRpcClient};
use witnet_node::utils::stop_system_if_panicking;

Expand Down Expand Up @@ -39,6 +35,8 @@ pub struct WatchDog {
pub start_eth_balance: Option<f64>,
/// Wit balance upon last refund
pub start_wit_balance: Option<f64>,
/// Past data request cumulative counters:
pub drs_history: Option<(u64, u64)>,
}

impl Drop for WatchDog {
Expand All @@ -57,7 +55,13 @@ impl Actor for WatchDog {
fn started(&mut self, ctx: &mut Self::Context) {
log::debug!("WatchDog actor has been started!");

self.watch_global_status(None, None, ctx, Duration::from_millis(self.polling_rate_ms));
self.watch_global_status(
None,
None,
None,
ctx,
Duration::from_millis(self.polling_rate_ms),
);
}
}

Expand All @@ -71,7 +75,7 @@ enum WatchDogStatus {
WitDisconnect,
WitSyncing,
WitWaitingConsensus,
UpAndRunning
UpAndRunning,
}

impl WatchDogStatus {
Expand All @@ -84,7 +88,7 @@ impl WatchDogStatus {
WatchDogStatus::WitDisconnect => "wit-disconnect".to_string(),
WatchDogStatus::WitErrors => format!("wit-errors"),
WatchDogStatus::WitSyncing => "wit-syncing".to_string(),
WatchDogStatus::WitWaitingConsensus => "wit-waiting-consensus".to_string(),
WatchDogStatus::WitWaitingConsensus => "wit-waiting-consensus".to_string(),
WatchDogStatus::UpAndRunning => "up-and-running".to_string(),
}
}
Expand All @@ -98,7 +102,8 @@ impl WatchDog {
/// Initialize from config
pub fn from_config(config: &Config, eth_contract: Arc<Contract<Http>>) -> Self {
Self {
wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()).ok(),
wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str())
.ok(),
wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(),
wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold,
eth_account: config.eth_from,
Expand All @@ -108,13 +113,15 @@ impl WatchDog {
start_ts: Some(Instant::now()),
start_eth_balance: None,
start_wit_balance: None,
drs_history: None,
}
}

fn watch_global_status(
&mut self,
eth_balance: Option<f64>,
wit_balance: Option<f64>,
drs_history: Option<(u64, u64)>,
ctx: &mut Context<Self>,
period: Duration,
) {
Expand All @@ -127,6 +134,9 @@ impl WatchDog {
log::warn!("Wit account refunded to {} $WIT", wit_balance);
}
}
if self.drs_history.is_none() && drs_history.is_some() {
self.drs_history = drs_history;
}
let start_eth_balance = self.start_eth_balance;
let start_wit_balance = self.start_wit_balance;
let wit_client = self.wit_client.clone();
Expand All @@ -136,38 +146,50 @@ impl WatchDog {
let eth_account = self.eth_account;
let eth_contract_address = self.eth_contract.clone().unwrap().address();
let running_secs = self.start_ts.unwrap().elapsed().as_secs();
let mut drs_history = self.drs_history.unwrap_or_default();

let fut = async move {
let mut status = WatchDogStatus::UpAndRunning;

let dr_database = DrDatabase::from_registry();
let (_, drs_pending, drs_finished, _) =
let (drs_new, drs_pending, drs_finished, drs_dismissed) =
dr_database.send(CountDrsPerState).await.unwrap().unwrap();

let mut metrics: String = "{".to_string();
metrics.push_str(&format!("\"drsFinished\": {drs_finished}, "));
metrics.push_str(&format!("\"drsPending\": {drs_pending}, "));

metrics.push_str(&format!("\"drsCurrentlyPending\": {drs_pending}, "));

if drs_history != (0u64, 0u64) {
let last_reported = drs_finished - drs_history.0;
let last_dismissed = drs_dismissed - drs_history.1;

metrics.push_str(&format!("\"drsLastReported\": {last_reported}, "));
metrics.push_str(&format!("\"drsLastDismissed\": {last_dismissed}, "));
}
drs_history = (drs_finished, drs_dismissed);

let total_queries = drs_new + drs_pending + drs_finished + drs_dismissed;
metrics.push_str(&format!("\"drsTotalQueries\": {total_queries}, "));

metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", "));

if let Some(wit_client) = wit_client {
if let Err(err) = check_wit_connection_status(&wit_client).await {
status = err;
}

let (wit_account, wit_balance, wit_utxos_above_threshold) = match fetch_wit_info(
&wit_client,
wit_utxo_min_value_threshold
).await {
Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => {
(wit_account, wit_balance, wit_utxos_above_threshold)
}
Err(err) => {
if status == WatchDogStatus::UpAndRunning {
status = err;

let (wit_account, wit_balance, wit_utxos_above_threshold) =
match fetch_wit_info(&wit_client, wit_utxo_min_value_threshold).await {
Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => {
(wit_account, wit_balance, wit_utxos_above_threshold)
}
(None, None, None)
}
};
Err(err) => {
if status == WatchDogStatus::UpAndRunning {
status = err;
}
(None, None, None)
}
};

if wit_account.is_some() {
metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap()));
Expand Down Expand Up @@ -216,26 +238,26 @@ impl WatchDog {
));
}
}

metrics.push_str(&format!("\"runningSecs\": {running_secs}, "));
metrics.push_str(&format!("\"status\": \"{}\"", status.to_string()));
metrics.push_str("}");

log::info!("{metrics}");
(eth_balance, wit_balance)

(eth_balance, wit_balance, Some(drs_history))
};

ctx.spawn(
fut.into_actor(self)
.then(move |(eth_balance, wit_balance), _act, ctx| {
// Schedule next iteration only when finished,
// as to avoid multiple tasks running in parallel
ctx.run_later(period, move |act, ctx| {
act.watch_global_status(eth_balance, wit_balance, ctx, period);
});
actix::fut::ready(())
}),
);
ctx.spawn(fut.into_actor(self).then(
move |(eth_balance, wit_balance, drs_history), _act, ctx| {
// Schedule next iteration only when finished,
// as to avoid multiple tasks running in parallel
ctx.run_later(period, move |act, ctx| {
act.watch_global_status(eth_balance, wit_balance, drs_history, ctx, period);
});
actix::fut::ready(())
},
));
}
}

Expand Down Expand Up @@ -263,13 +285,15 @@ async fn check_eth_account_balance(
},
Err(e) => {
log::debug!("check_eth_account_balance => {}", e);

Err(WatchDogStatus::EvmErrors)
}
}
}

async fn check_wit_connection_status(wit_client: &Addr<JsonRpcClient>) -> Result<(), WatchDogStatus> {
async fn check_wit_connection_status(
wit_client: &Addr<JsonRpcClient>,
) -> Result<(), WatchDogStatus> {
let req = jsonrpc::Request::method("syncStatus").timeout(Duration::from_secs(5));
let res = wit_client.send(req).await;
match res {
Expand All @@ -279,7 +303,7 @@ async fn check_wit_connection_status(wit_client: &Addr<JsonRpcClient>) -> Result
"Synced" => Ok(()),
"AlmostSynced" => Err(WatchDogStatus::WitAlmostSynced),
"WaitingConsensus" => Err(WatchDogStatus::WitWaitingConsensus),
_ => Err(WatchDogStatus::WitSyncing)
_ => Err(WatchDogStatus::WitSyncing),
}
} else {
log::debug!("check_wit_connection_status => unknown node_state");
Expand All @@ -297,7 +321,7 @@ async fn check_wit_connection_status(wit_client: &Addr<JsonRpcClient>) -> Result
}
}

async fn fetch_wit_info (
async fn fetch_wit_info(
wit_client: &Addr<JsonRpcClient>,
wit_utxos_min_threshold: u64,
) -> Result<(Option<String>, Option<f64>, Option<u64>), WatchDogStatus> {
Expand Down Expand Up @@ -328,7 +352,7 @@ async fn fetch_wit_info (
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
};
};
match res {
Ok(value) => match value.get("total") {
Some(value) => match value.as_f64() {
Expand Down Expand Up @@ -385,7 +409,6 @@ async fn fetch_wit_info (
}
None => None,
};


Ok((wit_account, wit_account_balance, wit_utxos_above_threshold))
}

0 comments on commit 7004d11

Please sign in to comment.