Skip to content

Commit

Permalink
added supervised context
Browse files Browse the repository at this point in the history
  • Loading branch information
EnvOut committed Feb 27, 2024
1 parent 28a1272 commit 2169911
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 26 deletions.
103 changes: 103 additions & 0 deletions src/uactor/examples/supervised_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::ops::Shl;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uactor::actor::MessageSender;
use uactor::system::System;

use crate::actor1::Actor1;
use crate::actor1::Actor1Msg;
use crate::actor1::Actor1Ref;
use crate::messages::PingMsg;
use crate::supervisor::{Supervisor, SupervisorMsg, SupervisorRef};

mod messages {
use tokio::sync::oneshot::Sender;

use uactor::message::Message;

pub struct PingMsg(pub Sender<PongMsg>);

#[derive(Debug)]
pub struct PongMsg;

uactor::message_impl!(PingMsg, PongMsg);
}

mod actor1 {
use tokio::sync::mpsc;
use uactor::actor::{Actor, Handler, HandleResult, MessageSender};
use uactor::context::ActorContext;
use uactor::context::supervised::SupervisedContext;
use crate::messages::{PingMsg, PongMsg};
use crate::supervisor::{SupervisorMsg, SupervisorRef};

pub struct Actor1;

impl Actor for Actor1 {
type Context = SupervisedContext<SupervisorRef<mpsc::UnboundedSender<SupervisorMsg>>>;
type Inject = ();
}

impl Handler<PingMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, ctx: &mut Self::Context) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
let _ = reply.send(PongMsg);
ctx.kill();
Ok(())
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg });
}

mod supervisor {
use uactor::actor::{Actor, Handler, HandleResult, MessageSender};
use uactor::context::{ActorDied, Context};
use uactor::data_publisher::{DataPublisher, DataPublisherResult, TryClone};

pub struct Supervisor;

impl Actor for Supervisor {
type Context = Context;
type Inject = ();
}

impl Handler<ActorDied> for Supervisor {
async fn handle(&mut self, _: &mut Self::Inject, ActorDied(name): ActorDied, _: &mut Context) -> HandleResult {
println!("Actor with name: {name:?} - died");
Ok(())
}
}

uactor::generate_actor_ref!(Supervisor, { ActorDied });
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(LevelFilter::INFO)
.with(tracing_subscriber::fmt::layer())
.init();

let mut system = System::global().build();

let actor1 = Actor1;
let supervisor = Supervisor;

let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1);
let (supervisor_ref, _) = uactor::spawn_with_ref!(system, supervisor: Supervisor);

system.run_actor::<Supervisor>(supervisor_ref.name()).await?;
system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref.ask(|reply| PingMsg(reply))
.await?;
println!("main: received {pong:?} message");

tokio::time::sleep(Duration::from_secs(1)).await;

Ok(())
}
100 changes: 78 additions & 22 deletions src/uactor/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,99 @@
use crate::data_publisher::DataPublisher;
use crate::system::System;
use std::sync::Arc;
use crate::actor::MessageSender;
use crate::data_publisher::{DataPublisher, TryClone};
use crate::message::Message;
use crate::system::{System, utils};

pub type ContextResult<T> = Result<T, Box<dyn std::error::Error>>;
pub type ContextInitializationError<T> = Result<T, String>;

pub trait ActorContext: Sized + Unpin + 'static {
async fn on_start(&mut self) -> ContextResult<()>;
async fn on_die(&mut self) -> ContextResult<()>;
async fn on_iteration(&mut self) -> ContextResult<()>;
async fn kill(&mut self);
#[inline]
fn on_start(&mut self) -> ContextResult<()> { Ok(()) }
#[inline]
fn on_die(&mut self, actor_name: Arc<str>) -> ContextResult<()> { Ok(()) }
#[inline]
fn on_iteration(&mut self) -> ContextResult<()> { Ok(()) }
fn kill(&mut self);
#[allow(clippy::wrong_self_convention)]
fn is_alive(&mut self) -> bool { true }
async fn create(system: &mut System) -> ContextInitializationError<Self>;
}

pub struct ActorDied(pub Arc<str>);
impl Message for ActorDied {}

#[derive(derive_more::Constructor)]
pub struct Context {
pub killed: bool,
pub alive: bool,
}

impl ActorContext for Context {
async fn on_start(&mut self) -> ContextResult<()> { Ok(()) }

async fn on_die(&mut self) -> ContextResult<()> { Ok(()) }

async fn on_iteration(&mut self) -> ContextResult<()> { Ok(()) }
fn kill(&mut self) { self.alive = false; }

async fn kill(&mut self) { self.killed = true; }
fn is_alive(&mut self) -> bool {
self.alive
}

async fn create(_: &mut System) -> ContextInitializationError<Self> {
Ok(Context { killed: false })
Ok(Context { alive: true })
}
}

