Skip to content

Commit

Permalink
refactor(chain-ops): Abstract away built-in task implementations via …
Browse files Browse the repository at this point in the history
…dependency injection.
  • Loading branch information
KirilMihaylov committed Aug 21, 2024
1 parent 63a1b2f commit 35f10a3
Show file tree
Hide file tree
Showing 12 changed files with 465 additions and 354 deletions.
40 changes: 26 additions & 14 deletions chain-ops/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ use crate::{
log,
service::{self, ShutdownResult},
supervisor::{
self,
configuration::{self, Configuration},
Supervisor,
},
task::application_defined::{Id, TaskCreationContext},
task::{
application_defined, balance_reporter::BalanceReporter,
broadcast::Broadcast, protocol_watcher::ProtocolWatcher,
},
};

#[inline]
Expand All @@ -28,31 +31,29 @@ pub async fn run<
where
LogsDirectory: AsRef<Path>,
TaskCreationContextCtor: FnOnce() -> Result<
TaskCreationContext<<StartupTasksIter::Item as Id>::Task>,
<StartupTasksIter::Item as application_defined::Id>::TaskCreationContext,
>,
StartupTasksFunctor: FnOnce() -> StartupTasksIter,
StartupTasksIter: Iterator + Send + 'static,
StartupTasksIter::Item: Id + Unpin,
StartupTasksIter::Item: application_defined::Id<ServiceConfiguration=configuration::Service> + Unpin,
{
log::init(logs_directory).context("Failed to initialize logging!")?;

let configuration = configuration::Static::read_from_env()
.await
.context("Failed to read service configuration!")?;
let service_configuration =
configuration::Service::read_from_env()
.await
.context("Failed to read service configuration!")?;

let task_creation_context = task_creation_context()
.context("Failed to construct task creation context!")?;

let startup_tasks = startup_tasks();

service::run(move |task_spawner, task_result_rx| async move {
Supervisor::new(
Configuration::<<StartupTasksIter::Item as Id>::Task>::new(
configuration,
task_spawner,
task_result_rx,
task_creation_context,
),
Supervisor::<StartupTasksIter::Item>::new(
Configuration::new(service_configuration, task_creation_context),
task_spawner,
task_result_rx,
application_name,
application_version,
startup_tasks,
Expand All @@ -72,3 +73,14 @@ where
ShutdownResult::StopSignalReceived => Ok(()),
})
}

type Supervisor<Id> = supervisor::Supervisor<
BalanceReporter,
Broadcast<TxExpiration<Id>>,
ProtocolWatcher,
Task<Id>,
>;

type TxExpiration<Id> = <Task<Id> as application_defined::Task>::TxExpiration;

type Task<Id> = <Id as application_defined::Id>::Task;
132 changes: 68 additions & 64 deletions chain-ops/src/supervisor/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,54 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use anyhow::{Context as _, Error, Result};
use zeroize::Zeroizing;

use crate::{
contract,
env::ReadFromVar,
key, node,
service::{task_spawner::TaskSpawner, TaskResultsReceiver},
signer::{GasAndFeeConfiguration, Signer},
task::{self, application_defined},
task::application_defined,
};

#[must_use]
pub struct Static {
pub struct Configuration<Id>
where
Id: application_defined::Id,
{
pub(super) service_configuration: Id::ServiceConfiguration,
pub(super) task_creation_context: Id::TaskCreationContext,
}

impl<Id> Configuration<Id>
where
Id: application_defined::Id,
{
#[inline]
pub fn new(
service_configuration: Id::ServiceConfiguration,
task_creation_context: Id::TaskCreationContext,
) -> Self {
Self {
service_configuration,
task_creation_context,
}
}
}

#[must_use]
pub struct Service {
node_client: node::Client,
signer: Signer,
admin_contract_address: Arc<str>,
admin_contract: contract::Admin,
idle_duration: Duration,
timeout_duration: Duration,
balance_reporter_idle_duration: Duration,
broadcast_delay_duration: Duration,
broadcast_retry_delay_duration: Duration,
}

impl Static {
impl Service {
pub async fn read_from_env() -> Result<Self> {
let node_client = node::Client::connect(&Self::read_node_grpc_uri()?)
.await
Expand All @@ -37,8 +62,10 @@ impl Static {
)
.await?;

let admin_contract_address =
Self::read_admin_contract_address()?.into();
let admin_contract = contract::Admin::new(
node_client.clone().query_wasm(),
Self::read_admin_contract_address()?.into(),
);

let idle_duration = Self::read_idle_duration()?;

Expand All @@ -55,7 +82,7 @@ impl Static {
Ok(Self {
node_client,
signer,
admin_contract_address,
admin_contract,
idle_duration,
timeout_duration,
balance_reporter_idle_duration,
Expand All @@ -64,6 +91,38 @@ impl Static {
})
}

pub fn node_client(&self) -> &node::Client {
&self.node_client
}

pub fn signer(&self) -> &Signer {
&self.signer
}

pub fn admin_contract(&self) -> &contract::Admin {
&self.admin_contract
}

pub fn idle_duration(&self) -> Duration {
self.idle_duration
}

pub fn timeout_duration(&self) -> Duration {
self.timeout_duration
}

pub fn balance_reporter_idle_duration(&self) -> Duration {
self.balance_reporter_idle_duration
}

pub fn broadcast_delay_duration(&self) -> Duration {
self.broadcast_delay_duration
}

pub fn broadcast_retry_delay_duration(&self) -> Duration {
self.broadcast_retry_delay_duration
}

fn read_node_grpc_uri() -> Result<String> {
String::read_from_var("NODE_GRPC_URI")
.context("Failed to read node's gRPC URI!")
Expand Down Expand Up @@ -125,58 +184,3 @@ impl Static {
.context("Failed to read between broadcast retries delay period duration!")
}
}

#[must_use]
pub struct Configuration<T>
where
T: application_defined::Task,
{
pub(super) node_client: node::Client,
pub(super) signer: Signer,
pub(super) admin_contract_address: Arc<str>,
pub(super) task_spawner: TaskSpawner<task::Id<T::Id>, Result<()>>,
pub(super) task_result_rx: TaskResultsReceiver<task::Id<T::Id>, Result<()>>,
pub(super) idle_duration: Duration,
pub(super) timeout_duration: Duration,
pub(super) balance_reporter_idle_duration: Duration,
pub(super) broadcast_delay_duration: Duration,
pub(super) broadcast_retry_delay_duration: Duration,
pub(super) task_creation_context:
application_defined::TaskCreationContext<T>,
}

impl<T> Configuration<T>
where
T: application_defined::Task,
{
pub fn new(
Static {
node_client,
signer,
admin_contract_address,
idle_duration,
timeout_duration,
balance_reporter_idle_duration,
broadcast_delay_duration,
broadcast_retry_delay_duration,
}: Static,
task_spawner: TaskSpawner<task::Id<T::Id>, Result<()>>,
task_result_rx: TaskResultsReceiver<task::Id<T::Id>, Result<()>>,
task_creation_context:
<T::Id as application_defined::Id>::TaskCreationContext,
) -> Self {
Self {
node_client,
signer,
admin_contract_address,
task_spawner,
task_result_rx,
idle_duration,
timeout_duration,
balance_reporter_idle_duration,
broadcast_delay_duration,
broadcast_retry_delay_duration,
task_creation_context,
}
}
}
Loading

0 comments on commit 35f10a3

Please sign in to comment.