Skip to content

Commit

Permalink
Added bunch of notifiers.
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius committed Nov 17, 2023
1 parent 0e344be commit 753487d
Show file tree
Hide file tree
Showing 12 changed files with 1,146 additions and 10 deletions.
647 changes: 643 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sha2 = { version = "^0.10.1", features = ["compress"] }
md-5 = "^0.10.1"
digest = "^0.10.1"
reqwest = "0.11.22"
lapin = "2.3.1"


[profile.release]
Expand Down
90 changes: 87 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,21 @@ pub struct DataStorageConfig {
/// S3 bucket to upload files to.
///
/// This parameter is required fo s3-based storages.
#[arg(long, required_if_eq("data_storage", "hybrid-s3"), env = "RUSTUS_S3_BUCKET")]
#[arg(
long,
required_if_eq("data_storage", "hybrid-s3"),
env = "RUSTUS_S3_BUCKET"
)]
pub s3_bucket: Option<String>,

/// S3 region.
///
/// This parameter is required fo s3-based storages.
#[arg(long, required_if_eq("data_storage", "hybrid-s3"), env = "RUSTUS_S3_REGION")]
#[arg(
long,
required_if_eq("data_storage", "hybrid-s3"),
env = "RUSTUS_S3_REGION"
)]
pub s3_region: Option<String>,

/// S3 access key.
Expand Down Expand Up @@ -112,7 +120,11 @@ pub struct DataStorageConfig {
/// S3 URL.
///
/// This parameter is required fo s3-based storages.
#[arg(long, required_if_eq("data_storage", "hybrid-s3"), env = "RUSTUS_S3_URL")]
#[arg(
long,
required_if_eq("data_storage", "hybrid-s3"),
env = "RUSTUS_S3_URL"
)]
pub s3_url: Option<String>,

/// S3 force path style.
Expand Down Expand Up @@ -148,6 +160,78 @@ pub struct DataStorageConfig {
pub s3_headers: Option<String>,
}

#[derive(Parser, Clone, Debug)]
pub struct AMQPHooksOptions {
/// Url for AMQP server.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")]
pub hooks_amqp_url: Option<String>,

/// Rustus will create exchange if enabled.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_EXCHANGE")]
pub hooks_amqp_declare_exchange: bool,

/// Rustus will create all queues for communication and bind them
/// to exchange if enabled.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_QUEUES")]
pub hooks_amqp_declare_queues: bool,

/// Durability type of exchange.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE")]
pub hooks_amqp_durable_exchange: bool,

/// Durability type of queues.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_QUEUES")]
pub hooks_amqp_durable_queues: bool,

/// Adds celery specific headers.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_CELERY")]
pub hooks_amqp_celery: bool,

/// Name of amqp exchange.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")]
pub hooks_amqp_exchange: String,

/// Exchange kind.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE_KIND", default_value = "topic")]
pub hooks_amqp_exchange_kind: String,

/// Routing key to use when sending message to an exchange.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_ROUTING_KEY")]
pub hooks_amqp_routing_key: Option<String>,

/// Prefix for all AMQP queues.
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_QUEUES_PREFIX",
default_value = "rustus"
)]
pub hooks_amqp_queues_prefix: String,

/// Maximum number of connections for RabbitMQ.
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_connection_pool_size: u64,

/// Maximum number of opened channels for each connection.
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_channel_pool_size: u64,

/// After this amount of time the connection will be dropped.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT")]
pub hooks_amqp_idle_connection_timeout: Option<u64>,

/// After this amount of time in seconds, the channel will be closed.
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT")]
pub hooks_amqp_idle_channels_timeout: Option<u64>,
}

Check failure on line 233 in src/config.rs

View workflow job for this annotation

GitHub Actions / clippy

more than 3 bools in a struct