#[derive(derive_more::Constructor)]
pub struct SupervisedContext<T: DataPublisher> {
pub killed: bool,
id: u32,
supervisor: T,
}
pub mod supervised {
use std::sync::Arc;
use crate::actor::MessageSender;
use crate::context::{ActorContext, ActorDied, ContextInitializationError, ContextResult};
use crate::data_publisher::{DataPublisher, TryClone};
use crate::system::{System, utils};

#[derive(derive_more::Constructor)]
pub struct SupervisedContext<T> where T: MessageSender<ActorDied> {
pub alive: bool,
id: usize,
supervisor: T,
}

impl<T> ActorContext for SupervisedContext<T>
where T: MessageSender<ActorDied> + Unpin + 'static + TryClone + Send + Sync
{
fn on_die(&mut self, actor_name: Arc<str>) -> ContextResult<()> {
if let Err(e) = self.supervisor.send(ActorDied(actor_name)) {
tracing::error!("Failed to notify supervisor about actor death: {:?}", e);
}
Ok(())
}

fn kill(&mut self) { self.alive = false; }

fn is_alive(&mut self) -> bool {
self.alive
}

async fn create(system: &mut System) -> ContextInitializationError<Self> {
let mut found_actors: Vec<T> = system.get_actors::<T>()
.map_err(|e| e.to_string())?;
let is_more_one = found_actors.len() > 1;

if is_more_one {
let msg = format!("SupervisedContext can't be used with more than one actor: {:?} of the same kind", utils::type_name::<T>());
tracing::error!(msg);
return Err(msg);
} else if found_actors.len() == 0 {
let msg = format!("SupervisedContext can't be used without selected supervisor's actor: {:?}", utils::type_name::<T>());
tracing::error!(msg);
return Err(msg);
}

let (supervisor) = found_actors.remove(0);
Ok(Self {
alive: true,
id: rand::random(),
supervisor,
})
}
}
}

pub mod extensions {
use std::any::{Any, TypeId};
Expand Down Expand Up @@ -385,10 +441,10 @@ pub mod actor_registry {

#[derive(thiserror::Error, Debug)]
pub enum ActorRegistryErrors {
#[error("Type {kind:?} with name: {actor_name:?} is not registered within system context {system_name:?}")]
#[error("Actor {kind:?} with name: {actor_name:?} is not registered within system context {system_name:?}")]
NotRegisteredActor { system_name: Arc<str>, kind: String, actor_name: Arc<str> },

#[error("Type {kind:?} is not registered within system context {system_name:?}")]
#[error("Actor {kind} is not registered within system context {system_name:?}")]
NotRegisteredActorKind { system_name: Arc<str>, kind: String },

#[error("Can't downcast registered actor into: {kind:?}, system: {system_name:?}")]
Expand Down
2 changes: 2 additions & 0 deletions src/uactor/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ mod select_from_tuple {
async fn select(&mut self, inject: &mut A::Inject, ctx: &mut <A as Actor>::Context, actor: &mut A) -> SelectResult {
if let Ok(msg) = self.next().await {
let _ = actor.handle(inject, msg, ctx).await?;
} else {
tracing::error!("Channel closed");
}
Ok(())
}
Expand Down
31 changes: 27 additions & 4 deletions src/uactor/src/system.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::actor::Actor;
use crate::context::extensions::{ExtensionErrors, Extensions, Service};
use crate::context::{ActorContext, Context, ContextInitializationError};
use crate::context::{ActorContext, Context, ContextInitializationError, ContextResult};
use crate::di::{Inject, InjectError};
use crate::select::ActorSelect;
use crate::system::builder::SystemBuilder;
Expand All @@ -21,7 +21,7 @@ pub enum ActorRunningError {
MissedInitializationOrAlreadyStarted(Arc<str>),
#[error(transparent)]
InjectError(#[from] InjectError),
#[error("Can't create actor context: {0:?}")]
#[error("Can't create actor context: {0}")]
ContextError(String),
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl System {
let actor_ref = self.actor_registry.get_all()
.ok_or_else(|| {
let system_name = self.name.clone();
let kind = utils::type_name::<A>();
let kind = std::any::type_name::<A>().to_owned();
ActorRegistryErrors::NotRegisteredActorKind { system_name, kind }
})?.into_iter()
.map(|i: &A| i.try_clone())
Expand All @@ -84,6 +84,7 @@ impl System {
}

pub fn insert_actor<T: Send + Sync + TryClone + 'static>(&mut self, actor_name: Arc<str>, actor_ref: T) {
tracing::info!("Insert actor: {actor_name:?}: {} into system context: {:?}", std::any::type_name::<T>(), self.name);
self.actor_registry.insert::<T>(actor_name, actor_ref);
}

Expand Down Expand Up @@ -166,7 +167,19 @@ impl System {
*boxed_state
};

loop {
// call on_start
match ctx.on_start() {
Ok(_) => {
tracing::trace!("Starting the actor: {name:?}");
}
Err(err) => {
tracing::error!("Error during actor start: {err:?}");
ctx.kill();
}
}

// main loop
while ctx.is_alive() {
tracing::trace!("iteration of the process: {name:?}");
let res = select.select(&mut state, &mut ctx, &mut actor).await;

Expand All @@ -176,6 +189,16 @@ impl System {
tracing::trace!("{name:?} successful iteration");
}
}
// call on_die
match ctx.on_die(name.clone()) {
Ok(_) => {
tracing::trace!("The actor: {name:?} is dead");
}
Err(err) => {
tracing::error!("Error during actor die: {err:?}");
return;
}
};
} else {
tracing::error!("Can't run {name:?}, system dropped");
()
Expand Down

0 comments on commit 2169911

Please sign in to comment.