Skip to content

Commit

Permalink
implemented shared state for actors and actor_ref
Browse files Browse the repository at this point in the history
  • Loading branch information
EnvOut committed Aug 23, 2024
1 parent 0a77b00 commit 0a90748
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
]

[workspace.dependencies]
tokio = { version = "1.35.0", features = ["net", "sync", "time", "rt", "macros"] }
tokio = { version = "1.35.0", features = ["net", "sync", "time", "rt", "macros", "rt-multi-thread"] }
futures = "0.3"

# errors
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Examples can be found [here](src/uactor/examples).
7. Implemented Dependency Injection on pre-start stage to solve cross-references problem ("Actor#1" needs a reference to the "Actor#2", and "Actor#2" needs a reference to "Actor#1")
[Example: dependency injection](src/uactor/examples/dependency_injection.rs)
8. Integration with tokio/tracing, including tracing of actor lifecycle, messages, and handlers
9. Implemented support for actors for which it is necessary to work with multiple message sources (channels) [Example: Multi channel](src/uactor/examples/multiple_incoming_channels.rs)
10. Implemented shared state for actors [Example: Shared state](src/uactor/examples/shared_state.rs)

### Other projects:
1. Actix
Expand Down
5 changes: 1 addition & 4 deletions makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
run_base_example:
cargo run --example base_sample

