Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor #4

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/uactor/examples/base_sample.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::time::Duration;

use tracing::level_filters::LevelFilter;
use tracing_subscriber::fmt::writer::MakeWriterExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

use uactor::select::ActorSelect;
use uactor::system::System;

use crate::actor1::Actor1;
Expand Down Expand Up @@ -37,7 +35,7 @@ pub mod messages {
}

pub mod actor1 {
use uactor::actor::{Actor, Handler, HandleResult};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::Context;

use crate::messages::{PingPongMsg, ReqMsg, RespMsg};
Expand All @@ -52,14 +50,24 @@ pub mod actor1 {
}

impl Handler<PingPongMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, msg: PingPongMsg, ctx: &mut Self::Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
msg: PingPongMsg,
_ctx: &mut Self::Context,
) -> HandleResult {
println!("actor1 handle PingPongMsg: {msg:?}");
Ok(())
}
}

impl Handler<ReqMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, msg: ReqMsg, ctx: &mut Self::Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
msg: ReqMsg,
_ctx: &mut Self::Context,
) -> HandleResult {
println!("actor1 handle ReqMsg: {msg:?}");
self.resp_tx.send(RespMsg::Ok).await?;
Ok(())
Expand All @@ -68,15 +76,20 @@ pub mod actor1 {
}

pub mod actor2 {
use uactor::actor::{Actor, Handler, HandleResult};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::Context;

use crate::messages::RespMsg;

pub struct Actor2;

impl Handler<RespMsg> for Actor2 {
async fn handle(&mut self, _: &mut Self::Inject, msg: RespMsg, _: &mut Self::Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
msg: RespMsg,
_: &mut Self::Context,
) -> HandleResult {
println!("actor2 handle RespMsg: {msg:?}");
Ok(())
}
Expand Down
66 changes: 40 additions & 26 deletions src/uactor/examples/dependency_injection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use time::ext::NumericalStdDuration;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uactor::actor::MessageSender;

use uactor::system::System;
Expand Down Expand Up @@ -36,9 +34,9 @@ mod messages {
mod actor1 {
use tokio::sync::mpsc::UnboundedSender;

use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender};
use uactor::context::Context;
use uactor::actor::{Actor, EmptyState, HandleResult, Handler, MessageSender};
use uactor::context::extensions::Service;
use uactor::context::Context;
use uactor::di::{Inject, InjectError};
use uactor::system::System;

Expand All @@ -56,11 +54,12 @@ mod actor1 {

impl Inject for Services {
async fn inject(system: &System) -> Result<Self, InjectError>
where
Self: Sized,
where
Self: Sized,
{
let service1 = system.get_service()?;
let actor2_ref = system.get_actor::<Actor2Ref<UnboundedSender<Actor2Msg>>>("actor2".into())?;
let actor2_ref =
system.get_actor::<Actor2Ref<UnboundedSender<Actor2Msg>>>("actor2".into())?;
Ok(Services::new(service1, actor2_ref))
}
}
Expand All @@ -71,7 +70,12 @@ mod actor1 {
}

impl Handler<PingMsg> for Actor1 {
async fn handle(&mut self, Services { service1, .. }: &mut Self::Inject, ping: PingMsg, ctx: &mut Context) -> HandleResult {
async fn handle(
&mut self,
Services { service1, .. }: &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
) -> HandleResult {
println!("actor1: Received ping message");

service1.do_something();
Expand All @@ -83,21 +87,25 @@ mod actor1 {
}

impl Handler<MessageWithoutReply> for Actor1 {
async fn handle(&mut self, Services { actor2_ref, .. }: &mut Self::Inject, msg: MessageWithoutReply, ctx: &mut Context) -> HandleResult {
async fn handle(
&mut self,
Services { actor2_ref, .. }: &mut Self::Inject,
msg: MessageWithoutReply,
_ctx: &mut Context,
) -> HandleResult {
println!("actor1: Received {msg:?} message, sending PrintMessage to the actor2");
actor2_ref.send(PrintMessage::new(msg.into()))?;
Ok(())
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg, MessageWithoutReply }, EmptyState);

}

mod actor2 {
use uactor::actor::{Actor, EmptyState, Handler, HandleResult};
use uactor::context::Context;
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::context::extensions::Service;
use uactor::context::Context;
use uactor::di::{Inject, InjectError};
use uactor::system::System;

Expand All @@ -111,8 +119,8 @@ mod actor2 {

impl Inject for Services {
async fn inject(system: &System) -> Result<Self, InjectError>
where
Self: Sized,
where
Self: Sized,
{
let service2 = system.get_service::<Service2>()?;
Ok(Services(service2))
Expand All @@ -125,7 +133,12 @@ mod actor2 {
}

impl Handler<PingMsg> for Actor2 {
async fn handle(&mut self, Services(service2): &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
Services(service2): &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
) -> HandleResult {
println!("actor2: Received ping message");

service2.do_something();
Expand All @@ -137,7 +150,12 @@ mod actor2 {
}

impl Handler<PrintMessage> for Actor2 {
async fn handle(&mut self, _: &mut Self::Inject, msg: PrintMessage, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
msg: PrintMessage,
_ctx: &mut Context,
) -> HandleResult {
println!("actor2: Received message: {msg:?}");
Ok(())
}
Expand Down Expand Up @@ -192,25 +210,21 @@ async fn main() -> anyhow::Result<()> {
let actor2 = Actor2;
let (actor2_ref, _) = uactor::spawn_with_ref!(system, actor2: Actor2);


// Run actors
system.run_actor::<Actor1>(actor1_ref.name()).await?;
system.run_actor::<Actor2>(actor2_ref.name()).await?;

// Case #1: send messages and call injected (not from &self) services inside handlers
println!("-- Case #1: send messages and call injected (not from &self) services inside handlers");
let pong1 = actor1_ref
.ask::<PongMsg>(|reply| PingMsg(reply))
.await?;
let pong2 = actor2_ref
.ask::<PongMsg>(|reply| PingMsg(reply))
.await?;
println!(
"-- Case #1: send messages and call injected (not from &self) services inside handlers"
);
let pong1 = actor1_ref.ask::<PongMsg>(PingMsg).await?;
let pong2 = actor2_ref.ask::<PongMsg>(PingMsg).await?;
println!("main: received {pong1:?} and {pong2:?} messages");

// Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1
println!("\n-- Case #2: send message#1 to actor1 and reply to actor2 without actor2 reference inside message#1");
let pong1 = actor1_ref
.send(MessageWithoutReply("login:password".to_owned()))?;
actor1_ref.send(MessageWithoutReply("login:password".to_owned()))?;

tokio::time::sleep(1.std_milliseconds()).await;
Ok(())
Expand Down
31 changes: 22 additions & 9 deletions src/uactor/examples/interval.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Context;
use time::ext::NumericalStdDuration;
use uactor::actor::MessageSender;

Expand All @@ -23,7 +22,7 @@ mod messages {
}

mod actor1 {
use uactor::actor::{Actor, EmptyState, Handler, HandleResult};
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::context::Context;
use uactor::message::IntervalMessage;

Expand All @@ -40,7 +39,12 @@ mod actor1 {
}

impl Handler<PingMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
let _ = reply.send(PongMsg);
Expand All @@ -49,9 +53,20 @@ mod actor1 {
}

impl Handler<IntervalMessage> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, IntervalMessage { time: _, duration }: IntervalMessage, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
IntervalMessage {
time: _,
duration: _,
}: IntervalMessage,
_ctx: &mut Context,
) -> HandleResult {
self.interval_count += 1;
println!("actor1: received {}nd interval message", self.interval_count);
println!(
"actor1: received {}nd interval message",
self.interval_count
);
Ok(())
}
}
Expand All @@ -68,13 +83,11 @@ async fn main() -> anyhow::Result<()> {
// 1 second interval
let interval = tokio::time::interval(1.std_seconds());

let (mut actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1, interval);
let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1, interval);

system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref
.ask::<PongMsg>(|reply| PingMsg(reply))
.await?;
let pong = actor1_ref.ask::<PongMsg>(PingMsg).await?;
println!("main: received {pong:?} message");

// waiting 10 seconds and expecting new message each 1 second
Expand Down
12 changes: 8 additions & 4 deletions src/uactor/examples/single_channel_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod messages {
}

mod actor1 {
use uactor::actor::{Actor, EmptyState, Handler, HandleResult};
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::context::Context;

use crate::messages::{PingMsg, PongMsg};
Expand All @@ -33,7 +33,12 @@ mod actor1 {
}

impl Handler<PingMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
ping: PingMsg,
_: &mut Context,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
let _ = reply.send(PongMsg);
Expand All @@ -54,8 +59,7 @@ async fn main() -> anyhow::Result<()> {

system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref.ask(|reply| PingMsg(reply))
.await?;
let pong = actor1_ref.ask(PingMsg).await?;
println!("main: received {pong:?} message");

Ok(())
Expand Down
33 changes: 21 additions & 12 deletions src/uactor/examples/supervised_actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::ops::Shl;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
Expand Down Expand Up @@ -26,12 +25,12 @@ mod messages {
}

mod actor1 {
use tokio::sync::mpsc;
use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender};
use uactor::context::ActorContext;
use uactor::context::supervised::SupervisedContext;
use crate::messages::{PingMsg, PongMsg};
use crate::supervisor::{SupervisorMsg, SupervisorRef};
use tokio::sync::mpsc;
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::context::supervised::SupervisedContext;
use uactor::context::ActorContext;

pub struct Actor1;

Expand All @@ -41,7 +40,12 @@ mod actor1 {
}

impl Handler<PingMsg> for Actor1 {
async fn handle(&mut self, _: &mut Self::Inject, ping: PingMsg, ctx: &mut Self::Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
ping: PingMsg,
ctx: &mut Self::Context,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
let _ = reply.send(PongMsg);
Expand All @@ -54,9 +58,8 @@ mod actor1 {
}

mod supervisor {
use uactor::actor::{Actor, EmptyState, Handler, HandleResult, MessageSender};
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::context::{ActorDied, Context};
use uactor::data_publisher::{DataPublisher, DataPublisherResult, TryClone};

pub struct Supervisor;

Expand All @@ -66,7 +69,12 @@ mod supervisor {
}

impl Handler<ActorDied> for Supervisor {
async fn handle(&mut self, _: &mut Self::Inject, ActorDied(name): ActorDied, _: &mut Context) -> HandleResult {
async fn handle(
&mut self,
_: &mut Self::Inject,
ActorDied(name): ActorDied,
_: &mut Context,
) -> HandleResult {
println!("Actor with name: {name:?} - died");
Ok(())
}
Expand All @@ -90,11 +98,12 @@ async fn main() -> anyhow::Result<()> {
let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1);
let (supervisor_ref, _) = uactor::spawn_with_ref!(system, supervisor: Supervisor);

system.run_actor::<Supervisor>(supervisor_ref.name()).await?;
system
.run_actor::<Supervisor>(supervisor_ref.name())
.await?;
system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref.ask(|reply| PingMsg(reply))
.await?;
let pong = actor1_ref.ask(PingMsg).await?;
println!("main: received {pong:?} message");

tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
Loading
Loading