error: more than 3 bools in a struct --> src/config.rs:164:1 | 164 | / pub struct AMQPHooksOptions { 165 | | /// Url for AMQP server. 166 | | #[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")] 167 | | pub hooks_amqp_url: Option<String>, ... | 232 | | pub hooks_amqp_idle_channels_timeout: Option<u64>, 233 | | } | |_^ | = help: consider using a state machine or refactoring bools into two-variant enums = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#struct_excessive_bools = note: `-D clippy::struct-excessive-bools` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::struct_excessive_bools)]`

#[derive(Parser, Clone, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Config {
Expand Down
4 changes: 1 addition & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ pub enum RustusError {
HookError(String),
#[error("Unable to configure logging: {0}")]
LogConfigError(#[from] log::SetLoggerError),
#[cfg(feature = "amqp_notifier")]
#[error("AMQP error: {0}")]
AMQPError(#[from] lapin::Error),
#[cfg(feature = "amqp_notifier")]
#[error("AMQP pooling error error: {0}")]
AMQPPoolError(#[from] bb8::RunError<lapin::Error>),
AMQPPoolError(#[from] mobc::Error<lapin::Error>),
#[error("Std error: {0}")]
StdError(#[from] std::io::Error),
#[error("Can't spawn task: {0}")]
Expand Down
173 changes: 173 additions & 0 deletions src/notifiers/impls/amqp_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use std::time::Duration;

use axum::http::HeaderMap;
use lapin::{
options::{ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, BasicPublishOptions},
types::{AMQPValue, FieldTable, LongString},
ConnectionProperties, ExchangeKind, BasicProperties,
};
use mobc::Pool;
use strum::IntoEnumIterator;

use crate::{
config::AMQPHooksOptions,
errors::RustusResult,
notifiers::{base::Notifier, hooks::Hook},
utils::lapin_pool::{ChannelPool, ConnnectionPool},
};

#[allow(clippy::struct_excessive_bools)]
#[derive(Clone)]
pub struct DeclareOptions {
pub declare_exchange: bool,
pub durable_exchange: bool,
pub declare_queues: bool,
pub durable_queues: bool,
}

#[derive(Clone)]
pub struct AMQPNotifier {
exchange_name: String,
channel_pool: Pool<ChannelPool>,
queues_prefix: String,
exchange_kind: String,
routing_key: Option<String>,
declare_options: DeclareOptions,
celery: bool,
}

/// ManagerConnection for ChannelPool.

Check failure on line 39 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

error: item in documentation is missing backticks --> src/notifiers/impls/amqp_notifier.rs:39:27 | 39 | /// ManagerConnection for ChannelPool. | ^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 39 | /// ManagerConnection for `ChannelPool`. | ~~~~~~~~~~~~~

Check failure on line 39 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

error: item in documentation is missing backticks --> src/notifiers/impls/amqp_notifier.rs:39:5 | 39 | /// ManagerConnection for ChannelPool. | ^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 39 | /// `ManagerConnection` for ChannelPool. | ~~~~~~~~~~~~~~~~~~~
///
/// This manager helps you maintain opened channels.
impl AMQPNotifier {
#[allow(clippy::fn_params_excessive_bools)]
pub async fn new(options: AMQPHooksOptions) -> RustusResult<Self> {

Check failure on line 44 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

docs for function returning `Result` missing `# Errors` section

error: docs for function returning `Result` missing `# Errors` section --> src/notifiers/impls/amqp_notifier.rs:44:5 | 44 | pub async fn new(options: AMQPHooksOptions) -> RustusResult<Self> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc

Check failure on line 44 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

docs for function which may panic missing `# Panics` section

error: docs for function which may panic missing `# Panics` section --> src/notifiers/impls/amqp_notifier.rs:44:5 | 44 | pub async fn new(options: AMQPHooksOptions) -> RustusResult<Self> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | note: first possible panic found here --> src/notifiers/impls/amqp_notifier.rs:46:13 | 46 | options.hooks_amqp_url.unwrap().clone(), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_panics_doc
let manager = ConnnectionPool::new(
options.hooks_amqp_url.unwrap().clone(),
ConnectionProperties::default(),
);
let connection_pool = mobc::Pool::builder()
.max_idle_lifetime(
options
.hooks_amqp_idle_connection_timeout
.map(Duration::from_secs),
)
.max_open(options.hooks_amqp_connection_pool_size)
.build(manager);
let channel_pool = mobc::Pool::builder()
.max_idle_lifetime(
options
.hooks_amqp_idle_channels_timeout
.map(Duration::from_secs),
)
.max_open(options.hooks_amqp_channel_pool_size)
.build(ChannelPool::new(connection_pool));

Ok(Self {
channel_pool,
celery: options.hooks_amqp_celery,
routing_key: options.hooks_amqp_routing_key,
declare_options: DeclareOptions {
declare_exchange: options.hooks_amqp_declare_exchange,
durable_exchange: options.hooks_amqp_durable_exchange,
declare_queues: options.hooks_amqp_declare_queues,
durable_queues: options.hooks_amqp_durable_queues,
},
exchange_kind: options.hooks_amqp_exchange_kind,
exchange_name: options.hooks_amqp_exchange,
queues_prefix: options.hooks_amqp_queues_prefix,
})
}

Check failure on line 80 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

unused `async` for function with no await statements

error: unused `async` for function with no await statements --> src/notifiers/impls/amqp_notifier.rs:44:5 | 44 | / pub async fn new(options: AMQPHooksOptions) -> RustusResult<Self> { 45 | | let manager = ConnnectionPool::new( 46 | | options.hooks_amqp_url.unwrap().clone(), 47 | | ConnectionProperties::default(), ... | 79 | | }) 80 | | } | |_____^ | = help: consider removing the `async` from this function = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unused_async = note: `-D clippy::unused-async` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::unused_async)]`

/// Generate queue name based on hook type.
///
/// If specific routing key is not empty, it returns it.
/// Otherwise it will generate queue name based on hook name.
pub fn get_queue_name(&self, hook: &Hook) -> String {

Check failure on line 86 in src/notifiers/impls/amqp_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

this method could have a `#[must_use]` attribute

error: this method could have a `#[must_use]` attribute --> src/notifiers/impls/amqp_notifier.rs:86:5 | 86 | pub fn get_queue_name(&self, hook: &Hook) -> String { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: add the attribute: `#[must_use] pub fn get_queue_name(&self, hook: &Hook) -> String` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#must_use_candidate
if let Some(routing_key) = self.routing_key.as_ref() {
routing_key.into()
} else {
format!("{}.{hook}", self.queues_prefix.as_str())
}
}
}

impl Notifier for AMQPNotifier {
async fn prepare(&mut self) -> RustusResult<()> {
let chan = self.channel_pool.get().await?;
if self.declare_options.declare_exchange {
chan.exchange_declare(
self.exchange_name.as_str(),
ExchangeKind::Custom(self.exchange_kind.clone()),
ExchangeDeclareOptions {
durable: self.declare_options.durable_exchange,
..ExchangeDeclareOptions::default()
},
FieldTable::default(),
)
.await?;
}
if self.declare_options.declare_queues {
for hook in Hook::iter() {
let queue_name = self.get_queue_name(&hook);
chan.queue_declare(
queue_name.as_str(),
QueueDeclareOptions {
durable: self.declare_options.durable_queues,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await?;
chan.queue_bind(
queue_name.as_str(),
self.exchange_name.as_str(),
queue_name.as_str(),
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
}
}
Ok(())
}

async fn send_message(
&self,
message: String,
hook: Hook,
_header_map: &HeaderMap,
) -> RustusResult<()> {
let chan = self.channel_pool.get().await?;
let queue = self.get_queue_name(&hook);
let routing_key = self.routing_key.as_ref().unwrap_or(&queue);
let payload = if self.celery {
format!("[[{message}], {{}}, {{}}]").as_bytes().to_vec()
} else {
message.as_bytes().to_vec()
};
let mut headers = FieldTable::default();
if self.celery {
headers.insert(
"id".into(),
AMQPValue::LongString(LongString::from(uuid::Uuid::new_v4().to_string())),
);
headers.insert(
"task".into(),
AMQPValue::LongString(LongString::from(format!("rustus.{hook}"))),
);
}
chan.basic_publish(
self.exchange_name.as_str(),
routing_key.as_str(),
BasicPublishOptions::default(),
payload.as_slice(),
BasicProperties::default()
.with_headers(headers)
.with_content_type("application/json".into())
.with_content_encoding("utf-8".into()),
)
.await?;
Ok(())
}
}
49 changes: 49 additions & 0 deletions src/notifiers/impls/dir_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::{
errors::RustusError,
notifiers::{base::Notifier, hooks::Hook},
RustusResult,
};
use axum::http::HeaderMap;
use log::debug;
use std::path::PathBuf;
use tokio::process::Command;

#[derive(Clone)]
pub struct DirNotifier {
pub dir: PathBuf,
}

impl DirNotifier {
pub fn new(dir: PathBuf) -> Self {

Check failure on line 17 in src/notifiers/impls/dir_notifier.rs

View workflow job for this annotation

GitHub Actions / clippy

this method could have a `#[must_use]` attribute

error: this method could have a `#[must_use]` attribute --> src/notifiers/impls/dir_notifier.rs:17:5 | 17 | pub fn new(dir: PathBuf) -> Self { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: add the attribute: `#[must_use] pub fn new(dir: PathBuf) -> Self` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#must_use_candidate
Self { dir }
}
}

impl Notifier for DirNotifier {
#[cfg_attr(coverage, no_coverage)]
async fn prepare(&mut self) -> RustusResult<()> {
Ok(())
}

async fn send_message(
&self,
message: String,
hook: Hook,
_headers_map: &HeaderMap,
) -> RustusResult<()> {
let hook_path = self.dir.join(hook.to_string());
if !hook_path.exists() {
debug!("Hook {} not found.", hook.to_string());
return Err(RustusError::HookError(format!(
"Hook file {hook} not found."
)));
}
debug!("Running hook: {}", hook_path.as_path().display());
let mut command = Command::new(hook_path).arg(message).spawn()?;
let stat = command.wait().await?;
if !stat.success() {
return Err(RustusError::HookError("Returned wrong status code".into()));
}
Ok(())
}
}
Loading

0 comments on commit 753487d

Please sign in to comment.