From 35f10a39f02070468bf236c79f28840623be7133 Mon Sep 17 00:00:00 2001 From: Kiril Mihaylov <80464733+KirilMihaylov@users.noreply.github.com> Date: Wed, 21 Aug 2024 15:55:09 +0300 Subject: [PATCH] refactor(chain-ops): Abstract away built-in task implementations via dependency injection. --- chain-ops/src/run.rs | 40 ++- chain-ops/src/supervisor/configuration.rs | 132 ++++---- chain-ops/src/supervisor/mod.rs | 297 +++++++----------- chain-ops/src/task/application_defined.rs | 18 +- chain-ops/src/task/balance_reporter.rs | 30 +- chain-ops/src/task/broadcast.rs | 34 +- chain-ops/src/task/mod.rs | 98 ++++-- chain-ops/src/task/protocol_watcher.rs | 42 ++- services/alarms-dispatcher/src/task/mod.rs | 72 +++-- services/market-data-feeder/src/task/id.rs | 46 +-- services/market-data-feeder/src/task/mod.rs | 6 +- .../market-data-feeder/src/task/provider.rs | 4 +- 12 files changed, 465 insertions(+), 354 deletions(-) diff --git a/chain-ops/src/run.rs b/chain-ops/src/run.rs index 7dd70c9..a329c7d 100644 --- a/chain-ops/src/run.rs +++ b/chain-ops/src/run.rs @@ -6,10 +6,13 @@ use crate::{ log, service::{self, ShutdownResult}, supervisor::{ + self, configuration::{self, Configuration}, - Supervisor, }, - task::application_defined::{Id, TaskCreationContext}, + task::{ + application_defined, balance_reporter::BalanceReporter, + broadcast::Broadcast, protocol_watcher::ProtocolWatcher, + }, }; #[inline] @@ -28,17 +31,18 @@ pub async fn run< where LogsDirectory: AsRef, TaskCreationContextCtor: FnOnce() -> Result< - TaskCreationContext<::Task>, + ::TaskCreationContext, >, StartupTasksFunctor: FnOnce() -> StartupTasksIter, StartupTasksIter: Iterator + Send + 'static, - StartupTasksIter::Item: Id + Unpin, + StartupTasksIter::Item: application_defined::Id + Unpin, { log::init(logs_directory).context("Failed to initialize logging!")?; - let configuration = configuration::Static::read_from_env() - .await - .context("Failed to read service configuration!")?; + let service_configuration = + configuration::Service::read_from_env() + .await + .context("Failed to read service configuration!")?; let task_creation_context = task_creation_context() .context("Failed to construct task creation context!")?; @@ -46,13 +50,10 @@ where let startup_tasks = startup_tasks(); service::run(move |task_spawner, task_result_rx| async move { - Supervisor::new( - Configuration::<::Task>::new( - configuration, - task_spawner, - task_result_rx, - task_creation_context, - ), + Supervisor::::new( + Configuration::new(service_configuration, task_creation_context), + task_spawner, + task_result_rx, application_name, application_version, startup_tasks, @@ -72,3 +73,14 @@ where ShutdownResult::StopSignalReceived => Ok(()), }) } + +type Supervisor = supervisor::Supervisor< + BalanceReporter, + Broadcast>, + ProtocolWatcher, + Task, +>; + +type TxExpiration = as application_defined::Task>::TxExpiration; + +type Task = ::Task; diff --git a/chain-ops/src/supervisor/configuration.rs b/chain-ops/src/supervisor/configuration.rs index f4ed384..23cc105 100644 --- a/chain-ops/src/supervisor/configuration.rs +++ b/chain-ops/src/supervisor/configuration.rs @@ -1,21 +1,46 @@ -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use anyhow::{Context as _, Error, Result}; use zeroize::Zeroizing; use crate::{ + contract, env::ReadFromVar, key, node, - service::{task_spawner::TaskSpawner, TaskResultsReceiver}, signer::{GasAndFeeConfiguration, Signer}, - task::{self, application_defined}, + task::application_defined, }; #[must_use] -pub struct Static { +pub struct Configuration +where + Id: application_defined::Id, +{ + pub(super) service_configuration: Id::ServiceConfiguration, + pub(super) task_creation_context: Id::TaskCreationContext, +} + +impl Configuration +where + Id: application_defined::Id, +{ + #[inline] + pub fn new( + service_configuration: Id::ServiceConfiguration, + task_creation_context: Id::TaskCreationContext, + ) -> Self { + Self { + service_configuration, + task_creation_context, + } + } +} + +#[must_use] +pub struct Service { node_client: node::Client, signer: Signer, - admin_contract_address: Arc, + admin_contract: contract::Admin, idle_duration: Duration, timeout_duration: Duration, balance_reporter_idle_duration: Duration, @@ -23,7 +48,7 @@ pub struct Static { broadcast_retry_delay_duration: Duration, } -impl Static { +impl Service { pub async fn read_from_env() -> Result { let node_client = node::Client::connect(&Self::read_node_grpc_uri()?) .await @@ -37,8 +62,10 @@ impl Static { ) .await?; - let admin_contract_address = - Self::read_admin_contract_address()?.into(); + let admin_contract = contract::Admin::new( + node_client.clone().query_wasm(), + Self::read_admin_contract_address()?.into(), + ); let idle_duration = Self::read_idle_duration()?; @@ -55,7 +82,7 @@ impl Static { Ok(Self { node_client, signer, - admin_contract_address, + admin_contract, idle_duration, timeout_duration, balance_reporter_idle_duration, @@ -64,6 +91,38 @@ impl Static { }) } + pub fn node_client(&self) -> &node::Client { + &self.node_client + } + + pub fn signer(&self) -> &Signer { + &self.signer + } + + pub fn admin_contract(&self) -> &contract::Admin { + &self.admin_contract + } + + pub fn idle_duration(&self) -> Duration { + self.idle_duration + } + + pub fn timeout_duration(&self) -> Duration { + self.timeout_duration + } + + pub fn balance_reporter_idle_duration(&self) -> Duration { + self.balance_reporter_idle_duration + } + + pub fn broadcast_delay_duration(&self) -> Duration { + self.broadcast_delay_duration + } + + pub fn broadcast_retry_delay_duration(&self) -> Duration { + self.broadcast_retry_delay_duration + } + fn read_node_grpc_uri() -> Result { String::read_from_var("NODE_GRPC_URI") .context("Failed to read node's gRPC URI!") @@ -125,58 +184,3 @@ impl Static { .context("Failed to read between broadcast retries delay period duration!") } } - -#[must_use] -pub struct Configuration -where - T: application_defined::Task, -{ - pub(super) node_client: node::Client, - pub(super) signer: Signer, - pub(super) admin_contract_address: Arc, - pub(super) task_spawner: TaskSpawner, Result<()>>, - pub(super) task_result_rx: TaskResultsReceiver, Result<()>>, - pub(super) idle_duration: Duration, - pub(super) timeout_duration: Duration, - pub(super) balance_reporter_idle_duration: Duration, - pub(super) broadcast_delay_duration: Duration, - pub(super) broadcast_retry_delay_duration: Duration, - pub(super) task_creation_context: - application_defined::TaskCreationContext, -} - -impl Configuration -where - T: application_defined::Task, -{ - pub fn new( - Static { - node_client, - signer, - admin_contract_address, - idle_duration, - timeout_duration, - balance_reporter_idle_duration, - broadcast_delay_duration, - broadcast_retry_delay_duration, - }: Static, - task_spawner: TaskSpawner, Result<()>>, - task_result_rx: TaskResultsReceiver, Result<()>>, - task_creation_context: - ::TaskCreationContext, - ) -> Self { - Self { - node_client, - signer, - admin_contract_address, - task_spawner, - task_result_rx, - idle_duration, - timeout_duration, - balance_reporter_idle_duration, - broadcast_delay_duration, - broadcast_retry_delay_duration, - task_creation_context, - } - } -} diff --git a/chain-ops/src/supervisor/mod.rs b/chain-ops/src/supervisor/mod.rs index 80b9834..02f2aa3 100644 --- a/chain-ops/src/supervisor/mod.rs +++ b/chain-ops/src/supervisor/mod.rs @@ -2,6 +2,7 @@ use std::{ collections::{btree_map::Entry as BTreeMapEntry, BTreeMap, VecDeque}, convert::identity, future::pending, + marker::PhantomData, time::Duration, }; @@ -12,19 +13,13 @@ use tokio::{ }; use crate::{ - channel::{bounded, unbounded, Channel as _}, - contract::Admin as AdminContract, - node, + channel::{self, Channel as _}, service::{task_spawner::TaskSpawner, TaskResult, TaskResultsReceiver}, - signer::Signer, task::{ self, application_defined::{self, Id as _}, - balance_reporter::BalanceReporter, - broadcast::Broadcast, - protocol_watcher::{ - Command as ProtocolWatcherCommand, ProtocolWatcher, - }, + protocol_watcher::Command as ProtocolWatcherCommand, + BalanceReporterTrait, BroadcastTrait, ProtocolWatcherTrait, State as TaskState, Task, TxPackage, }, }; @@ -45,83 +40,94 @@ macro_rules! log { pub mod log; #[must_use] -pub struct Supervisor -where - T: application_defined::Task, +pub struct Supervisor< + BalanceReporter, + Broadcast, + ProtocolWatcher, + ApplicationDefined, +> where + BalanceReporter: BalanceReporterTrait, + Broadcast: BroadcastTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ProtocolWatcher: ProtocolWatcherTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ApplicationDefined: application_defined::Task< + TxExpiration = Broadcast::TxExpiration, + Id: application_defined::Id< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + >, { - node_client: node::Client, - signer: Signer, - admin_contract: AdminContract, - task_spawner: TaskSpawner, Result<()>>, - task_result_rx: TaskResultsReceiver, Result<()>>, - task_states: BTreeMap, TaskState>, - restart_queue: VecDeque<(Instant, task::Id)>, - transaction_tx: unbounded::Sender>, - protocol_watcher_rx: bounded::Receiver, - idle_duration: Duration, - timeout_duration: Duration, - balance_reporter_idle_duration: Duration, - broadcast_delay_duration: Duration, - broadcast_retry_delay_duration: Duration, - task_creation_context: application_defined::TaskCreationContext, + configuration: Configuration, + task_spawner: TaskSpawner, Result<()>>, + task_result_rx: + TaskResultsReceiver, Result<()>>, + task_states: BTreeMap, TaskState>, + restart_queue: VecDeque<(Instant, task::Id)>, + transaction_tx: + channel::unbounded::Sender>, + protocol_watcher_rx: channel::bounded::Receiver, + _balance_reporter: PhantomData, + _broadcast: PhantomData, + _protocol_watcher: PhantomData, } -impl Supervisor +impl + Supervisor where - T: application_defined::Task, + BalanceReporter: BalanceReporterTrait, + Broadcast: BroadcastTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ProtocolWatcher: ProtocolWatcherTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ApplicationDefined: application_defined::Task< + TxExpiration = Broadcast::TxExpiration, + Id: application_defined::Id< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + >, { pub async fn new( - Configuration { - node_client, - signer, - admin_contract_address, - task_spawner, - task_result_rx, - idle_duration, - timeout_duration, - balance_reporter_idle_duration, - broadcast_delay_duration, - broadcast_retry_delay_duration, - task_creation_context, - }: Configuration, + configuration: Configuration, + task_spawner: TaskSpawner, Result<()>>, + task_result_rx: TaskResultsReceiver< + task::Id, + Result<()>, + >, application: &'static str, version: &'static str, tasks: U, ) -> Result where - U: Iterator, + U: Iterator, { log!(info!( %application, %version, - sender_address = %signer.address(), "Starting up supervisor.", )); - let (transaction_tx, transaction_rx) = unbounded::Channel::new(); + let (transaction_tx, transaction_rx) = + channel::unbounded::Channel::new(); let (protocol_watcher_tx, protocol_watcher_rx) = - bounded::Channel::new(); + channel::bounded::Channel::new(); let mut supervisor = Self { - node_client: node_client.clone(), - signer, - admin_contract: AdminContract::new( - node_client.query_wasm(), - admin_contract_address.clone(), - ), + configuration, task_spawner, task_result_rx, task_states: BTreeMap::new(), restart_queue: VecDeque::new(), transaction_tx, protocol_watcher_rx, - idle_duration, - timeout_duration, - balance_reporter_idle_duration, - broadcast_delay_duration, - broadcast_retry_delay_duration, - task_creation_context, + _balance_reporter: PhantomData, + _broadcast: PhantomData, + _protocol_watcher: PhantomData, }; log!(info!("Starting worker tasks.")); @@ -136,6 +142,9 @@ where #[inline] pub async fn run(mut self) -> Result<()> { + const TASK_RESULTS_CHANNEL_CLOSED_ERROR: &str = + "Task results channel closed unexpectedly!"; + log!(info!("Running.")); loop { @@ -143,7 +152,7 @@ where biased; task_result = self.task_result_rx.recv() => { let result = - Self::handle_task_result_channel_output(task_result)?; + task_result.context(TASK_RESULTS_CHANNEL_CLOSED_ERROR)?; self.handle_task_result_and_restart(result).await }, @@ -166,29 +175,46 @@ where async fn start_tasks( &mut self, - transaction_rx: unbounded::Receiver>, - protocol_watcher_tx: bounded::Sender, + transaction_rx: channel::unbounded::Receiver< + TxPackage, + >, + protocol_watcher_tx: channel::bounded::Sender, tasks: U, ) -> Result<()> where - U: Iterator, + U: Iterator, { - Task::::BalanceReporter(self.create_balance_reporter_task()) + Task::< + BalanceReporter, + Broadcast, + ProtocolWatcher, + ApplicationDefined, + >::BalanceReporter(self.create_balance_reporter_task()) .run(&self.task_spawner, &mut self.task_states) .await .context("Failed to start balance reporter task!")?; - Task::::Broadcast(self.create_broadcast_task_with(transaction_rx)) + Task::< + BalanceReporter, + Broadcast, + ProtocolWatcher, + ApplicationDefined, + >::Broadcast(self.create_broadcast_task_with(transaction_rx)) .run(&self.task_spawner, &mut self.task_states) .await .context("Failed to start broadcaster task!")?; - Task::::ProtocolWatcher( + Task::< + BalanceReporter, + Broadcast, + ProtocolWatcher, + ApplicationDefined, + >::ProtocolWatcher( self.create_protocol_watcher_task_with(protocol_watcher_tx), ) - .run(&self.task_spawner, &mut self.task_states) - .await - .context("Failed to start protocol watcher task!")?; + .run(&self.task_spawner, &mut self.task_states) + .await + .context("Failed to start protocol watcher task!")?; for task_id in tasks { self.run_task(task::Id::ApplicationDefined(task_id)) @@ -199,18 +225,9 @@ where Ok(()) } - fn handle_task_result_channel_output( - task_result: Option, Result<()>>>, - ) -> Result, Result<()>>> { - const TASK_RESULTS_CHANNEL_CLOSED_ERROR: &str = - "Task results channel closed unexpectedly!"; - - task_result.context(TASK_RESULTS_CHANNEL_CLOSED_ERROR) - } - async fn handle_task_result_and_restart( &mut self, - task_result: TaskResult, Result<()>>, + task_result: TaskResult, Result<()>>, ) -> Result<()> { const MAX_CONSEQUENT_RETRIES: u8 = 2; @@ -241,7 +258,10 @@ where } } - async fn run_task(&mut self, task_id: task::Id) -> Result<()> { + async fn run_task( + &mut self, + task_id: task::Id, + ) -> Result<()> { let result = match task_id.clone() { task::Id::BalanceReporter => { Ok(Task::BalanceReporter(self.create_balance_reporter_task())) @@ -253,15 +273,11 @@ where Ok(Task::ProtocolWatcher(self.create_protocol_watcher_task())) }, task::Id::ApplicationDefined(id) => id - .into_task(TaskCreationContext { - node_client: &mut self.node_client, - signer_address: self.signer.address(), - admin_contract: &mut self.admin_contract, - transaction_tx: &self.transaction_tx, - idle_duration: self.idle_duration, - timeout_duration: self.timeout_duration, - application_defined: &mut self.task_creation_context, - }) + .into_task( + &mut self.configuration.service_configuration, + &mut self.configuration.task_creation_context, + &self.transaction_tx, + ) .await .map(Task::ApplicationDefined), }; @@ -284,34 +300,29 @@ where } #[inline] - fn create_balance_reporter_task(&mut self) -> BalanceReporter { - BalanceReporter::new( - self.node_client.clone().query_bank(), - self.signer.address().into(), - self.signer.fee_token().into(), - self.balance_reporter_idle_duration, - ) + fn create_balance_reporter_task(&self) -> BalanceReporter { + BalanceReporter::new(&self.configuration.service_configuration) } #[inline] - fn create_broadcast_task(&mut self) -> Broadcast { + fn create_broadcast_task(&mut self) -> Broadcast { let transaction_rx; - (self.transaction_tx, transaction_rx) = unbounded::Channel::new(); + (self.transaction_tx, transaction_rx) = + channel::unbounded::Channel::new(); self.create_broadcast_task_with(transaction_rx) } fn create_broadcast_task_with( &self, - transaction_rx: unbounded::Receiver>, - ) -> Broadcast { + transaction_rx: channel::unbounded::Receiver< + TxPackage, + >, + ) -> Broadcast { Broadcast::new( - self.node_client.clone().broadcast_tx(), - self.signer.clone(), + &self.configuration.service_configuration, transaction_rx, - self.broadcast_delay_duration, - self.broadcast_retry_delay_duration, ) } @@ -320,34 +331,25 @@ where let protocol_watcher_tx; (protocol_watcher_tx, self.protocol_watcher_rx) = - bounded::Channel::new(); + channel::bounded::Channel::new(); self.create_protocol_watcher_task_with(protocol_watcher_tx) } fn create_protocol_watcher_task_with( &self, - protocol_watcher_tx: bounded::Sender, + protocol_watcher_tx: channel::bounded::Sender, ) -> ProtocolWatcher { ProtocolWatcher::new( - self.admin_contract.clone(), - self.task_states - .keys() - .filter_map(|id| { - if let task::Id::ApplicationDefined(id) = id { - id.protocol().cloned() - } else { - None - } - }) - .collect(), + &self.configuration.service_configuration, + &self.task_states, protocol_watcher_tx, ) } fn place_on_restart_queue( &mut self, - task_id: task::Id, + task_id: task::Id, ) -> Result<()> { log!(warn!( task = %task_id.name(), @@ -370,7 +372,7 @@ where async fn handle_task_result( &mut self, - task_result: TaskResult, Result<()>>, + task_result: TaskResult, Result<()>>, ) -> Result<()> { match task_result { TaskResult { @@ -447,7 +449,7 @@ where ) -> Result<()> { match protocol_command { ProtocolWatcherCommand::ProtocolAdded(protocol) => { - for id in T::protocol_task_set_ids(protocol) { + for id in ApplicationDefined::protocol_task_set_ids(protocol) { self.run_task(task::Id::ApplicationDefined(id)).await?; } }, @@ -489,63 +491,6 @@ where } } -#[must_use] -pub struct TaskCreationContext<'r, T> -where - T: application_defined::Task, -{ - node_client: &'r mut node::Client, - signer_address: &'r str, - admin_contract: &'r mut AdminContract, - transaction_tx: &'r unbounded::Sender>, - idle_duration: Duration, - timeout_duration: Duration, - application_defined: &'r mut application_defined::TaskCreationContext, -} - -impl<'r, T> TaskCreationContext<'r, T> -where - T: application_defined::Task, -{ - #[must_use] - pub fn node_client(&mut self) -> &mut node::Client { - self.node_client - } - - #[must_use] - pub const fn signer_address(&self) -> &str { - self.signer_address - } - - pub fn admin_contract(&mut self) -> &mut AdminContract { - self.admin_contract - } - - #[must_use] - pub const fn transaction_tx( - &self, - ) -> &unbounded::Sender> { - self.transaction_tx - } - - #[must_use] - pub const fn idle_duration(&self) -> Duration { - self.idle_duration - } - - #[must_use] - pub const fn timeout_duration(&self) -> Duration { - self.timeout_duration - } - - #[must_use] - pub fn application_defined( - &mut self, - ) -> &mut application_defined::TaskCreationContext { - self.application_defined - } -} - #[cold] #[inline] fn cold() {} diff --git a/chain-ops/src/task/application_defined.rs b/chain-ops/src/task/application_defined.rs index 3f5b56a..e0ef3d9 100644 --- a/chain-ops/src/task/application_defined.rs +++ b/chain-ops/src/task/application_defined.rs @@ -2,9 +2,9 @@ use std::{borrow::Cow, fmt::Debug, future::Future, sync::Arc}; use anyhow::Result; -use crate::supervisor; +use crate::channel; -use super::{Runnable, TxExpiration}; +use super::{Runnable, TxExpiration, TxPackage}; pub trait Task: Runnable + Send + Sized + 'static { type TxExpiration: TxExpiration; @@ -19,6 +19,8 @@ pub trait Task: Runnable + Send + Sized + 'static { } pub trait Id: Debug + Clone + Ord + Send + Sized + 'static { + type ServiceConfiguration: Send + 'static; + type TaskCreationContext: Send + 'static; type Task: Task; @@ -27,10 +29,12 @@ pub trait Id: Debug + Clone + Ord + Send + Sized + 'static { fn name(&self) -> Cow<'static, str>; - fn into_task( + fn into_task<'r>( self, - task_creation_context: supervisor::TaskCreationContext<'_, Self::Task>, - ) -> impl Future> + Send + '_; + service_configuration: &'r mut Self::ServiceConfiguration, + task_creation_context: &'r mut Self::TaskCreationContext, + transaction_tx: &'r channel::unbounded::Sender< + TxPackage<::TxExpiration>, + >, + ) -> impl Future> + Send + 'r; } - -pub type TaskCreationContext = <::Id as Id>::TaskCreationContext; diff --git a/chain-ops/src/task/balance_reporter.rs b/chain-ops/src/task/balance_reporter.rs index a7d86f2..299f168 100644 --- a/chain-ops/src/task/balance_reporter.rs +++ b/chain-ops/src/task/balance_reporter.rs @@ -3,9 +3,9 @@ use std::time::Duration; use anyhow::Result; use tokio::time::sleep; -use crate::node; +use crate::{node, supervisor::configuration}; -use super::Runnable; +use super::{BalanceReporterTrait, BuiltInTask, Runnable}; macro_rules! log { ($macro:ident!($($body:tt)+)) => { @@ -26,7 +26,7 @@ macro_rules! log_span { pub struct BalanceReporter { client: node::QueryBank, address: Box, - denom: Box, + fee_token: Box, idle_duration: Duration, } @@ -41,7 +41,7 @@ impl BalanceReporter { Self { client, address: signer_address, - denom, + fee_token: denom, idle_duration, } } @@ -67,14 +67,16 @@ impl Runnable for BalanceReporter { loop { let amount = self .client - .balance(self.address.to_string(), self.denom.to_string()) + .balance(self.address.to_string(), self.fee_token.to_string()) .await? .to_string(); log_span!(info_span!("Balance Report") { log!(info!("")); - log!(info!("Amount available: {} {}", Self::format_amount(amount), self.denom)); + log!(info!("Account address: {}", self.address)); + + log!(info!("Amount available: {} {}", Self::format_amount(amount), self.fee_token)); log!(info!("")); }); @@ -84,6 +86,22 @@ impl Runnable for BalanceReporter { } } +impl BuiltInTask for BalanceReporter { + type ServiceConfiguration = configuration::Service; +} + +impl BalanceReporterTrait for BalanceReporter { + fn new(service_configuration: &Self::ServiceConfiguration) -> Self { + Self { + client: service_configuration.node_client().clone().query_bank(), + address: service_configuration.signer().address().into(), + fee_token: service_configuration.signer().fee_token().into(), + idle_duration: service_configuration + .balance_reporter_idle_duration(), + } + } +} + #[test] fn test_amount_formatting() { assert_eq!(BalanceReporter::format_amount("1".into()), "1"); diff --git a/chain-ops/src/task/broadcast.rs b/chain-ops/src/task/broadcast.rs index be2a5c5..53052b7 100644 --- a/chain-ops/src/task/broadcast.rs +++ b/chain-ops/src/task/broadcast.rs @@ -9,9 +9,9 @@ use cosmrs::{ }; use tokio::{sync::mpsc, time::sleep}; -use crate::{node, signer::Signer}; +use crate::{channel, node, signer::Signer, supervisor::configuration}; -use super::{Runnable, TxExpiration, TxPackage}; +use super::{BroadcastTrait, BuiltInTask, Runnable, TxExpiration, TxPackage}; macro_rules! log_simulation { ($macro:ident![$source:expr]($($body:tt)+)) => { @@ -254,3 +254,33 @@ where } } } + +impl BuiltInTask for Broadcast +where + Expiration: TxExpiration, +{ + type ServiceConfiguration = configuration::Service; +} + +impl BroadcastTrait for Broadcast +where + Expiration: TxExpiration, +{ + type TxExpiration = Expiration; + + fn new( + service_configuration: &Self::ServiceConfiguration, + transaction_rx: channel::unbounded::Receiver< + TxPackage, + >, + ) -> Self { + Self { + client: service_configuration.node_client().clone().broadcast_tx(), + signer: service_configuration.signer().clone(), + transaction_rx, + delay_duration: service_configuration.broadcast_delay_duration(), + retry_delay_duration: service_configuration + .broadcast_retry_delay_duration(), + } + } +} diff --git a/chain-ops/src/task/mod.rs b/chain-ops/src/task/mod.rs index 4ab942e..25c7bd5 100644 --- a/chain-ops/src/task/mod.rs +++ b/chain-ops/src/task/mod.rs @@ -17,13 +17,9 @@ use tokio::{ }; use tracing::{error, error_span}; -use crate::service::task_spawner::{ - CancellationToken, ServiceStopped, TaskSpawner, -}; - -use self::{ - balance_reporter::BalanceReporter, broadcast::Broadcast, - protocol_watcher::ProtocolWatcher, +use crate::{ + channel, + service::task_spawner::{CancellationToken, ServiceStopped, TaskSpawner}, }; pub mod application_defined; @@ -36,19 +32,19 @@ pub trait Runnable: Sized { } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub enum Id +pub enum Id where - T: application_defined::Id, + ApplicationDefined: application_defined::Id, { BalanceReporter, Broadcast, ProtocolWatcher, - ApplicationDefined(T), + ApplicationDefined(ApplicationDefined), } -impl Id +impl Id where - T: application_defined::Id, + ApplicationDefined: application_defined::Id, { pub fn name(&self) -> Cow<'static, str> { match &self { @@ -87,24 +83,78 @@ impl State { } } -pub enum Task +pub trait BuiltInTask: Runnable + Send + Sized + 'static { + type ServiceConfiguration; +} + +pub trait BalanceReporterTrait: BuiltInTask { + fn new(service_configuration: &Self::ServiceConfiguration) -> Self; +} + +pub trait BroadcastTrait: BuiltInTask { + type TxExpiration: TxExpiration; + + fn new( + service_configuration: &Self::ServiceConfiguration, + transaction_rx: channel::unbounded::Receiver< + TxPackage, + >, + ) -> Self; +} + +pub trait ProtocolWatcherTrait: BuiltInTask { + fn new( + service_configuration: &Self::ServiceConfiguration, + task_states: &BTreeMap, State>, + command_tx: channel::bounded::Sender, + ) -> Self + where + ApplicationDefined: application_defined::Id; +} + +pub enum Task where - T: application_defined::Task, + BalanceReporter: BalanceReporterTrait, + Broadcast: BroadcastTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ProtocolWatcher: ProtocolWatcherTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ApplicationDefined: application_defined::Task< + TxExpiration = Broadcast::TxExpiration, + Id: application_defined::Id< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + >, { BalanceReporter(BalanceReporter), - Broadcast(Broadcast), + Broadcast(Broadcast), ProtocolWatcher(ProtocolWatcher), - ApplicationDefined(T), + ApplicationDefined(ApplicationDefined), } -impl Task +impl + Task where - T: application_defined::Task, + BalanceReporter: BalanceReporterTrait, + Broadcast: BroadcastTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ProtocolWatcher: ProtocolWatcherTrait< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + ApplicationDefined: application_defined::Task< + TxExpiration = Broadcast::TxExpiration, + Id: application_defined::Id< + ServiceConfiguration = BalanceReporter::ServiceConfiguration, + >, + >, { pub async fn run( self, - task_spawner: &TaskSpawner, Result<()>>, - task_states: &mut BTreeMap, State>, + task_spawner: &TaskSpawner, Result<()>>, + task_states: &mut BTreeMap, State>, ) -> Result<(), ServiceStopped> { let task_id = self.identifier(); @@ -140,7 +190,7 @@ where }) } - fn identifier(&self) -> Id { + fn identifier(&self) -> Id { match self { Self::BalanceReporter { .. } => Id::BalanceReporter, Self::Broadcast { .. } => Id::Broadcast, @@ -196,17 +246,17 @@ impl TxExpiration for NoExpiration { #[derive(Clone, Copy)] #[must_use] -pub struct WithExpiration { +pub struct TimeBasedExpiration { expires_at: Instant, } -impl WithExpiration { +impl TimeBasedExpiration { pub const fn new(expires_at: Instant) -> Self { Self { expires_at } } } -impl TxExpiration for WithExpiration { +impl TxExpiration for TimeBasedExpiration { type Expired = Elapsed; #[inline] diff --git a/chain-ops/src/task/protocol_watcher.rs b/chain-ops/src/task/protocol_watcher.rs index a48ec6d..de5a9b7 100644 --- a/chain-ops/src/task/protocol_watcher.rs +++ b/chain-ops/src/task/protocol_watcher.rs @@ -1,11 +1,17 @@ -use std::{collections::BTreeSet, sync::Arc, time::Duration}; +use std::{ + collections::BTreeMap, collections::BTreeSet, sync::Arc, time::Duration, +}; use anyhow::{Context as _, Result}; use tokio::{sync::mpsc, time::sleep}; -use crate::contract::Admin as AdminContract; +use crate::{ + channel, contract::Admin as AdminContract, supervisor::configuration, task, +}; -use super::Runnable; +use super::{ + application_defined, BuiltInTask, ProtocolWatcherTrait, Runnable, State, +}; macro_rules! log { ($macro:ident![$protocol:expr]($($body:tt)+)) => { @@ -76,6 +82,36 @@ impl Runnable for ProtocolWatcher { } } +impl BuiltInTask for ProtocolWatcher { + type ServiceConfiguration = configuration::Service; +} + +impl ProtocolWatcherTrait for ProtocolWatcher { + fn new( + service_configuration: &Self::ServiceConfiguration, + task_states: &BTreeMap, State>, + command_tx: channel::bounded::Sender, + ) -> Self + where + ApplicationDefined: application_defined::Id, + { + Self { + admin_contract: service_configuration.admin_contract().clone(), + protocol_tasks: task_states + .keys() + .filter_map(|id| { + if let task::Id::ApplicationDefined(id) = id { + id.protocol().cloned() + } else { + None + } + }) + .collect(), + command_tx, + } + } +} + #[derive(Debug)] pub enum Command { ProtocolAdded(Arc), diff --git a/services/alarms-dispatcher/src/task/mod.rs b/services/alarms-dispatcher/src/task/mod.rs index a52b9fd..72bbc02 100644 --- a/services/alarms-dispatcher/src/task/mod.rs +++ b/services/alarms-dispatcher/src/task/mod.rs @@ -3,9 +3,10 @@ use std::{borrow::Cow, sync::Arc}; use anyhow::Result; use chain_ops::{ + channel, contract::admin::{BaseProtocol, ProtocolContracts}, - supervisor::TaskCreationContext, - task::{application_defined, NoExpiration, Runnable}, + supervisor::configuration, + task::{application_defined, NoExpiration, Runnable, TxPackage}, }; use crate::ApplicationDefinedContext; @@ -31,31 +32,31 @@ pub enum Id { impl Id { async fn create_time_alarms_task( - task_creation_context: &mut TaskCreationContext<'_, Task>, + service_configuration: &configuration::Service, + task_creation_context: &ApplicationDefinedContext, + transaction_tx: &channel::unbounded::Sender< + TxPackage<::TxExpiration>, + >, ) -> Result { - task_creation_context + service_configuration .admin_contract() + .clone() .platform() .await .and_then(|platform| { alarms_generator::AlarmsGenerator::new_time_alarms( alarms_generator::Configuration { - node_client: task_creation_context + node_client: service_configuration .node_client() .clone(), - transaction_tx: task_creation_context - .transaction_tx() - .clone(), - sender: task_creation_context.signer_address().into(), + transaction_tx: transaction_tx.clone(), + sender: service_configuration.signer().address().into(), address: platform.time_alarms.into(), alarms_per_message: task_creation_context - .application_defined() .time_alarms_per_message, - gas_per_alarm: task_creation_context - .application_defined() - .gas_per_time_alarm, - idle_duration: task_creation_context.idle_duration(), - timeout_duration: task_creation_context + gas_per_alarm: task_creation_context.gas_per_time_alarm, + idle_duration: service_configuration.idle_duration(), + timeout_duration: service_configuration .timeout_duration(), }, TimeAlarms {}, @@ -65,11 +66,14 @@ impl Id { } async fn create_price_alarms_task( - mut task_creation_context: TaskCreationContext<'_, Task>, + service_configuration: &configuration::Service, + task_creation_context: &ApplicationDefinedContext, + transaction_tx: &channel::unbounded::Sender>, protocol_name: Arc, ) -> Result { - task_creation_context + service_configuration .admin_contract() + .clone() .base_protocol(&protocol_name) .await .and_then( @@ -78,25 +82,22 @@ impl Id { }| { alarms_generator::AlarmsGenerator::new_price_alarms( alarms_generator::Configuration { - node_client: task_creation_context + node_client: service_configuration .node_client() .clone(), - transaction_tx: task_creation_context - .transaction_tx() - .clone(), - sender: task_creation_context - .signer_address() + transaction_tx: transaction_tx.clone(), + sender: service_configuration + .signer() + .address() .into(), address: oracle.into(), alarms_per_message: task_creation_context - .application_defined() .price_alarms_per_message, gas_per_alarm: task_creation_context - .application_defined() .gas_per_price_alarm, - idle_duration: task_creation_context + idle_duration: service_configuration .idle_duration(), - timeout_duration: task_creation_context + timeout_duration: service_configuration .timeout_duration(), }, PriceAlarms::new(protocol_name), @@ -108,6 +109,8 @@ impl Id { } impl application_defined::Id for Id { + type ServiceConfiguration = configuration::Service; + type TaskCreationContext = ApplicationDefinedContext; type Task = Task; @@ -128,15 +131,22 @@ impl application_defined::Id for Id { } } - async fn into_task( + async fn into_task<'r>( self, - mut task_creation_context: TaskCreationContext<'_, Task>, + &mut ref service_configuration: &'r mut Self::ServiceConfiguration, + &mut ref task_creation_context: &'r mut Self::TaskCreationContext, + transaction_tx: &'r channel::unbounded::Sender>, ) -> Result { match self { Id::TimeAlarmsGenerator => { log!(info!("Creating time alarms generator.")); - Self::create_time_alarms_task(&mut task_creation_context).await + Self::create_time_alarms_task( + service_configuration, + task_creation_context, + transaction_tx, + ) + .await }, Id::PriceAlarmsGenerator { protocol: protocol_name, @@ -147,7 +157,9 @@ impl application_defined::Id for Id { )); Self::create_price_alarms_task( + service_configuration, task_creation_context, + transaction_tx, protocol_name, ) .await diff --git a/services/market-data-feeder/src/task/id.rs b/services/market-data-feeder/src/task/id.rs index 5c54df3..add66f3 100644 --- a/services/market-data-feeder/src/task/id.rs +++ b/services/market-data-feeder/src/task/id.rs @@ -2,14 +2,15 @@ use std::{ borrow::Cow, collections::btree_map::Entry as BTreeMapEntry, sync::Arc, }; -use anyhow::{bail, Context as _}; +use anyhow::{bail, Context as _, Result}; use chain_ops::{ + channel, contract::admin::{Dex, Protocol, ProtocolContracts}, env::ReadFromVar, node, - supervisor::TaskCreationContext, - task::application_defined, + supervisor::configuration, + task::{application_defined, TimeBasedExpiration, TxPackage}, tx::ExecuteTemplate, }; @@ -77,6 +78,8 @@ impl Id { } impl application_defined::Id for Id { + type ServiceConfiguration = configuration::Service; + type TaskCreationContext = ApplicationDefinedContext; type Task = Task; @@ -91,10 +94,14 @@ impl application_defined::Id for Id { Cow::Owned(self.protocol.to_string()) } - async fn into_task( + async fn into_task<'r>( self, - mut task_creation_context: TaskCreationContext<'_, Task>, - ) -> anyhow::Result { + service_configuration: &'r mut Self::ServiceConfiguration, + task_creation_context: &'r mut Self::TaskCreationContext, + transaction_tx: &'r channel::unbounded::Sender< + TxPackage, + >, + ) -> Result { let Protocol { network, dex, @@ -102,8 +109,9 @@ impl application_defined::Id for Id { ProtocolContracts { oracle: oracle_address, }, - } = task_creation_context + } = service_configuration .admin_contract() + .clone() .protocol(&self.protocol) .await .with_context(|| { @@ -113,11 +121,10 @@ impl application_defined::Id for Id { ) })?; - let node_client = task_creation_context.node_client().clone(); + let node_client = service_configuration.node_client().clone(); let dex_node_client = { let entry = task_creation_context - .application_defined() .dex_node_clients .entry(network.clone()); @@ -135,16 +142,13 @@ impl application_defined::Id for Id { }; task_creation_context - .application_defined() .dex_node_clients .insert(network, dex_node_client.clone()); Oracle::new( node_client.clone().query_wasm(), oracle_address.clone(), - task_creation_context - .application_defined() - .update_currencies_interval, + task_creation_context.update_currencies_interval, ) .await .map(|oracle| Base { @@ -158,19 +162,15 @@ impl application_defined::Id for Id { self.protocol, ) .into(), - duration_before_start: task_creation_context - .application_defined() - .duration_before_start, + duration_before_start: task_creation_context.duration_before_start, execute_template: ExecuteTemplate::new( - task_creation_context.signer_address().into(), + service_configuration.signer().address().into(), oracle_address, ), - idle_duration: task_creation_context.idle_duration(), - timeout_duration: task_creation_context.timeout_duration(), - hard_gas_limit: task_creation_context - .application_defined() - .gas_limit, - transaction_tx: task_creation_context.transaction_tx().clone(), + idle_duration: service_configuration.idle_duration(), + timeout_duration: service_configuration.timeout_duration(), + hard_gas_limit: task_creation_context.gas_limit, + transaction_tx: transaction_tx.clone(), }) .map(|base| Task { base, diff --git a/services/market-data-feeder/src/task/mod.rs b/services/market-data-feeder/src/task/mod.rs index ae024c5..d391614 100644 --- a/services/market-data-feeder/src/task/mod.rs +++ b/services/market-data-feeder/src/task/mod.rs @@ -6,7 +6,7 @@ use cosmrs::Gas; use chain_ops::{ channel::unbounded, node, - task::{application_defined, Runnable, TxPackage, WithExpiration}, + task::{application_defined, Runnable, TimeBasedExpiration, TxPackage}, tx::ExecuteTemplate, }; @@ -37,7 +37,7 @@ impl Runnable for Task { } impl application_defined::Task for Task { - type TxExpiration = WithExpiration; + type TxExpiration = TimeBasedExpiration; type Id = Id; @@ -65,5 +65,5 @@ struct Base { idle_duration: Duration, timeout_duration: Duration, hard_gas_limit: Gas, - transaction_tx: unbounded::Sender>, + transaction_tx: unbounded::Sender>, } diff --git a/services/market-data-feeder/src/task/provider.rs b/services/market-data-feeder/src/task/provider.rs index ffab378..1c095bd 100644 --- a/services/market-data-feeder/src/task/provider.rs +++ b/services/market-data-feeder/src/task/provider.rs @@ -17,7 +17,7 @@ use tokio::{ use chain_ops::{ defer::Defer, - task::{TxPackage, WithExpiration}, + task::{TimeBasedExpiration, TxPackage}, task_set::TaskSet, tx, }; @@ -398,7 +398,7 @@ where hard_gas_limit: self.base.hard_gas_limit, fallback_gas, feedback_sender, - expiration: WithExpiration::new( + expiration: TimeBasedExpiration::new( Instant::now() + self.base.timeout_duration, ), })