From 7f8d8498ea25b16d2d0bcae0f2bd8231f0df3998 Mon Sep 17 00:00:00 2001 From: Alexey Cheban Date: Mon, 12 Aug 2024 17:05:58 +0300 Subject: [PATCH] AF-4032: Refactor --- src/uactor/examples/base_sample.rs | 27 +++-- src/uactor/examples/dependency_injection.rs | 66 +++++++----- src/uactor/examples/interval.rs | 31 ++++-- src/uactor/examples/single_channel_actor.rs | 12 ++- src/uactor/examples/supervised_actor.rs | 33 +++--- src/uactor/src/actor.rs | 19 ++-- src/uactor/src/context.rs | 29 +++--- src/uactor/src/data_publisher.rs | 71 +++++++------ src/uactor/src/datasource.rs | 37 +++---- src/uactor/src/di.rs | 39 +++---- src/uactor/src/message.rs | 48 +++++++-- src/uactor/src/select.rs | 47 ++++++--- src/uactor/src/system.rs | 107 +++++++++++++------- 13 files changed, 353 insertions(+), 213 deletions(-) diff --git a/src/uactor/examples/base_sample.rs b/src/uactor/examples/base_sample.rs index 3325648..771ad4c 100644 --- a/src/uactor/examples/base_sample.rs +++ b/src/uactor/examples/base_sample.rs @@ -1,11 +1,9 @@ use std::time::Duration; use tracing::level_filters::LevelFilter; -use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use uactor::select::ActorSelect; use uactor::system::System; use crate::actor1::Actor1; @@ -37,7 +35,7 @@ pub mod messages { } pub mod actor1 { - use uactor::actor::{Actor, Handler, HandleResult}; + use uactor::actor::{Actor, HandleResult, Handler}; use uactor::context::Context; use crate::messages::{PingPongMsg, ReqMsg, RespMsg}; @@ -52,14 +50,24 @@ pub mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, msg: PingPongMsg, ctx: &mut Self::Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + msg: PingPongMsg, + _ctx: &mut Self::Context, + ) -> HandleResult { println!("actor1 handle PingPongMsg: {msg:?}"); Ok(()) } } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, msg: ReqMsg, ctx: &mut Self::Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + msg: ReqMsg, + _ctx: &mut Self::Context, + ) -> HandleResult { println!("actor1 handle ReqMsg: {msg:?}"); self.resp_tx.send(RespMsg::Ok).await?; Ok(()) @@ -68,7 +76,7 @@ pub mod actor1 { } pub mod actor2 { - use uactor::actor::{Actor, Handler, HandleResult}; + use uactor::actor::{Actor, HandleResult, Handler}; use uactor::context::Context; use crate::messages::RespMsg; @@ -76,7 +84,12 @@ pub mod actor2 { pub struct Actor2; impl Handler for Actor2 { - async fn handle(&mut self, _: &mut Self::Inject, msg: RespMsg, _: &mut Self::Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + msg: RespMsg, + _: &mut Self::Context, + ) -> HandleResult { println!("actor2 handle RespMsg: {msg:?}"); Ok(()) } diff --git a/src/uactor/examples/dependency_injection.rs b/src/uactor/examples/dependency_injection.rs index b975f6b..b8e8918 100644 --- a/src/uactor/examples/dependency_injection.rs +++ b/src/uactor/examples/dependency_injection.rs @@ -1,6 +1,4 @@ use time::ext::NumericalStdDuration; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; use uactor::actor::MessageSender; use uactor::system::System; @@ -36,9 +34,9 @@ mod messages { mod actor1 { use tokio::sync::mpsc::UnboundedSender; - use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender}; - use uactor::context::Context; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler, MessageSender}; use uactor::context::extensions::Service; + use uactor::context::Context; use uactor::di::{Inject, InjectError}; use uactor::system::System; @@ -56,11 +54,12 @@ mod actor1 { impl Inject for Services { async fn inject(system: &System) -> Result - where - Self: Sized, + where + Self: Sized, { let service1 = system.get_service()?; - let actor2_ref = system.get_actor::>>("actor2".into())?; + let actor2_ref = + system.get_actor::>>("actor2".into())?; Ok(Services::new(service1, actor2_ref)) } } @@ -71,7 +70,12 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, Services { service1, .. }: &mut Self::Inject, ping: PingMsg, ctx: &mut Context) -> HandleResult { + async fn handle( + &mut self, + Services { service1, .. }: &mut Self::Inject, + ping: PingMsg, + _ctx: &mut Context, + ) -> HandleResult { println!("actor1: Received ping message"); service1.do_something(); @@ -83,7 +87,12 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, Services { actor2_ref, .. }: &mut Self::Inject, msg: MessageWithoutReply, ctx: &mut Context) -> HandleResult { + async fn handle( + &mut self, + Services { actor2_ref, .. }: &mut Self::Inject, + msg: MessageWithoutReply, + _ctx: &mut Context, + ) -> HandleResult { println!("actor1: Received {msg:?} message, sending PrintMessage to the actor2"); actor2_ref.send(PrintMessage::new(msg.into()))?; Ok(()) @@ -91,13 +100,12 @@ mod actor1 { } uactor::generate_actor_ref!(Actor1, { PingMsg, MessageWithoutReply }, EmptyState); - } mod actor2 { - use uactor::actor::{Actor, EmptyState, Handler, HandleResult}; - use uactor::context::Context; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler}; use uactor::context::extensions::Service; + use uactor::context::Context; use uactor::di::{Inject, InjectError}; use uactor::system::System; @@ -111,8 +119,8 @@ mod actor2 { impl Inject for Services { async fn inject(system: &System) -> Result - where - Self: Sized, + where + Self: Sized, { let service2 = system.get_service::()?; Ok(Services(service2)) @@ -125,7 +133,12 @@ mod actor2 { } impl Handler for Actor2 { - async fn handle(&mut self, Services(service2): &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + Services(service2): &mut Self::Inject, + ping: PingMsg, + _ctx: &mut Context, + ) -> HandleResult { println!("actor2: Received ping message"); service2.do_something(); @@ -137,7 +150,12 @@ mod actor2 { } impl Handler for Actor2 { - async fn handle(&mut self, _: &mut Self::Inject, msg: PrintMessage, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + msg: PrintMessage, + _ctx: &mut Context, + ) -> HandleResult { println!("actor2: Received message: {msg:?}"); Ok(()) } @@ -192,25 +210,21 @@ async fn main() -> anyhow::Result<()> { let actor2 = Actor2; let (actor2_ref, _) = uactor::spawn_with_ref!(system, actor2: Actor2); - // Run actors system.run_actor::(actor1_ref.name()).await?; system.run_actor::(actor2_ref.name()).await?; // Case #1: send messages and call injected (not from &self) services inside handlers - println!("-- Case #1: send messages and call injected (not from &self) services inside handlers"); - let pong1 = actor1_ref - .ask::(|reply| PingMsg(reply)) - .await?; - let pong2 = actor2_ref - .ask::(|reply| PingMsg(reply)) - .await?; + println!( + "-- Case #1: send messages and call injected (not from &self) services inside handlers" + ); + let pong1 = actor1_ref.ask::(PingMsg).await?; + let pong2 = actor2_ref.ask::(PingMsg).await?; println!("main: received {pong1:?} and {pong2:?} messages"); // Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1 println!("\n-- Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1"); - let pong1 = actor1_ref - .send(MessageWithoutReply("login:password".to_owned()))?; + actor1_ref.send(MessageWithoutReply("login:password".to_owned()))?; tokio::time::sleep(1.std_milliseconds()).await; Ok(()) diff --git a/src/uactor/examples/interval.rs b/src/uactor/examples/interval.rs index 05cf526..4b2d980 100644 --- a/src/uactor/examples/interval.rs +++ b/src/uactor/examples/interval.rs @@ -1,4 +1,3 @@ -use anyhow::Context; use time::ext::NumericalStdDuration; use uactor::actor::MessageSender; @@ -23,7 +22,7 @@ mod messages { } mod actor1 { - use uactor::actor::{Actor, EmptyState, Handler, HandleResult}; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler}; use uactor::context::Context; use uactor::message::IntervalMessage; @@ -40,7 +39,12 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + ping: PingMsg, + _ctx: &mut Context, + ) -> HandleResult { println!("actor1: Received ping message"); let PingMsg(reply) = ping; let _ = reply.send(PongMsg); @@ -49,9 +53,20 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, IntervalMessage { time: _, duration }: IntervalMessage, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + IntervalMessage { + time: _, + duration: _, + }: IntervalMessage, + _ctx: &mut Context, + ) -> HandleResult { self.interval_count += 1; - println!("actor1: received {}nd interval message", self.interval_count); + println!( + "actor1: received {}nd interval message", + self.interval_count + ); Ok(()) } } @@ -68,13 +83,11 @@ async fn main() -> anyhow::Result<()> { // 1 second interval let interval = tokio::time::interval(1.std_seconds()); - let (mut actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1, interval); + let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1, interval); system.run_actor::(actor1_ref.name()).await?; - let pong = actor1_ref - .ask::(|reply| PingMsg(reply)) - .await?; + let pong = actor1_ref.ask::(PingMsg).await?; println!("main: received {pong:?} message"); // waiting 10 seconds and expecting new message each 1 second diff --git a/src/uactor/examples/single_channel_actor.rs b/src/uactor/examples/single_channel_actor.rs index 9b82231..571f632 100644 --- a/src/uactor/examples/single_channel_actor.rs +++ b/src/uactor/examples/single_channel_actor.rs @@ -20,7 +20,7 @@ mod messages { } mod actor1 { - use uactor::actor::{Actor, EmptyState, Handler, HandleResult}; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler}; use uactor::context::Context; use crate::messages::{PingMsg, PongMsg}; @@ -33,7 +33,12 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + ping: PingMsg, + _: &mut Context, + ) -> HandleResult { println!("actor1: Received ping message"); let PingMsg(reply) = ping; let _ = reply.send(PongMsg); @@ -54,8 +59,7 @@ async fn main() -> anyhow::Result<()> { system.run_actor::(actor1_ref.name()).await?; - let pong = actor1_ref.ask(|reply| PingMsg(reply)) - .await?; + let pong = actor1_ref.ask(PingMsg).await?; println!("main: received {pong:?} message"); Ok(()) diff --git a/src/uactor/examples/supervised_actor.rs b/src/uactor/examples/supervised_actor.rs index 7cb995a..296bbec 100644 --- a/src/uactor/examples/supervised_actor.rs +++ b/src/uactor/examples/supervised_actor.rs @@ -1,4 +1,3 @@ -use std::ops::Shl; use std::time::Duration; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; @@ -26,12 +25,12 @@ mod messages { } mod actor1 { - use tokio::sync::mpsc; - use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender}; - use uactor::context::ActorContext; - use uactor::context::supervised::SupervisedContext; use crate::messages::{PingMsg, PongMsg}; use crate::supervisor::{SupervisorMsg, SupervisorRef}; + use tokio::sync::mpsc; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler}; + use uactor::context::supervised::SupervisedContext; + use uactor::context::ActorContext; pub struct Actor1; @@ -41,7 +40,12 @@ mod actor1 { } impl Handler for Actor1 { - async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, ctx: &mut Self::Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + ping: PingMsg, + ctx: &mut Self::Context, + ) -> HandleResult { println!("actor1: Received ping message"); let PingMsg(reply) = ping; let _ = reply.send(PongMsg); @@ -54,9 +58,8 @@ mod actor1 { } mod supervisor { - use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender}; + use uactor::actor::{Actor, EmptyState, HandleResult, Handler}; use uactor::context::{ActorDied, Context}; - use uactor::data_publisher::{DataPublisher, DataPublisherResult, TryClone}; pub struct Supervisor; @@ -66,7 +69,12 @@ mod supervisor { } impl Handler for Supervisor { - async fn handle(&mut self, _: &mut Self::Inject, ActorDied(name): ActorDied, _: &mut Context) -> HandleResult { + async fn handle( + &mut self, + _: &mut Self::Inject, + ActorDied(name): ActorDied, + _: &mut Context, + ) -> HandleResult { println!("Actor with name: {name:?} - died"); Ok(()) } @@ -90,11 +98,12 @@ async fn main() -> anyhow::Result<()> { let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1); let (supervisor_ref, _) = uactor::spawn_with_ref!(system, supervisor: Supervisor); - system.run_actor::(supervisor_ref.name()).await?; + system + .run_actor::(supervisor_ref.name()) + .await?; system.run_actor::(actor1_ref.name()).await?; - let pong = actor1_ref.ask(|reply| PingMsg(reply)) - .await?; + let pong = actor1_ref.ask(PingMsg).await?; println!("main: received {pong:?} message"); tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/src/uactor/src/actor.rs b/src/uactor/src/actor.rs index 995863e..3d02f7e 100644 --- a/src/uactor/src/actor.rs +++ b/src/uactor/src/actor.rs @@ -1,6 +1,7 @@ -use std::sync::Arc; -use crate::context::{ActorContext, Context}; +use crate::context::ActorContext; use crate::message::Message; +use std::future::Future; +use std::sync::Arc; pub trait State: std::any::Any + Send + 'static {} impl State for T {} @@ -18,16 +19,16 @@ where type Inject: Inject + Sized; - async fn create_state(&mut self) -> Arc { - Arc::new(Default::default()) + fn create_state(&mut self) -> impl Future> + Send { + async { Arc::new(Default::default()) } } - async fn pre_start( + fn pre_start( &mut self, state: &mut Self::Inject, ctx: &mut Self::Context, - ) -> ActorPreStartResult<()> { - Ok(()) + ) -> impl Future> + Send { + async { Ok(()) } } } #[macro_export] @@ -83,10 +84,10 @@ where M: Message, { fn send(&self, msg: M) -> crate::data_publisher::DataPublisherResult; - async fn ask( + fn ask( &self, f: impl FnOnce(tokio::sync::oneshot::Sender) -> M, - ) -> Result; + ) -> impl Future>; } #[cfg(feature = "async_sender")] diff --git a/src/uactor/src/context.rs b/src/uactor/src/context.rs index e640f34..d5f682d 100644 --- a/src/uactor/src/context.rs +++ b/src/uactor/src/context.rs @@ -1,8 +1,6 @@ -use crate::actor::MessageSender; -use crate::data_publisher::{DataPublisher, TryClone}; use crate::message::Message; -use crate::message_impl; -use crate::system::{utils, System}; +use crate::system::System; +use std::future::Future; use std::sync::Arc; pub type ContextResult = Result>; @@ -27,7 +25,10 @@ pub trait ActorContext: Sized + Unpin + 'static { fn is_alive(&mut self) -> bool { true } - async fn create(system: &mut System, name: Arc) -> ContextInitializationError; + fn create( + system: &mut System, + name: Arc, + ) -> impl Future> + Send; } pub struct ActorDied(pub Arc); @@ -65,7 +66,7 @@ impl ActorContext for Context { pub mod supervised { use crate::actor::MessageSender; use crate::context::{ActorContext, ActorDied, ContextInitializationError, ContextResult}; - use crate::data_publisher::{DataPublisher, TryClone}; + use crate::data_publisher::TryClone; use crate::system::{utils, System}; use std::sync::Arc; @@ -75,7 +76,7 @@ pub mod supervised { T: MessageSender, { pub alive: bool, - id: usize, + _id: usize, supervisor: T, name: Arc, } @@ -111,7 +112,7 @@ pub mod supervised { let msg = format!("SupervisedContext can't be used with more than one actor: {:?} of the same kind", utils::type_name::()); tracing::error!(msg); return Err(msg); - } else if found_actors.len() == 0 { + } else if found_actors.is_empty() { let msg = format!( "SupervisedContext can't be used without selected supervisor's actor: {:?}", utils::type_name::() @@ -123,7 +124,7 @@ pub mod supervised { let supervisor = found_actors.remove(0); Ok(Self { alive: true, - id: rand::random(), + _id: rand::random(), supervisor, name, }) @@ -137,7 +138,6 @@ pub mod extensions { use std::fmt; use std::hash::{BuildHasherDefault, Hasher}; use std::ops::{Deref, DerefMut}; - use std::os::macos::raw::stat; use std::sync::Arc; type AnyMap = HashMap, BuildHasherDefault>; @@ -195,7 +195,8 @@ pub mod extensions { // pub fn insert(&mut self, val: T) -> Option { pub fn insert(&mut self, val: T) -> Option { self.map - .get_or_insert_with(|| Box::, BuildHasherDefault>>::default()) + .get_or_insert_with(Box::, BuildHasherDefault>>::default + ) .insert(TypeId::of::(), Box::new(val)) .and_then(|boxed| { (boxed as Box) @@ -303,7 +304,7 @@ pub mod extensions { self.map.as_ref().map_or(true, |map| map.is_empty()) } - /// Get the numer of extensions available. + /// Get the number of extensions available. /// /// # Example /// @@ -380,9 +381,7 @@ pub mod extensions { #[derive(Debug, Clone, Copy)] #[must_use] pub enum Actor { - NamedActor { - name: &'static str - }, + NamedActor { name: &'static str }, All, First, } diff --git a/src/uactor/src/data_publisher.rs b/src/uactor/src/data_publisher.rs index 8e44e6b..f826ff5 100644 --- a/src/uactor/src/data_publisher.rs +++ b/src/uactor/src/data_publisher.rs @@ -8,7 +8,10 @@ mod async_sender { pub trait DataPublisher: TryClone { type Item; - fn publish(&self, data: Self::Item) -> impl std::future::Future + Send; + fn publish( + &self, + data: Self::Item, + ) -> impl std::future::Future + Send; } #[derive(thiserror::Error, Debug)] @@ -56,21 +59,19 @@ mod async_sender { pub type DataPublisherResult = Result<(), DataPublisherErrors>; impl DataPublisher for mpsc::Sender - where - T: Send, + where + T: Send, { type Item = T; async fn publish(&self, data: Self::Item) -> DataPublisherResult { - self.send(data) - .await - .map_err(DataPublisherErrors::from) + self.send(data).await.map_err(DataPublisherErrors::from) } } impl TryClone for Sender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Ok(self.clone()) @@ -78,20 +79,19 @@ mod async_sender { } impl DataPublisher for mpsc::UnboundedSender - where - T: Send, + where + T: Send, { type Item = T; async fn publish(&self, data: Self::Item) -> DataPublisherResult { - self.send(data) - .map_err(DataPublisherErrors::from) + self.send(data).map_err(DataPublisherErrors::from) } } impl TryClone for UnboundedSender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Ok(self.clone()) @@ -99,8 +99,8 @@ mod async_sender { } impl DataPublisher for watch::Sender - where - T: Send + Sync, + where + T: Send + Sync, { type Item = T; @@ -110,8 +110,8 @@ mod async_sender { } impl TryClone for watch::Sender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Err(TryCloneError::CantClone) @@ -119,8 +119,8 @@ mod async_sender { } impl DataPublisher for broadcast::Sender - where - T: Send + Sync, + where + T: Send + Sync, { type Item = T; @@ -132,8 +132,8 @@ mod async_sender { } impl TryClone for broadcast::Sender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Ok(self.clone()) @@ -191,20 +191,19 @@ mod sync_sender { pub type DataPublisherResult = Result<(), DataPublisherErrors>; impl DataPublisher for mpsc::UnboundedSender - where - T: Send, + where + T: Send, { type Item = T; fn publish(&self, data: Self::Item) -> DataPublisherResult { - self.send(data) - .map_err(DataPublisherErrors::from) + self.send(data).map_err(DataPublisherErrors::from) } } impl TryClone for UnboundedSender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Ok(self.clone()) @@ -212,8 +211,8 @@ mod sync_sender { } impl DataPublisher for watch::Sender - where - T: Send + Sync, + where + T: Send + Sync, { type Item = T; @@ -223,8 +222,8 @@ mod sync_sender { } impl TryClone for watch::Sender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Err(TryCloneError::CantClone) @@ -232,8 +231,8 @@ mod sync_sender { } impl DataPublisher for broadcast::Sender - where - T: Send + Sync, + where + T: Send + Sync, { type Item = T; @@ -245,8 +244,8 @@ mod sync_sender { } impl TryClone for broadcast::Sender - where - T: Send, + where + T: Send, { fn try_clone(&self) -> Result { Ok(self.clone()) diff --git a/src/uactor/src/datasource.rs b/src/uactor/src/datasource.rs index 8f19029..193cb34 100644 --- a/src/uactor/src/datasource.rs +++ b/src/uactor/src/datasource.rs @@ -38,8 +38,8 @@ pub trait DataSource { } impl DataSource for mpsc::Receiver - where - T: Send, +where + T: Send, { type Item = T; @@ -53,8 +53,8 @@ impl DataSource for mpsc::Receiver } impl DataSource for mpsc::UnboundedReceiver - where - T: Send, +where + T: Send, { type Item = T; @@ -68,14 +68,13 @@ impl DataSource for mpsc::UnboundedReceiver } impl DataSource for watch::Receiver - where - T: Clone + Send + Sync, +where + T: Clone + Send + Sync, { type Item = T; async fn next(&mut self) -> DataSourceResult { - let _ = self - .changed() + self.changed() .await .map_err(|_| DataSourceErrors::ChannelClosed)?; let value = self.borrow().clone(); @@ -84,8 +83,8 @@ impl DataSource for watch::Receiver } impl DataSource for broadcast::Receiver - where - T: Clone + Send + Sync, +where + T: Clone + Send + Sync, { type Item = T; @@ -95,8 +94,8 @@ impl DataSource for broadcast::Receiver } impl DataSource for oneshot::Receiver - where - T: Send, +where + T: Send, { type Item = T; @@ -118,7 +117,7 @@ impl DataSource for Interval { } pub struct TokioTcpListenerDataSource { - tcp_listener: TcpListener + tcp_listener: TcpListener, } impl TokioTcpListenerDataSource { @@ -127,9 +126,7 @@ impl TokioTcpListenerDataSource { let tcp_listener = TcpListener::bind(&addr).await.unwrap(); - Self { - tcp_listener - } + Self { tcp_listener } } } @@ -137,8 +134,12 @@ impl DataSource for TokioTcpListenerDataSource { type Item = (TcpStream, std::net::SocketAddr); async fn next(&mut self) -> DataSourceResult { - let socket_addr = self.tcp_listener.accept().await.map_err(DataSourceErrors::IoError)?; + let socket_addr = self + .tcp_listener + .accept() + .await + .map_err(DataSourceErrors::IoError)?; Ok(socket_addr) } -} \ No newline at end of file +} diff --git a/src/uactor/src/di.rs b/src/uactor/src/di.rs index 941d3d6..10d1291 100644 --- a/src/uactor/src/di.rs +++ b/src/uactor/src/di.rs @@ -1,5 +1,7 @@ +use std::future::Future; + use crate::context::actor_registry::ActorRegistryErrors; -use crate::context::extensions::{ExtensionErrors, Service}; +use crate::context::extensions::ExtensionErrors; use crate::system::System; #[derive(thiserror::Error, Debug)] @@ -33,18 +35,18 @@ pub enum InjectError { /// } /// ``` pub trait Inject { - async fn inject(system: &System) -> Result - where - Self: Sized; + fn inject(system: &System) -> impl Future> + Send + where + Self: Sized; } pub mod inject_impls { - use std::sync::Arc; use crate::actor::NamedActorRef; - use crate::context::extensions::{ExtensionErrors, Service}; + use crate::context::extensions::Service; use crate::data_publisher::TryClone; use crate::di::{Inject, InjectError}; use crate::system::System; + use std::sync::Arc; impl Inject for () { async fn inject(_: &System) -> Result @@ -55,40 +57,43 @@ pub mod inject_impls { } } - impl Inject for (T1) + impl Inject for T1 where - T1: DependencyProvider, + T1: DependencyProvider, { async fn inject(system: &System) -> Result where Self: Sized, { - let result = T1::get_dependency(&system)?; + let result = T1::get_dependency(system)?; Ok(result) } } - impl Inject for (T1, T2) where - T1: DependencyProvider, - T2: DependencyProvider, + impl Inject for (T1, T2) + where + T1: DependencyProvider, + T2: DependencyProvider, { async fn inject(system: &System) -> Result where Self: Sized, { - let t1 = T1::get_dependency(&system)?; - let t2 = T2::get_dependency(&system)?; + let t1 = T1::get_dependency(system)?; + let t2 = T2::get_dependency(system)?; Ok((t1, t2)) } } - pub trait DependencyProvider { type Dependency; fn get_dependency(system: &System) -> Result; } - impl DependencyProvider for Service where T: Clone + Send + Sync + 'static { + impl DependencyProvider for Service + where + T: Clone + Send + Sync + 'static, + { type Dependency = Service; fn get_dependency(system: &System) -> Result { @@ -109,4 +114,4 @@ pub mod inject_impls { Ok(actor) } } -} \ No newline at end of file +} diff --git a/src/uactor/src/message.rs b/src/uactor/src/message.rs index 9948d6e..dc1ddd9 100644 --- a/src/uactor/src/message.rs +++ b/src/uactor/src/message.rs @@ -15,11 +15,47 @@ pub trait Message { } } -impl Message for Result where A: Message, B: Message { fn static_name() -> &'static str { type_name::>() } } -impl Message for Option where A: Message { fn static_name() -> &'static str { type_name::>() }} -impl Message for Arc where A: Message { fn static_name() -> &'static str { type_name::>() }} -impl Message for Mutex where A: Message { fn static_name() -> &'static str { type_name::>() }} -impl Message for RwLock where A: Message { fn static_name() -> &'static str { type_name::>() }} +impl Message for Result +where + A: Message, + B: Message, +{ + fn static_name() -> &'static str { + type_name::>() + } +} +impl Message for Option +where + A: Message, +{ + fn static_name() -> &'static str { + type_name::>() + } +} +impl Message for Arc +where + A: Message, +{ + fn static_name() -> &'static str { + type_name::>() + } +} +impl Message for Mutex +where + A: Message, +{ + fn static_name() -> &'static str { + type_name::>() + } +} +impl Message for RwLock +where + A: Message, +{ + fn static_name() -> &'static str { + type_name::>() + } +} #[macro_export] macro_rules! message_impl { @@ -44,4 +80,4 @@ pub type Reply = tokio::sync::oneshot::Sender; message_impl! { IntervalMessage, Empty, i64, i32, i16, i8, u64, u32, u16, u8, f64, f32, String, NonZeroI64, NonZeroI32, NonZeroI16, NonZeroI8, NonZeroU64, NonZeroU32, NonZeroU16, NonZeroU8 } #[cfg(feature = "bytes")] -impl Message for bytes::BytesMut { } \ No newline at end of file +impl Message for bytes::BytesMut {} diff --git a/src/uactor/src/select.rs b/src/uactor/src/select.rs index ece18c2..b3ae73d 100644 --- a/src/uactor/src/select.rs +++ b/src/uactor/src/select.rs @@ -1,11 +1,15 @@ use crate::actor::{Actor, HandleResult, Handler}; -use crate::context::Context; use crate::datasource::DataSource; use crate::message::Message; use std::future::pending; pub trait ActorSelect { - fn select(&mut self, inject: &mut A::Inject, ctx: &mut ::Context, actor: &mut A) -> impl std::future::Future + Send; + fn select( + &mut self, + inject: &mut A::Inject, + ctx: &mut ::Context, + actor: &mut A, + ) -> impl std::future::Future + Send; } pub type SelectResult = HandleResult; @@ -13,10 +17,8 @@ pub type SelectResult = HandleResult; #[doc(hidden)] #[allow(non_snake_case)] mod select_from_tuple { - use std::any::type_name; - use tracing::Instrument; - use crate::context::ActorContext; use super::*; + use std::any::type_name; macro_rules! select_from_tuple { ($($T: ident),*) => { @@ -47,24 +49,34 @@ mod select_from_tuple { where ::Inject: Send, { - async fn select(&mut self, _: &mut A::Inject, _: &mut ::Context, _: &mut A) -> SelectResult { + async fn select( + &mut self, + _: &mut A::Inject, + _: &mut ::Context, + _: &mut A, + ) -> SelectResult { pending::().await } } impl ActorSelect for S1 - where - S1::Item: Message + Send, - S1: DataSource + Send, - A: Handler + Send, - ::Inject: Send, + where + S1::Item: Message + Send, + S1: DataSource + Send, + A: Handler + Send, + ::Inject: Send, { #[cfg(feature = "tokio_tracing")] - async fn select(&mut self, inject: &mut A::Inject, ctx: &mut ::Context, actor: &mut A) -> SelectResult { + async fn select( + &mut self, + inject: &mut A::Inject, + ctx: &mut ::Context, + actor: &mut A, + ) -> SelectResult { // let message_name = ::Item::static_name(); - let message_name: &'static str = type_name::<::Item>(); + let _: &'static str = type_name::<::Item>(); if let Ok(msg) = self.next().await { - let _ = actor.handle(inject, msg, ctx).await?; + actor.handle(inject, msg, ctx).await?; } else { tracing::error!("Channel closed"); } @@ -72,7 +84,12 @@ mod select_from_tuple { } #[cfg(not(feature = "tokio_tracing"))] - async fn select(&mut self, inject: &mut A::Inject, ctx: &mut ::Context, actor: &mut A) -> SelectResult { + async fn select( + &mut self, + inject: &mut A::Inject, + ctx: &mut ::Context, + actor: &mut A, + ) -> SelectResult { if let Ok(msg) = self.next().await { let _ = actor.handle(inject, msg, ctx).await?; } else { diff --git a/src/uactor/src/system.rs b/src/uactor/src/system.rs index df81d19..75537f0 100644 --- a/src/uactor/src/system.rs +++ b/src/uactor/src/system.rs @@ -1,18 +1,16 @@ use crate::actor::Actor; +use crate::context::actor_registry::{ActorRegistry, ActorRegistryErrors}; use crate::context::extensions::{ExtensionErrors, Extensions, Service}; -use crate::context::{ActorContext, Context, ContextInitializationError, ContextResult}; +use crate::context::ActorContext; +use crate::data_publisher::{TryClone, TryCloneError}; use crate::di::{Inject, InjectError}; use crate::select::ActorSelect; use crate::system::builder::SystemBuilder; use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -use futures::StreamExt; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use crate::context::actor_registry::{ActorRegistry, ActorRegistryErrors}; -use crate::data_publisher::{TryClone, TryCloneError}; -use crate::datasource::DataSource; #[derive(thiserror::Error, Debug)] pub enum ActorRunningError { @@ -47,45 +45,62 @@ impl System { impl System { pub fn get_service(&self) -> Result, ExtensionErrors> - where - T: Clone + Send + Sync + 'static, + where + T: Clone + Send + Sync + 'static, { let service = self.get::>()?; Ok(service.clone()) } pub fn get_actor(&self, actor_name: Arc) -> Result - where - A: TryClone + Send + Sync + 'static, + where + A: TryClone + Send + Sync + 'static, { - let actor_ref: &A = self.actor_registry.get_actor::(actor_name.clone()) + let actor_ref: &A = self + .actor_registry + .get_actor::(actor_name.clone()) .ok_or_else(|| { let system_name = self.name.clone(); let kind = utils::type_name::(); let actor_name = actor_name.clone(); - ActorRegistryErrors::NotRegisteredActor { system_name, kind, actor_name } + ActorRegistryErrors::NotRegisteredActor { + system_name, + kind, + actor_name, + } })?; let a = actor_ref.try_clone()?; Ok(a) } pub fn get_actors(&self) -> Result, ActorRegistryErrors> - where - A: TryClone + Send + Sync + 'static, + where + A: TryClone + Send + Sync + 'static, { - let actor_ref = self.actor_registry.get_all() + let actor_ref = self + .actor_registry + .get_all() .ok_or_else(|| { let system_name = self.name.clone(); let kind = std::any::type_name::().to_owned(); ActorRegistryErrors::NotRegisteredActorKind { system_name, kind } - })?.into_iter() + })? + .into_iter() .map(|i: &A| i.try_clone()) .collect::, TryCloneError>>()?; Ok(actor_ref) } - pub fn insert_actor(&mut self, actor_name: Arc, actor_ref: T) { - tracing::info!("Insert actor: {actor_name:?}: {} into system context: {:?}", std::any::type_name::(), self.name); + pub fn insert_actor( + &mut self, + actor_name: Arc, + actor_ref: T, + ) { + tracing::info!( + "Insert actor: {actor_name:?}: {} into system context: {:?}", + std::any::type_name::(), + self.name + ); self.actor_registry.insert::(actor_name, actor_ref); } @@ -98,8 +113,8 @@ impl System { } pub fn get(&self) -> Result<&T, ExtensionErrors> - where - T: Clone + Send + Sync + 'static, + where + T: Clone + Send + Sync + 'static, { let option = self.extensions.get::(); if let Some(extension) = option { @@ -116,17 +131,21 @@ impl System { impl System { pub async fn run_actor(&mut self, actor_name: Arc) -> Result<(), ActorRunningError> - where - A: Actor + Any, - ::Inject: Inject + Sized + Send, + where + A: Actor + Any, + ::Inject: Inject + Sized + Send, { if let Some(tx) = self.initialized_actors.remove(&actor_name) { let state_res = A::Inject::inject(self).await; - let ctx = A::Context::create(self, actor_name.clone()).await.map_err(|err| ActorRunningError::ContextError(err))?; + let ctx = A::Context::create(self, actor_name.clone()) + .await + .map_err(ActorRunningError::ContextError)?; if let Err(err) = state_res.as_ref() { - tracing::error!("Can't inject dependencies for {actor_name:?}, actor not started. Err: {err:?}") + tracing::error!( + "Can't inject dependencies for {actor_name:?}, actor not started. Err: {err:?}" + ) } let state = state_res?; @@ -135,16 +154,23 @@ impl System { } } else { eprintln!("actor_name: {:?} already started", actor_name); - return Err(ActorRunningError::MissedInitializationOrAlreadyStarted(actor_name.clone())) + return Err(ActorRunningError::MissedInitializationOrAlreadyStarted( + actor_name.clone(), + )); } Ok(()) } - pub fn init_actor(&mut self, mut actor: A, actor_name: Option>, mut select: S) -> (Arc, JoinHandle<()>) - where - A: Actor + Send, - S: ActorSelect + Send + 'static, - ::Inject: Inject + Sized + Send, + pub fn init_actor( + &mut self, + mut actor: A, + actor_name: Option>, + mut select: S, + ) -> (Arc, JoinHandle<()>) + where + A: Actor + Send, + S: ActorSelect + Send + 'static, + ::Inject: Inject + Sized + Send, { let system_name = self.name.clone(); @@ -154,14 +180,15 @@ impl System { }); let (actor_state_tx, actor_state_rx) = oneshot::channel::>(); - self.initialized_actors.insert(actor_name.clone(), actor_state_tx); + self.initialized_actors + .insert(actor_name.clone(), actor_state_tx); let name = actor_name.clone(); let handle = tokio::spawn(async move { tracing::debug!("The system: {:?} spawned actor: {:?}", system_name, name); if let Ok(boxed_state) = actor_state_rx.await { - let (mut state, mut ctx) = { + let (mut state, mut ctx) = { let boxed_state = boxed_state .downcast::<(::Inject, ::Context)>() .expect("failed to downcast state"); @@ -197,12 +224,10 @@ impl System { } Err(err) => { tracing::error!("Error during actor die: {err:?}"); - return; } - }; + } } else { tracing::error!("Can't run {name:?}, system dropped"); - () } }); (actor_name, handle) @@ -210,9 +235,9 @@ impl System { } pub mod builder { - use std::sync::Arc; use crate::context::extensions::{Extensions, Service}; use crate::system::System; + use std::sync::Arc; const GLOBAL_SYSTEM_NAME: &str = "Global"; @@ -246,12 +271,16 @@ pub mod builder { } pub fn build(self) -> System { - System::new(Arc::from(self.name.as_str()), self.extensions, Default::default(), Default::default()) + System::new( + Arc::from(self.name.as_str()), + self.extensions, + Default::default(), + Default::default(), + ) } } } - pub mod utils { pub fn type_name() -> String { let type_full_name = std::any::type_name::(); @@ -262,4 +291,4 @@ pub mod utils { .to_owned(); type_name } -} \ No newline at end of file +}