Skip to content

Commit

Permalink
Added sentry support.
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius committed Jun 23, 2024
1 parent 3131cf5 commit ea1a590
Show file tree
Hide file tree
Showing 18 changed files with 451 additions and 468 deletions.
677 changes: 300 additions & 377 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 22 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,42 @@ path = "src/main.rs"

[dependencies]
async-trait = "0.1.74"
axum = { version = "0.7.1" }
axum = { version = "0.7.5" }
base64 = "0.22.1"
bytes = "1.5.0"
chrono = { version = "0.4.26", features = ["serde"] }
bytes = "1.6.0"
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.3.21", features = ["env", "derive"] }
enum_dispatch = "0.3.12"
enum_dispatch = "0.3.13"
fern = { version = "0.6.2", features = ["colored", "chrono"] }
futures = "0.3.29"
log = "0.4.20"
futures = "0.3.30"
log = "0.4.21"
mime = "0.3.17"
mime_guess = "2.0.4"
mobc = "0.8.3"
mobc = "0.8.4"
redis = { version = "0.25.3", features = ["tokio-comp", "connection-manager"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.192", features = ["derive"] }
serde_json = "1.0.108"
rustc-hash = "2.0.0"
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0.50"
tokio = { version = "1.31.0", features = ["full", "bytes"] }
tokio-util = { version = "0.7.10", features = ["io"] }
uuid = { version = "1.5.0", features = ["v4"] }
rust-s3 = "^0.33"
thiserror = "1.0.60"
tokio = { version = "1.37.0", features = ["full", "bytes"] }
tokio-util = { version = "0.7.11", features = ["io"] }
uuid = { version = "1.8.0", features = ["v4"] }
rust-s3 = "0.34.0"
tower = "0.4.13"
# Hashsums
sha1 = { version = "^0.10.1", features = ["compress"] }
sha2 = { version = "^0.10.1", features = ["compress"] }
md-5 = "^0.10.1"
digest = "^0.10.1"
reqwest = "0.12.4"
lapin = "2.3.1"
tower-http = { version = "0.5.0", features = ["cors", "trace"] }
wildmatch = "2.1.1"
lapin = "2.3.4"
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
wildmatch = "2.3.3"
tracing = "0.1.40"
sentry = "0.32.0"
sentry-tracing = "0.32.0"
sentry-tower = { version = "0.32.0", features = [
sentry = "0.34.0"
sentry-tracing = "0.34.0"
sentry-tower = { version = "0.34.0", features = [
"http",
"axum",
"axum-matched-path",
Expand All @@ -61,6 +61,7 @@ tracing-subscriber = { version = "0.3.18", features = [
"parking_lot",
"time",
] }
tracing-error = { version = "0.2.0", features = ["traced-error"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5.4", features = [
Expand Down
8 changes: 4 additions & 4 deletions src/data_storage/impls/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Storage for FileStorage {
Ok(resp)
}

async fn add_bytes(&self, file_info: &FileInfo, mut bytes: Bytes) -> RustusResult<()> {
async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> {
// In normal situation this `if` statement is not
// gonna be called, but what if it is ...
let Some(path) = &file_info.path else {
Expand All @@ -101,7 +101,6 @@ impl Storage for FileStorage {
writer.get_ref().sync_data().await?;
}
writer.into_inner().shutdown().await?;
bytes.clear();
Ok(())
}

Expand Down Expand Up @@ -147,10 +146,11 @@ impl Storage for FileStorage {
reader.shutdown().await?;
}
writer.flush().await?;
let mut inner_file = writer.into_inner();
if self.force_fsync {
writer.get_ref().sync_data().await?;
inner_file.sync_data().await?;
}
writer.into_inner().shutdown().await?;
inner_file.shutdown().await?;
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions src/data_storage/impls/s3_hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use s3::{
command::Command,
request::{tokio_backend::Reqwest, Request as S3Request},
request::{tokio_backend::HyperRequest, Request as S3Request},
Bucket,
};

Expand Down Expand Up @@ -138,9 +138,9 @@ impl Storage for S3HybridStorage {
}
let key = self.get_s3_key(file_info);
let command = Command::GetObject;
let s3_request = Reqwest::new(&self.bucket, &key, command).unwrap();
let s3_response = s3_request.response().await.unwrap();
let mut resp = axum::body::Body::from_stream(s3_response.bytes_stream()).into_response();
let s3_request = HyperRequest::new(&self.bucket, &key, command).await?;
let s3_response = s3_request.response_data_to_stream().await?;
let mut resp = axum::body::Body::from_stream(s3_response.bytes).into_response();
resp.headers_mut()
.generate_disposition(file_info.get_filename());
Ok(resp)
Expand Down
9 changes: 9 additions & 0 deletions src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl base::Storage for DataStorageImpl {
}
}

#[tracing::instrument(err, skip(self, bytes), fields(storage = self.get_name()))]
async fn add_bytes(
&self,
file_info: &crate::models::file_info::FileInfo,
Expand All @@ -111,10 +112,17 @@ impl base::Storage for DataStorageImpl {
}
}

#[tracing::instrument("FileStorage create_file", skip(self, file_info), fields(storage = self.get_name()))]
async fn create_file(
&self,
file_info: &crate::models::file_info::FileInfo,
) -> RustusResult<String> {
tracing::info!(
path = file_info.path,
length = file_info.length,
"Creating file: {:?}",
file_info.id
);
match self {
Self::File(file) => file.create_file(file_info).await,
Self::S3Hybrid(s3) => s3.create_file(file_info).await,
Expand All @@ -132,6 +140,7 @@ impl base::Storage for DataStorageImpl {
}
}

#[tracing::instrument(skip(self), fields(storage = self.get_name()))]
async fn remove_file(
&self,
file_info: &crate::models::file_info::FileInfo,
Expand Down
8 changes: 3 additions & 5 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::{Error, ErrorKind};

use axum::response::IntoResponse;

use axum::http::StatusCode;
Expand Down Expand Up @@ -69,9 +67,9 @@ pub enum RustusError {
}

/// This conversion allows us to use `RustusError` in the `main` function.
impl From<RustusError> for Error {
impl From<RustusError> for std::io::Error {
fn from(err: RustusError) -> Self {
Error::new(ErrorKind::Other, err)
std::io::Error::new(std::io::ErrorKind::Other, err)
}
}

Expand All @@ -98,7 +96,7 @@ impl IntoResponse for RustusError {
fn into_response(self) -> axum::response::Response {
let status_code = self.get_status_code();
if status_code != StatusCode::NOT_FOUND {
tracing::error!(err=?self, "{self}");
tracing::error!(err=%self, "{self}");
}
match self {
RustusError::HTTPHookError(_, proxy_response, content_type) => {
Expand Down
4 changes: 4 additions & 0 deletions src/info_storages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,28 @@ impl base::InfoStorage for InfoStorageImpl {
}
}

#[tracing::instrument(err, skip(self, file_info))]
async fn set_info(
&self,
file_info: &crate::models::file_info::FileInfo,
create: bool,
) -> RustusResult<()> {
tracing::debug!("Setting file info: {:?}", file_info);
match self {
Self::Redis(redis) => redis.set_info(file_info, create).await,
Self::File(file) => file.set_info(file_info, create).await,
}
}

#[tracing::instrument(err, skip(self))]
async fn get_info(&self, file_id: &str) -> RustusResult<crate::models::file_info::FileInfo> {
match self {
Self::Redis(redis) => redis.get_info(file_id).await,
Self::File(file) => file.get_info(file_id).await,
}
}

#[tracing::instrument(err, skip(self))]
async fn remove_info(&self, file_id: &str) -> RustusResult<()> {
match self {
Self::Redis(redis) => redis.remove_info(file_id).await,
Expand Down
18 changes: 18 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{borrow::Cow, str::FromStr};

use errors::RustusResult;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub mod config;
pub mod data_storage;
Expand Down Expand Up @@ -49,7 +50,9 @@ fn main() -> RustusResult<()> {
greeting(&args);
#[allow(clippy::no_effect_underscore_binding)]
let mut _guard = None;
let mut sentry_layer = None;
if let Some(sentry_dsn) = &args.sentry_config.dsn {
sentry_layer = Some(sentry_tracing::layer());
let default_options = sentry::ClientOptions::default();
_guard = Some(sentry::init(sentry::ClientOptions {
dsn: sentry::types::Dsn::from_str(sentry_dsn.as_str()).ok(),
Expand All @@ -74,6 +77,21 @@ fn main() -> RustusResult<()> {
}));
}

tracing_subscriber::registry()
.with(tracing_subscriber::filter::LevelFilter::from_level(
args.log_level,
))
.with(
tracing_subscriber::fmt::layer()
.with_level(true)
.with_file(false)
.with_line_number(false)
.with_target(false),
)
.with(tracing_error::ErrorLayer::default())
.with(sentry_layer)
.init();

let mut builder = if Some(1) == args.workers {
tokio::runtime::Builder::new_current_thread()
} else {
Expand Down
1 change: 1 addition & 0 deletions src/notifiers/impls/amqp_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl Notifier for AMQPNotifier {
hook: &Hook,
_header_map: &HeaderMap,
) -> RustusResult<()> {
tracing::info!("Sending message to AMQP.");
let chan = self.channel_pool.get().await?;
let queue = self.get_queue_name(hook);
let routing_key = self.routing_key.as_ref().unwrap_or(&queue);
Expand Down
8 changes: 3 additions & 5 deletions src/notifiers/impls/dir_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ impl Notifier for DirNotifier {
) -> RustusResult<()> {
let hook_path = self.dir.join(hook.to_string());
if !hook_path.exists() {
tracing::debug!("Hook {} not found.", hook.to_string());
return Err(RustusError::HookError(format!(
"Hook file {hook} not found."
)));
tracing::warn!("Hook {} not found.", hook.to_string());
return Ok(());
}
tracing::debug!("Running hook: {}", hook_path.as_path().display());
tracing::info!("Running dir hook: {}", hook_path.as_path().display());
let mut command = Command::new(hook_path).arg(message).spawn()?;
let stat = command.wait().await?;
if !stat.success() {
Expand Down
40 changes: 32 additions & 8 deletions src/notifiers/impls/file_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,47 @@ impl Notifier for FileNotifier {
Ok(())
}

#[tracing::instrument(err, skip(self, message, _headers_map), fields(exit_status = tracing::field::Empty))]
#[tracing::instrument(
err,
skip(self, message, _headers_map),
fields(
exit_status = tracing::field::Empty,
sout = tracing::field::Empty,
serr = tracing::field::Empty,
)
)]
async fn send_message(
&self,
message: &str,
hook: &Hook,
_headers_map: &HeaderMap,
) -> RustusResult<()> {
tracing::debug!("Running command: {}", self.command.as_str());
let mut command = Command::new(self.command.as_str())
.arg(hook.to_string())
let hook_str = hook.to_string();
tracing::info!(
"Running command: `{} \"{}\" \"{{message}}\"`",
self.command.as_str(),
&hook_str
);

let command = Command::new(self.command.as_str())
.arg(hook_str)
.arg(message)
.stderr(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()?;
let stat = command.wait().await?;
if !stat.success() {
tracing::Span::current().record("exit_status", stat.code().unwrap_or(0));
return Err(RustusError::HookError("Returned wrong status code".into()));
let output = command.wait_with_output().await?;

tracing::Span::current()
.record("exit_status", output.status.code().unwrap_or(0))
.record("sout", String::from_utf8_lossy(&output.stdout).to_string())
.record("serr", String::from_utf8_lossy(&output.stderr).to_string());

if !output.status.success() {
return Err(RustusError::HookError(String::from(
"Returned wrong status code",
)));
}

Ok(())
}
}
19 changes: 8 additions & 11 deletions src/notifiers/impls/http_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ impl Notifier for HttpNotifier {
hook: &Hook,
header_map: &HeaderMap,
) -> RustusResult<()> {
tracing::debug!("Starting HTTP Hook.");
tracing::info!("Starting HTTP Hook.");
let idempotency_key = uuid::Uuid::new_v4().to_string();
let body_bytes = bytes::Bytes::copy_from_slice(message.as_bytes());
let requests_vec = self.urls.iter().map(|url| {
tracing::debug!("Preparing request for {}", url);
for url in &self.urls {
let mut request = self
.client
.post(url.as_str())
Expand All @@ -57,17 +56,15 @@ impl Notifier for HttpNotifier {
request = request.header(item.as_str(), value.as_bytes());
}
}
request.body(body_bytes.clone()).send()
});
for response in requests_vec {
let real_resp = response.await?;
if !real_resp.status().is_success() {
let content_type = real_resp
tracing::info!("Sending request to {}", url);
let response = request.body(body_bytes.clone()).send().await?;
if !response.status().is_success() {
let content_type = response
.headers()
.get("Content-Type")
.and_then(|hval| hval.to_str().ok().map(String::from));
let status = real_resp.status().as_u16();
let text = real_resp.text().await.unwrap_or_default();
let status = response.status().as_u16();
let text = response.text().await.unwrap_or_default();
tracing::Span::current().record("response_body", &text);
return Err(RustusError::HTTPHookError(status, text, content_type));
}
Expand Down
4 changes: 3 additions & 1 deletion src/notifiers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl NotificationManager {
///
/// This method might fail in case if any of the notifiers fails.
pub async fn prepare(&mut self) -> crate::errors::RustusResult<()> {
tracing::info!("Preparing notifiers.");
for notifier in &mut self.notifiers {
notifier.prepare().await?;
}
Expand All @@ -84,13 +85,14 @@ impl NotificationManager {
/// # Errors
///
/// This method might fail in case if any of the notifiers fails.
#[tracing::instrument(skip(self, hook, headers_map))]
#[tracing::instrument(skip(self, message, hook, headers_map))]
pub async fn notify_all(
&self,
message: &str,
hook: &super::hooks::Hook,
headers_map: &HeaderMap,
) -> crate::errors::RustusResult<()> {
tracing::info!("Sending message to all notifiers.");
let collect = self.notifiers.iter().map(|notifier| {
notifier
.send_message(message, hook, headers_map)
Expand Down
Loading

0 comments on commit ea1a590

Please sign in to comment.