Skip to content

Commit

Permalink
Merge pull request #24 from nazar-pc/better-node-farmer-status
Browse files Browse the repository at this point in the history
Better node farmer status
  • Loading branch information
nazar-pc authored Dec 9, 2023
2 parents 71d275c + b83e0ec commit a3d0d69
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 82 deletions.
50 changes: 31 additions & 19 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ pub enum LoadingStep {

#[derive(Debug, Clone)]
pub enum NodeNotification {
Syncing(SyncState),
Synced,
SyncStateUpdate(SyncState),
BlockImported { number: BlockNumber },
}

#[derive(Debug, Clone)]
pub enum FarmerNotification {
Plotting {
PlottingStateUpdate {
farm_index: usize,
state: PlottingState,
},
Plotted {
farm_index: usize,
PieceCacheSyncProgress {
/// Progress so far in %
progress: f32,
},
}

Expand Down Expand Up @@ -287,12 +287,8 @@ async fn run(
let _on_sync_state_change_handler_id = consensus_node.on_sync_state_change({
let notifications_sender = notifications_sender.clone();

Arc::new(move |maybe_sync_state| {
let notification = if let Some(sync_state) = maybe_sync_state {
NodeNotification::Syncing(*sync_state)
} else {
NodeNotification::Synced
};
Arc::new(move |&sync_state| {
let notification = NodeNotification::SyncStateUpdate(sync_state);

let mut notifications_sender = notifications_sender.clone();

Expand Down Expand Up @@ -331,14 +327,10 @@ async fn run(
let _on_plotting_state_change_handler_id = farmer.on_plotting_state_change({
let notifications_sender = notifications_sender.clone();

Arc::new(move |&farm_index, maybe_plotting_state| {
let notification = if let Some(plotting_state) = maybe_plotting_state {
FarmerNotification::Plotting {
farm_index,
state: *plotting_state,
}
} else {
FarmerNotification::Plotted { farm_index }
Arc::new(move |&farm_index, &plotting_state| {
let notification = FarmerNotification::PlottingStateUpdate {
farm_index,
state: plotting_state,
};

let mut notifications_sender = notifications_sender.clone();
Expand All @@ -355,6 +347,26 @@ async fn run(
}
})
});
let _on_piece_cache_sync_progress_handler_id = farmer.on_piece_cache_sync_progress({
let notifications_sender = notifications_sender.clone();

Arc::new(move |&progress| {
let notification = FarmerNotification::PieceCacheSyncProgress { progress };

let mut notifications_sender = notifications_sender.clone();

if let Err(error) = notifications_sender
.try_send(BackendNotification::Farmer(notification))
.or_else(|error| {
tokio::task::block_in_place(|| {
Handle::current().block_on(notifications_sender.send(error.into_inner()))
})
})
{
warn!(%error, "Failed to send piece cache sync progress backend notification");
}
})
});

let consensus_node_fut = pin!(consensus_node.run());
let farmer_fut = pin!(farmer.run());
Expand Down
47 changes: 34 additions & 13 deletions src/backend/farmer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub(super) mod maybe_node_client;

use crate::backend::farmer::maybe_node_client::MaybeNodeRpcClient;
use crate::backend::utils::{Handler2, Handler2Fn};
use crate::backend::utils::{Handler, Handler2, Handler2Fn, HandlerFn};
use crate::PosTable;
use anyhow::anyhow;
use atomic::Atomic;
Expand Down Expand Up @@ -45,18 +45,24 @@ pub enum PlottingKind {
Replotting,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct PlottingState {
pub kind: PlottingKind,
/// Progress so far in % (not including this sector)
pub progress: f32,
/// Plotting/replotting speed in sectors/s
pub speed: Option<f32>,
#[derive(Debug, Default, Copy, Clone, PartialEq)]
pub enum PlottingState {
#[default]
Unknown,
Plotting {
kind: PlottingKind,
/// Progress so far in % (not including this sector)
progress: f32,
/// Plotting/replotting speed in sectors/s
speed: Option<f32>,
},
Idle,
}

#[derive(Default, Debug)]
struct Handlers {
plotting_state_change: Handler2<usize, Option<PlottingState>>,
plotting_state_change: Handler2<usize, PlottingState>,
piece_cache_sync_progress: Handler<f32>,
}

pub(super) struct Farmer {
Expand Down Expand Up @@ -103,10 +109,14 @@ impl Farmer {

pub(super) fn on_plotting_state_change(
&self,
callback: Handler2Fn<usize, Option<PlottingState>>,
callback: Handler2Fn<usize, PlottingState>,
) -> HandlerId {
self.handlers.plotting_state_change.add(callback)
}

pub(super) fn on_piece_cache_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
self.handlers.piece_cache_sync_progress.add(callback)
}
}

impl fmt::Debug for Farmer {
Expand Down Expand Up @@ -348,6 +358,17 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu
info!("Finished collecting already plotted pieces successfully");

let handlers = Arc::new(Handlers::default());

piece_cache
.on_sync_progress(Arc::new({
let handlers = Arc::clone(&handlers);

move |progress| {
handlers.piece_cache_sync_progress.call_simple(progress);
}
}))
.detach();

let mut single_disk_farms_stream = single_disk_farms
.into_iter()
.enumerate()
Expand All @@ -366,7 +387,7 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu
let last_sector_plotted = Arc::clone(&last_sector_plotted);

move |plotting_details| {
let state = PlottingState {
let state = PlottingState::Plotting {
kind: if plotting_details.replotting {
PlottingKind::Replotting
} else {
Expand All @@ -378,7 +399,7 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu

handlers
.plotting_state_change
.call_simple(&(disk_farm_index as usize), &Some(state));
.call_simple(&(disk_farm_index as usize), &state);

if plotting_details.last_queued {
last_sector_plotted
Expand Down Expand Up @@ -418,7 +439,7 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu
{
handlers
.plotting_state_change
.call_simple(&(disk_farm_index as usize), &None);
.call_simple(&(disk_farm_index as usize), &PlottingState::Idle);
}
}
};
Expand Down
31 changes: 19 additions & 12 deletions src/backend/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,22 @@ pub enum SyncKind {
Regular,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct SyncState {
pub kind: SyncKind,
pub target: BlockNumber,
/// Sync speed in blocks/s
pub speed: Option<f32>,
#[derive(Debug, Default, Copy, Clone, PartialEq)]
pub enum SyncState {
#[default]
Unknown,
Syncing {
kind: SyncKind,
target: BlockNumber,
/// Sync speed in blocks/s
speed: Option<f32>,
},
Idle,
}

#[derive(Default, Debug)]
struct Handlers {
sync_state_change: Handler<Option<SyncState>>,
sync_state_change: Handler<SyncState>,
block_imported: Handler<BlockNumber>,
}

Expand Down Expand Up @@ -111,7 +116,7 @@ impl ConsensusNode {
let mut sync_status_interval = tokio::time::interval(SYNC_STATUS_EVENT_INTERVAL);
sync_status_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut last_sync_state = None;
let mut last_sync_state = SyncState::Unknown;
self.handlers
.sync_state_change
.call_simple(&last_sync_state);
Expand All @@ -121,7 +126,7 @@ impl ConsensusNode {

if let Ok(sync_status) = self.full_node.sync_service.status().await {
let sync_state = if sync_status.state.is_major_syncing() {
Some(SyncState {
SyncState::Syncing {
kind: match self.sync_mode.load(Ordering::Acquire) {
SyncMode::Paused => {
// We are pausing Substrate's sync during sync from DNS
Expand All @@ -132,9 +137,11 @@ impl ConsensusNode {
target: sync_status.best_seen_block.unwrap_or_default(),
// TODO: Sync speed
speed: None,
})
}
} else if sync_status.num_connected_peers > 0 {
SyncState::Idle
} else {
None
SyncState::Unknown
};

if sync_state != last_sync_state {
Expand Down Expand Up @@ -165,7 +172,7 @@ impl ConsensusNode {
self.full_node.client.info().best_number
}

pub(super) fn on_sync_state_change(&self, callback: HandlerFn<Option<SyncState>>) -> HandlerId {
pub(super) fn on_sync_state_change(&self, callback: HandlerFn<SyncState>) -> HandlerId {
self.handlers.sync_state_change.add(callback)
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct ConfigurationView {

#[relm4::component(pub)]
impl Component for ConfigurationView {
type Init = ();
type Init = gtk::Window;
type Input = ConfigurationInput;
type Output = ConfigurationOutput;
type CommandOutput = ();
Expand Down Expand Up @@ -343,12 +343,12 @@ impl Component for ConfigurationView {
}

fn init(
_init: Self::Init,
parent_root: Self::Init,
root: &Self::Root,
sender: ComponentSender<Self>,
) -> ComponentParts<Self> {
let open_dialog = OpenDialog::builder()
.transient_for_native(root)
.transient_for_native(&parent_root)
.launch(OpenDialogSettings {
folder_mode: true,
accept_label: "Select".to_string(),
Expand Down
Loading

0 comments on commit a3d0d69

Please sign in to comment.