Skip to content

Commit

Permalink
Cleaner hook futures
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed Apr 23, 2023
1 parent e98bc29 commit 018947b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 27 deletions.
23 changes: 7 additions & 16 deletions crates/oxidize-chat/src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl ChatLoop<'_> {
None
};

let mut hook_futures = common::Futures::new();
let mut hook_futures = Vec::new();

for module in modules {
tracing::trace!("Initializing module: {}", module.ty());
Expand Down Expand Up @@ -320,6 +320,10 @@ impl ChatLoop<'_> {
messages_future
}

for future in &mut hook_futures {
futures.push(future.as_mut());
}

// We maintain separate collections of local futures which we add
// potentially user-defined tasks to, to allow them to .await
// concurrently with the main chat task.
Expand Down Expand Up @@ -400,19 +404,6 @@ impl ChatLoop<'_> {
}
}
}
Some(future) = hook_futures.next() => {
match future {
Ok(..) => {
tracing::warn!("Chat component exited, exiting...");
return Ok(());
}
Err(e) => {
common::log_warn!(e, "Chat component errored, restarting in 5 seconds");
tokio::time::sleep(time::Duration::from_secs(5)).await;
return Ok(());
}
}
}
Some(future) = futures.next() => {
match future {
Ok(..) => {
Expand Down Expand Up @@ -449,7 +440,7 @@ impl ChatLoop<'_> {
handler.send_ping()?;
}
_ = &mut *handler.pong_timeout => {
bail!("server not responding");
bail!("Server not responding");
}
update = whitelisted_hosts_stream.recv() => {
handler.whitelisted_hosts = update;
Expand Down Expand Up @@ -483,7 +474,7 @@ impl ChatLoop<'_> {
}
}
_ = &mut outgoing => {
bail!("outgoing future ended unexpectedly");
bail!("Outgoing future ended unexpectedly");
}
_ = &mut leave => {
break;
Expand Down
3 changes: 2 additions & 1 deletion crates/oxidize-chat/src/module.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_injector::Injector;
use async_trait::async_trait;
use common::BoxFuture;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -38,7 +39,7 @@ pub struct HookContext<'a, 'task> {
pub sender: &'a sender::Sender,
pub settings: &'a settings::Settings<::auth::Scope>,
pub handlers: &'a mut Handlers,
pub tasks: &'a mut common::Futures<'task, Result<()>>,
pub tasks: &'a mut Vec<BoxFuture<'task, Result<()>>>,
}

/// Trait used to hook up a module.
Expand Down
10 changes: 1 addition & 9 deletions crates/oxidize-common/src/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,14 @@ use crate::BoxFuture;
/// Collection of boxed futures to drive.
pub type Futures<'a, O> = ::futures_util::stream::FuturesUnordered<BoxFuture<'a, O>>;

/// Collection of local futures to drive.
pub type LocalFutures<'a, O> = ::futures_util::stream::FuturesUnordered<
std::pin::Pin<Box<dyn std::future::Future<Output = O> + 'a>>,
>;

/// Run a collection of borrowed futures.
pub struct BorrowedFutures<'a, O> {
inner: ::futures_util::stream::FuturesUnordered<Pin<&'a mut dyn Future<Output = O>>>,
}

impl<'a, O> BorrowedFutures<'a, O> {
/// Push a borrowed future into the local collection.
pub fn push<F>(&mut self, future: Pin<&'a mut F>)
where
F: Future<Output = O>,
{
pub fn push(&mut self, future: Pin<&'a mut dyn Future<Output = O>>) {
self.inner.push(future);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/oxidize-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use self::percentage::percentage;

#[macro_use]
mod futures;
pub use self::futures::{BorrowedFutures, Futures, LocalFutures};
pub use self::futures::{BorrowedFutures, Futures};

/// A boxed future.
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
Expand Down

0 comments on commit 018947b

Please sign in to comment.