run_all_examples:
cargo run --example base_sample && \
cargo run --example multiple_incoming_channels && \
cargo run --example dependency_injection && \
cargo run --example interval && \
cargo run --example single_channel_actor
Expand Down
2 changes: 1 addition & 1 deletion src/uactor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "uactor"
version = "0.10.4"
version = "0.11.0"
edition = "2021"
repository = "https://github.com/EnvOut/uactor"
license = "MIT"
Expand Down
14 changes: 10 additions & 4 deletions src/uactor/examples/dependency_injection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod messages {
mod actor1 {
use tokio::sync::mpsc::UnboundedSender;

use uactor::actor::{Actor, EmptyState, HandleResult, Handler, MessageSender};
use uactor::actor::{Actor, HandleResult, Handler, MessageSender};
use uactor::context::extensions::Service;
use uactor::context::Context;
use uactor::di::{Inject, InjectError};
Expand Down Expand Up @@ -65,6 +65,7 @@ mod actor1 {
impl Actor for Actor1 {
type Context = Context;
type Inject = Services;
type State = ();
}

impl Handler<PingMsg> for Actor1 {
Expand All @@ -73,6 +74,7 @@ mod actor1 {
Services { service1, .. }: &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor1: Received ping message");

Expand All @@ -90,18 +92,19 @@ mod actor1 {
Services { actor2_ref, .. }: &mut Self::Inject,
msg: MessageWithoutReply,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor1: Received {msg:?} message, sending PrintMessage to the actor2");
actor2_ref.send(PrintMessage::new(msg.into()))?;
Ok(())
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg, MessageWithoutReply }, EmptyState);
uactor::generate_actor_ref!(Actor1, { PingMsg, MessageWithoutReply });
}

mod actor2 {
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::extensions::Service;
use uactor::context::Context;
use uactor::di::{Inject, InjectError};
Expand All @@ -128,6 +131,7 @@ mod actor2 {
impl Actor for Actor2 {
type Context = Context;
type Inject = Services;
type State = ();
}

impl Handler<PingMsg> for Actor2 {
Expand All @@ -136,6 +140,7 @@ mod actor2 {
Services(service2): &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor2: Received ping message");

Expand All @@ -153,13 +158,14 @@ mod actor2 {
_: &mut Self::Inject,
msg: PrintMessage,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor2: Received message: {msg:?}");
Ok(())
}
}

uactor::generate_actor_ref!(Actor2, { PingMsg, PrintMessage }, EmptyState);
uactor::generate_actor_ref!(Actor2, { PingMsg, PrintMessage });
}

pub mod services {
Expand Down
7 changes: 5 additions & 2 deletions src/uactor/examples/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod messages {
}

mod actor1 {
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::Context;
use uactor::message::IntervalMessage;

Expand All @@ -34,6 +34,7 @@ mod actor1 {
impl Actor for Actor1 {
type Context = Context;
type Inject = ();
type State = ();
}

impl Handler<PingMsg> for Actor1 {
Expand All @@ -42,6 +43,7 @@ mod actor1 {
_: &mut Self::Inject,
ping: PingMsg,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
Expand All @@ -59,6 +61,7 @@ mod actor1 {
duration: _,
}: IntervalMessage,
_ctx: &mut Context,
state: &Self::State,
) -> HandleResult {
self.interval_count += 1;
println!(
Expand All @@ -69,7 +72,7 @@ mod actor1 {
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg }, EmptyState);
uactor::generate_actor_ref!(Actor1, { PingMsg });
}

#[tokio::main]
Expand Down
9 changes: 7 additions & 2 deletions src/uactor/examples/multiple_incoming_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod actor1 {
impl Actor for Actor1 {
type Context = Context;
type Inject = ();
type State = ();
}

impl Handler<PingPongMsg> for Actor1 {
Expand All @@ -55,6 +56,7 @@ pub mod actor1 {
_: &mut Self::Inject,
msg: PingPongMsg,
_ctx: &mut Self::Context,
state: &Self::State,
) -> HandleResult {
println!("actor1 handle PingPongMsg: {msg:?}");
Ok(())
Expand All @@ -67,6 +69,7 @@ pub mod actor1 {
_: &mut Self::Inject,
msg: ReqMsg,
_ctx: &mut Self::Context,
state: &Self::State,
) -> HandleResult {
println!("actor1 handle ReqMsg: {msg:?}");
self.resp_tx.send(RespMsg::Ok).await?;
Expand All @@ -89,6 +92,7 @@ pub mod actor2 {
_: &mut Self::Inject,
msg: RespMsg,
_: &mut Self::Context,
state: &Self::State,
) -> HandleResult {
println!("actor2 handle RespMsg: {msg:?}");
Ok(())
Expand All @@ -98,6 +102,7 @@ pub mod actor2 {
impl Actor for Actor2 {
type Context = Context;
type Inject = ();
type State = ();
}
}

Expand All @@ -121,8 +126,8 @@ async fn main() {
let mut system = System::global().build();

// Initialize actors
let (actor1_name, handle1) = system.init_actor(actor1, None, (ping_rx, req_rx));
let (actor2_name, handle2) = system.init_actor(actor2, None, resp_rx);
let (actor1_name, shared_state, handle1) = system.init_actor(actor1, None, (ping_rx, req_rx));
let (actor2_name, shared_state, handle2) = system.init_actor(actor2, None, resp_rx);

// Run actors
system.run_actor::<Actor1>(actor1_name).await.unwrap();
Expand Down
67 changes: 67 additions & 0 deletions src/uactor/examples/shared_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uactor::actor::{Actor, HandleResult, Handler, MessageSender};
use uactor::context::Context;
use uactor::system::System;

use uactor::message::{Message, Reply};
pub struct PingMsg;

uactor::message_impl!(PingMsg);

pub struct Actor1;

#[derive(Default)]
pub struct Actor1State {
pub counter: AtomicU8,
}

impl Actor for Actor1 {
type Context = Context;
type Inject = ();
type State = Actor1State;
}

impl Handler<PingMsg> for Actor1 {
async fn handle(
&mut self,
_: &mut Self::Inject,
ping: PingMsg,
ctx: &mut Self::Context,
state: &Self::State,
) -> HandleResult {
state.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg });

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(LevelFilter::INFO)
.with(tracing_subscriber::fmt::layer())
.init();

let mut system = System::global().build();

let actor1 = Actor1;

let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1);

system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref.send(PingMsg);
let pong = actor1_ref.send(PingMsg);
let pong = actor1_ref.send(PingMsg);

tokio::time::sleep(Duration::from_secs(1)).await;

assert_eq!(actor1_ref.state.counter.load(Ordering::Relaxed), 3);

Ok(())
}
7 changes: 4 additions & 3 deletions src/uactor/examples/single_channel_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod messages {
}

mod actor1 {
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::Context;

use crate::messages::{PingMsg, PongMsg};
Expand All @@ -28,6 +28,7 @@ mod actor1 {
impl Actor for Actor1 {
type Context = Context;
type Inject = ();
type State = ();
}

impl Handler<PingMsg> for Actor1 {
Expand All @@ -36,6 +37,7 @@ mod actor1 {
_: &mut Self::Inject,
ping: PingMsg,
_: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
Expand All @@ -44,7 +46,7 @@ mod actor1 {
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg }, EmptyState);
uactor::generate_actor_ref!(Actor1, { PingMsg });
}

#[tokio::main]
Expand All @@ -54,7 +56,6 @@ async fn main() -> anyhow::Result<()> {
let mut system = System::global().build();

let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1);

system.run_actor::<Actor1>(actor1_ref.name()).await?;

let pong = actor1_ref.ask(PingMsg).await?;
Expand Down
13 changes: 9 additions & 4 deletions src/uactor/examples/supervised_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod actor1 {
use crate::messages::{PingMsg, PongMsg};
use crate::supervisor::{SupervisorMsg, SupervisorRef};
use tokio::sync::mpsc;
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::supervised::SupervisedContext;
use uactor::context::ActorContext;

Expand All @@ -35,6 +35,7 @@ mod actor1 {
impl Actor for Actor1 {
type Context = SupervisedContext<SupervisorRef<mpsc::UnboundedSender<SupervisorMsg>>>;
type Inject = ();
type State = ();
}

impl Handler<PingMsg> for Actor1 {
Expand All @@ -43,6 +44,7 @@ mod actor1 {
_: &mut Self::Inject,
ping: PingMsg,
ctx: &mut Self::Context,
state: &Self::State,
) -> HandleResult {
println!("actor1: Received ping message");
let PingMsg(reply) = ping;
Expand All @@ -52,18 +54,20 @@ mod actor1 {
}
}

uactor::generate_actor_ref!(Actor1, { PingMsg }, EmptyState);
uactor::generate_actor_ref!(Actor1, { PingMsg });
}

mod supervisor {
use uactor::actor::{Actor, EmptyState, HandleResult, Handler};
use std::os::macos::raw::stat;
use uactor::actor::{Actor, HandleResult, Handler};
use uactor::context::{ActorDied, Context};

pub struct Supervisor;

impl Actor for Supervisor {
type Context = Context;
type Inject = ();
type State = ();
}

impl Handler<ActorDied> for Supervisor {
Expand All @@ -72,13 +76,14 @@ mod supervisor {
_: &mut Self::Inject,
ActorDied(name): ActorDied,
_: &mut Context,
state: &Self::State,
) -> HandleResult {
println!("Actor with name: {name:?} - died");
Ok(())
}
}

uactor::generate_actor_ref!(Supervisor, { ActorDied }, EmptyState);
uactor::generate_actor_ref!(Supervisor, { ActorDied });
}

#[tokio::main]
Expand Down
Loading

0 comments on commit 0a90748

Please sign in to comment.