diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 300c468..1e12134 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,7 @@ jobs: - name: Adding component run: rustup component add rustfmt - name: Checking code format - run: cargo fmt -- --check + run: cargo fmt -- --check --config use_try_shorthand=true,imports_granularity=Crate code_check: needs: pre_job diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ffc5f67..64c437a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,6 +15,9 @@ repos: pass_filenames: false args: - fmt + - -- + - --config + - use_try_shorthand=true,imports_granularity=Crate - id: clippy types: diff --git a/Cargo.lock b/Cargo.lock index 96a971d..decd5b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,7 +16,24 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util", + "tokio-util 0.6.9", +] + +[[package]] +name = "actix-codec" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-sink", + "log", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util 0.7.0", ] [[package]] @@ -44,11 +61,11 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.0.0-rc.2" +version = "3.0.0-rc.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb0185d65352deeea60d92231708068c04dc64f1ab307a1a307206a47d5a45d3" +checksum = "512e43eb60683c4ea2d1183f20076c899b2977849b896d8a639a12bd4652dd6d" dependencies = [ - "actix-codec", + "actix-codec 0.5.0", "actix-rt", "actix-service", "actix-utils", @@ -91,9 +108,9 @@ dependencies = [ [[package]] name = "actix-router" -version = "0.5.0-rc.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6506dbef336634ff35d994d58daa0a412ea23751f15f9b4dcac4d594b1ed1f" +checksum = "eb60846b52c118f2f04a56cc90880a274271c489b2498623d58176f8ca21fa80" dependencies = [ "bytestring", "firestorm", @@ -159,7 +176,7 @@ version = "4.0.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83e3c85bc4116b69913b03f16cff8cade1212508fcd321847d9cfe3d3e41f991" dependencies = [ - "actix-codec", + "actix-codec 0.4.2", "actix-http", "actix-macros", "actix-router", @@ -308,9 +325,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "askama_escape" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1bb320f97e6edf9f756bf015900038e43c7700e059688e5724a928c8f3b8d5" +checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" [[package]] name = "async-task" @@ -515,9 +532,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.72" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" dependencies = [ "jobserver", ] @@ -579,7 +596,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util", + "tokio-util 0.6.9", ] [[package]] @@ -709,11 +726,12 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4600d695eb3f6ce1cd44e6e291adceb2cc3ab12f20a33777ecd0bf6eba34e06" +checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" dependencies = [ "generic-array 0.14.5", + "typenum", ] [[package]] @@ -779,9 +797,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb780dce4f9a8f5c087362b3a4595936b2019e7c8b30f2c3e9a7e94e6ae9837" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", @@ -1062,9 +1080,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" dependencies = [ "cfg-if", "libc", @@ -1086,7 +1104,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.6.9", "tracing", ] @@ -1181,9 +1199,9 @@ dependencies = [ [[package]] name = "http-range" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eee9694f83d9b7c09682fdb32213682939507884e5bcf227be9aff5d644b90dc" +checksum = "21dec9db110f5f872ed9699c3ecf50cf16f423502706ba5c72462e28d3157573" [[package]] name = "httparse" @@ -1374,9 +1392,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.117" +version = "0.2.119" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c" +checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" [[package]] name = "libm" @@ -1468,9 +1486,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "mime_guess" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" dependencies = [ "mime", "unicase", @@ -2192,7 +2210,7 @@ dependencies = [ "pin-project-lite", "sha1", "tokio", - "tokio-util", + "tokio-util 0.6.9", "url", ] @@ -2331,9 +2349,9 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.21.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4214023b1223d02a4aad9f0bb9828317634a56530870a2eaf7200a99c0c10f68" +checksum = "d37baa70cf8662d2ba1c1868c5983dda16ef32b105cce41fb5c47e72936a90b3" dependencies = [ "arrayvec", "num-traits", @@ -2355,7 +2373,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.5", + "semver 1.0.6", ] [[package]] @@ -2373,16 +2391,18 @@ dependencies = [ [[package]] name = "rustus" -version = "0.4.5" +version = "0.4.6" dependencies = [ "actix-files", "actix-rt", "actix-web", "async-trait", "base64", + "bytes", "chrono", "derive_more", "fern", + "futures", "httptest", "lapin", "lazy_static", @@ -2478,9 +2498,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" +checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" [[package]] name = "semver-parser" @@ -2574,7 +2594,7 @@ checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.2", + "digest 0.10.3", ] [[package]] @@ -2673,9 +2693,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "518be6f6fff5ca76f985d434f9c37f3662af279642acf730388f271dff7b9016" +checksum = "195183bf6ff8328bb82c0511a83faf60aacf75840103388851db61d7a9854ae3" dependencies = [ "ahash", "atoi", @@ -2687,9 +2707,7 @@ dependencies = [ "bytes", "chrono", "crc", - "crossbeam-channel", "crossbeam-queue", - "crossbeam-utils", "digest 0.9.0", "dirs", "either", @@ -2713,7 +2731,7 @@ dependencies = [ "memchr", "num-bigint", "once_cell", - "parking_lot 0.11.2", + "paste", "percent-encoding", "rand 0.8.5", "regex", @@ -2740,9 +2758,9 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8061cbaa91ee75041514f67a09398c65a64efed72c90151ecd47593bad53da99" +checksum = "b555e70fbbf84e269ec3858b7a6515bcfe7a166a7cc9c636dd6efd20431678b6" dependencies = [ "actix-rt", "once_cell", @@ -3055,19 +3073,20 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.16.1" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ "bytes", "libc", "memchr", - "mio 0.7.14", + "mio 0.8.0", "num_cpus", "once_cell", - "parking_lot 0.11.2", + "parking_lot 0.12.0", "pin-project-lite", "signal-hook-registry", + "socket2", "winapi", ] @@ -3128,6 +3147,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower-service" version = "0.3.1" @@ -3136,9 +3169,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" +checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" dependencies = [ "cfg-if", "pin-project-lite", @@ -3487,9 +3520,9 @@ dependencies = [ [[package]] name = "zeroize_derive" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81e8f13fef10b63c06356d65d416b070798ddabcadc10d3ece0c5be9b3c7eddb" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index d634cad..4af3e82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.4.5" +version = "0.4.6" edition = "2021" description = "TUS protocol implementation written in Rust." @@ -16,6 +16,10 @@ serde_json = "1" strfmt = "^0.1.6" thiserror = "^1.0" url = "2.2.2" +bytes = "1.1.0" + +[dependencies.futures] +version = "0.3.21" [dependencies.serde] version = "1" @@ -29,7 +33,7 @@ features = ["vendored"] version = "0.6.0-beta.13" [dependencies.actix-web] -version = "^4.0.0-beta.20" +version = "^4.0.0-rc.3" [dependencies.chrono] features = ["serde"] @@ -79,7 +83,7 @@ features = ["derive"] version = "0.23" [dependencies.tokio] -features = ["time", "process"] +features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes"] version = "1.4.0" [dependencies.tokio-amqp] @@ -111,7 +115,7 @@ httptest = "0.15.4" [profile] [profile.release] -lto = true +lto = "fat" panic = "abort" opt-level = 3 codegen-units = 1 diff --git a/src/config.rs b/src/config.rs index f772c6f..50b8b79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,12 @@ -use std::ffi::OsString; -use std::path::PathBuf; +use std::{ffi::OsString, path::PathBuf}; use structopt::StructOpt; -use crate::info_storages::AvailableInfoStores; -use crate::notifiers::{Format, Hook}; -use crate::protocol::extensions::Extensions; +use crate::{ + info_storages::AvailableInfoStores, + notifiers::{Format, Hook}, + protocol::extensions::Extensions, +}; use crate::storages::AvailableStores; @@ -25,8 +26,23 @@ pub struct StorageOptions { #[structopt(long, env = "RUSTUS_DATA_DIR", default_value = "./data")] pub data_dir: PathBuf, + /// Storage directory structure. + /// This template shows inner directory structure. + /// You can use following variables: + /// day, month, year or even environment variables. + /// Example: "/year/month/day/env[HOSTNAME]/". + /// #[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")] pub dir_structure: String, + + /// Forces fsync call after writing chunk to filesystem. + /// This parameter can help you when working with + /// Network file systems. It guarantees that + /// everything is written on disk correctly. + /// + /// In most cases this parameter is redundant. + #[structopt(long, parse(from_flag))] + pub force_fsync: bool, } #[derive(StructOpt, Debug, Clone)] @@ -109,6 +125,7 @@ pub struct NotificationsOptions { #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")] pub hooks_amqp_exchange: String, + /// Prefix for all AMQP queues. #[cfg(feature = "amqp_notifier")] #[structopt( long, @@ -117,9 +134,13 @@ pub struct NotificationsOptions { )] pub hooks_amqp_queues_prefix: String, + /// Directory for executable hook files. + /// This parameter is used to call executables from dir. #[structopt(long, env = "RUSTUS_HOOKS_DIR")] pub hooks_dir: Option, + /// Executable file which must be called for + /// notifying about upload status. #[structopt(long, env = "RUSTUS_HOOKS_FILE")] pub hooks_file: Option, } @@ -213,9 +234,7 @@ impl RustusConf { pub fn base_url(&self) -> String { format!( "/{}", - self.url - .strip_prefix('/') - .unwrap_or_else(|| self.url.as_str()) + self.url.strip_prefix('/').unwrap_or(self.url.as_str()) ) } @@ -225,9 +244,7 @@ impl RustusConf { let base_url = self.base_url(); format!( "{}/{}", - base_url - .strip_suffix('/') - .unwrap_or_else(|| base_url.as_str()), + base_url.strip_suffix('/').unwrap_or(base_url.as_str()), file_id ) } diff --git a/src/errors.rs b/src/errors.rs index 942fe39..c2a4f49 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,7 +1,6 @@ use std::io::{Error, ErrorKind}; -use actix_web::http::StatusCode; -use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError}; +use actix_web::{http::StatusCode, HttpResponse, HttpResponseBuilder, ResponseError}; use log::error; pub type RustusResult = Result; @@ -58,6 +57,8 @@ pub enum RustusError { AMQPPoolError(#[from] mobc_lapin::mobc::Error), #[error("Std error: {0}")] StdError(#[from] std::io::Error), + #[error("Can't spawn task: {0}")] + TokioSpawnError(#[from] tokio::task::JoinError), } /// This conversion allows us to use `RustusError` in the `main` function. diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs index 164f2bc..74901bf 100644 --- a/src/info_storages/db_info_storage.rs +++ b/src/info_storages/db_info_storage.rs @@ -1,14 +1,12 @@ use std::time::Duration; use async_trait::async_trait; -use rbatis::crud::CRUD; -use rbatis::crud_table; -use rbatis::db::DBPoolOptions; -use rbatis::executor::Executor; -use rbatis::rbatis::Rbatis; +use rbatis::{crud::CRUD, crud_table, db::DBPoolOptions, executor::Executor, rbatis::Rbatis}; -use crate::errors::{RustusError, RustusResult}; -use crate::info_storages::{FileInfo, InfoStorage}; +use crate::{ + errors::{RustusError, RustusResult}, + info_storages::{FileInfo, InfoStorage}, +}; #[crud_table] struct DbModel { @@ -66,7 +64,7 @@ impl InfoStorage for DBInfoStorage { async fn get_info(&self, file_id: &str) -> RustusResult { let model: Option = self.db.fetch_by_column("id", file_id).await?; if let Some(info) = model { - serde_json::from_str(info.info.as_str()).map_err(RustusError::from) + FileInfo::from_json(info.info.to_string()).await } else { Err(RustusError::FileNotFound) } @@ -84,8 +82,7 @@ impl InfoStorage for DBInfoStorage { #[cfg(test)] mod tests { use super::{DBInfoStorage, DbModel}; - use crate::info_storages::FileInfo; - use crate::InfoStorage; + use crate::{info_storages::FileInfo, InfoStorage}; use rbatis::crud::CRUD; async fn get_info_storage() -> DBInfoStorage { diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index fad265b..a2cdf35 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -1,12 +1,20 @@ -use std::path::PathBuf; +use std::{ + io::{Read, Write}, + path::PathBuf, +}; use async_trait::async_trait; use log::error; -use tokio::fs::{read_to_string, remove_file, DirBuilder, OpenOptions}; -use tokio::io::copy; +use std::{ + fs::{remove_file, File, OpenOptions}, + io::{BufReader, BufWriter}, +}; +use tokio::fs::DirBuilder; -use crate::errors::{RustusError, RustusResult}; -use crate::info_storages::{FileInfo, InfoStorage}; +use crate::{ + errors::{RustusError, RustusResult}, + info_storages::{FileInfo, InfoStorage}, +}; pub struct FileInfoStorage { info_dir: PathBuf, @@ -35,57 +43,69 @@ impl InfoStorage for FileInfoStorage { } async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .create(create) - .truncate(true) - .open(self.info_file_path(file_info.id.as_str()).as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - let data = serde_json::to_string(&file_info).map_err(|err| { - error!("{:#?}", err); - err - })?; - copy(&mut data.as_bytes(), &mut file).await?; - file.sync_data().await?; - Ok(()) + let info = file_info.clone(); + let path = self.info_file_path(info.id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + let file = OpenOptions::new() + .write(true) + .create(create) + .truncate(true) + .open(path) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + let data = serde_json::to_string(&info).map_err(RustusError::from)?; + { + let mut writer = BufWriter::new(file); + writer.write_all(data.as_bytes())?; + writer.flush()?; + } + Ok(()) + }) + .await? } async fn get_info(&self, file_id: &str) -> RustusResult { let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - let contents = read_to_string(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToReadInfo - })?; - serde_json::from_str::(contents.as_str()).map_err(RustusError::from) + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + let info = File::open(info_path)?; + let mut contents = String::new(); + let mut reader = BufReader::new(info); + reader.read_to_string(&mut contents)?; + serde_json::from_str::(contents.as_str()).map_err(RustusError::from) + }) + .await? } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { - let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(String::from(file_id)) + let id = String::from(file_id); + let info_path = self.info_file_path(id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(info_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(id) + }) }) + .await? } } #[cfg(test)] mod tests { use super::FileInfoStorage; - use crate::info_storages::FileInfo; - use crate::InfoStorage; - use std::collections::HashMap; - use std::fs::File; - use std::io::{Read, Write}; + use crate::{info_storages::FileInfo, InfoStorage}; + use std::{ + collections::HashMap, + fs::File, + io::{Read, Write}, + }; #[actix_rt::test] async fn preparation() { diff --git a/src/info_storages/mod.rs b/src/info_storages/mod.rs index 66338b6..fe5d56d 100644 --- a/src/info_storages/mod.rs +++ b/src/info_storages/mod.rs @@ -7,6 +7,6 @@ pub mod redis_info_storage; pub mod models; -pub use models::available_info_storages::AvailableInfoStores; -pub use models::file_info::FileInfo; -pub use models::info_store::InfoStorage; +pub use models::{ + available_info_storages::AvailableInfoStores, file_info::FileInfo, info_store::InfoStorage, +}; diff --git a/src/info_storages/models/available_info_storages.rs b/src/info_storages/models/available_info_storages.rs index e5e481f..698190a 100644 --- a/src/info_storages/models/available_info_storages.rs +++ b/src/info_storages/models/available_info_storages.rs @@ -1,7 +1,6 @@ use derive_more::{Display, From}; -use crate::errors::RustusResult; -use crate::{from_str, RustusConf}; +use crate::{errors::RustusResult, from_str, RustusConf}; use crate::info_storages::{file_info_storage, InfoStorage}; use strum::EnumIter; diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index bace48b..42089ab 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; -use chrono::serde::ts_seconds; -use chrono::{DateTime, Utc}; +use crate::{errors::RustusError, RustusResult}; +use chrono::{serde::ts_seconds, DateTime, Utc}; +use log::error; use serde::{Deserialize, Serialize}; /// Information about file. @@ -88,6 +89,29 @@ impl FileInfo { } } + pub async fn json(&self) -> RustusResult { + let info_clone = self.clone(); + actix_web::rt::task::spawn_blocking(move || { + serde_json::to_string(&info_clone).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? + } + + pub async fn from_json(data: String) -> RustusResult { + actix_web::rt::task::spawn_blocking(move || { + serde_json::from_str::(data.as_str()).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? + } + #[cfg(test)] pub fn new_test() -> Self { FileInfo::new( diff --git a/src/info_storages/models/info_store.rs b/src/info_storages/models/info_store.rs index 66418e0..ec5ffe6 100644 --- a/src/info_storages/models/info_store.rs +++ b/src/info_storages/models/info_store.rs @@ -1,5 +1,4 @@ -use crate::errors::RustusResult; -use crate::info_storages::FileInfo; +use crate::{errors::RustusResult, info_storages::FileInfo}; use async_trait::async_trait; /// Trait for every info storage. diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs index b9e0b3b..07adebe 100644 --- a/src/info_storages/redis_info_storage.rs +++ b/src/info_storages/redis_info_storage.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; -use mobc_redis::mobc::Pool; -use mobc_redis::redis; -use mobc_redis::RedisConnectionManager; +use mobc_redis::{mobc::Pool, redis, RedisConnectionManager}; use redis::aio::Connection; -use crate::errors::{RustusError, RustusResult}; -use crate::info_storages::{FileInfo, InfoStorage}; +use crate::{ + errors::{RustusError, RustusResult}, + info_storages::{FileInfo, InfoStorage}, +}; pub struct RedisStorage { pool: Pool, @@ -30,7 +30,7 @@ impl InfoStorage for RedisStorage { let mut conn = self.pool.get().await?; redis::cmd("SET") .arg(file_info.id.as_str()) - .arg(serde_json::to_string(file_info)?.as_str()) + .arg(file_info.json().await?.as_str()) .query_async::(&mut conn) .await .map_err(RustusError::from)?; @@ -46,7 +46,7 @@ impl InfoStorage for RedisStorage { if res.is_none() { return Err(RustusError::FileNotFound); } - serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from) + FileInfo::from_json(res.unwrap()).await } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { @@ -66,10 +66,8 @@ impl InfoStorage for RedisStorage { #[cfg(feature = "test_redis")] mod tests { use super::RedisStorage; - use crate::info_storages::FileInfo; - use crate::InfoStorage; - use mobc_redis::redis; - use mobc_redis::redis::AsyncCommands; + use crate::{info_storages::FileInfo, InfoStorage}; + use mobc_redis::{redis, redis::AsyncCommands}; async fn get_storage() -> RedisStorage { let redis_url = std::env::var("TEST_REDIS_URL").unwrap(); diff --git a/src/main.rs b/src/main.rs index 5b5330c..400db9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,25 @@ #![cfg_attr(coverage, feature(no_coverage))] -use std::str::FromStr; -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; -use actix_web::http::Method; use actix_web::{ dev::{Server, Service}, + http::Method, middleware, web, App, HttpServer, }; -use fern::colors::{Color, ColoredLevelConfig}; -use fern::Dispatch; +use fern::{ + colors::{Color, ColoredLevelConfig}, + Dispatch, +}; use log::LevelFilter; use config::RustusConf; -use crate::errors::RustusResult; -use crate::info_storages::InfoStorage; -use crate::notifiers::models::notification_manager::NotificationManager; -use crate::server::rustus_service; -use crate::state::State; -use crate::storages::Storage; +use crate::{ + errors::RustusResult, info_storages::InfoStorage, + notifiers::models::notification_manager::NotificationManager, server::rustus_service, + state::State, storages::Storage, +}; mod config; mod errors; diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index a7088f9..8810a3f 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -1,14 +1,15 @@ -use crate::notifiers::{Hook, Notifier}; -use crate::RustusResult; +use crate::{ + notifiers::{Hook, Notifier}, + RustusResult, +}; use actix_web::http::header::HeaderMap; use async_trait::async_trait; -use lapin::options::{ - BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, +use lapin::{ + options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions}, + types::FieldTable, + BasicProperties, ConnectionProperties, ExchangeKind, }; -use lapin::types::FieldTable; -use lapin::{BasicProperties, ConnectionProperties, ExchangeKind}; -use mobc_lapin::mobc::Pool; -use mobc_lapin::RMQConnectionManager; +use mobc_lapin::{mobc::Pool, RMQConnectionManager}; use strum::IntoEnumIterator; use tokio_amqp::LapinTokioExt; diff --git a/src/notifiers/dir_notifier.rs b/src/notifiers/dir_notifier.rs index 008d79c..303b21b 100644 --- a/src/notifiers/dir_notifier.rs +++ b/src/notifiers/dir_notifier.rs @@ -1,6 +1,8 @@ -use crate::errors::RustusError; -use crate::notifiers::{Hook, Notifier}; -use crate::RustusResult; +use crate::{ + errors::RustusError, + notifiers::{Hook, Notifier}, + RustusResult, +}; use actix_web::http::header::HeaderMap; use async_trait::async_trait; use log::debug; @@ -19,6 +21,7 @@ impl DirNotifier { #[async_trait] impl Notifier for DirNotifier { + #[cfg_attr(coverage, no_coverage)] async fn prepare(&mut self) -> RustusResult<()> { Ok(()) } @@ -52,10 +55,12 @@ mod tests { use super::DirNotifier; use crate::notifiers::{Hook, Notifier}; use actix_web::http::header::HeaderMap; - use std::fs::File; - use std::io::{Read, Write}; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; + use std::{ + fs::File, + io::{Read, Write}, + }; use tempdir::TempDir; #[actix_rt::test] diff --git a/src/notifiers/file_notifier.rs b/src/notifiers/file_notifier.rs index 5ac7e31..911c279 100644 --- a/src/notifiers/file_notifier.rs +++ b/src/notifiers/file_notifier.rs @@ -1,6 +1,8 @@ -use crate::errors::RustusError; -use crate::notifiers::{Hook, Notifier}; -use crate::RustusResult; +use crate::{ + errors::RustusError, + notifiers::{Hook, Notifier}, + RustusResult, +}; use actix_web::http::header::HeaderMap; use async_trait::async_trait; use log::debug; @@ -47,10 +49,12 @@ mod tests { use super::FileNotifier; use crate::notifiers::{Hook, Notifier}; use actix_web::http::header::HeaderMap; - use std::fs::File; - use std::io::{Read, Write}; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; + use std::{ + fs::File, + io::{Read, Write}, + }; #[cfg(unix)] #[actix_rt::test] diff --git a/src/notifiers/http_notifier.rs b/src/notifiers/http_notifier.rs index 36956d5..a6887cc 100644 --- a/src/notifiers/http_notifier.rs +++ b/src/notifiers/http_notifier.rs @@ -67,10 +67,8 @@ mod tests { use super::HttpNotifier; use crate::notifiers::{Hook, Notifier}; use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue}; - use httptest::matchers::contains; - use httptest::responders::status_code; - use std::str::FromStr; - use std::time::Duration; + use httptest::{matchers::contains, responders::status_code}; + use std::{str::FromStr, time::Duration}; #[actix_rt::test] async fn success_request() { diff --git a/src/notifiers/mod.rs b/src/notifiers/mod.rs index 0359510..d9957a1 100644 --- a/src/notifiers/mod.rs +++ b/src/notifiers/mod.rs @@ -6,6 +6,4 @@ mod file_notifier; pub mod http_notifier; pub mod models; -pub use models::hooks::Hook; -pub use models::message_format::Format; -pub use models::notifier::Notifier; +pub use models::{hooks::Hook, message_format::Format, notifier::Notifier}; diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 6d80200..9a00077 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -1,10 +1,8 @@ -use crate::errors::RustusResult; -use crate::info_storages::FileInfo; +use crate::{errors::RustusResult, info_storages::FileInfo}; use actix_web::HttpRequest; use derive_more::{Display, From}; use serde::Serialize; -use serde_json::Map; -use serde_json::Value; +use serde_json::{Map, Value}; use std::collections::HashMap; use crate::from_str; diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 68c2ae9..441c99a 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -1,12 +1,12 @@ -use crate::errors::RustusResult; #[cfg(feature = "amqp_notifier")] use crate::notifiers::amqp_notifier; -use crate::notifiers::dir_notifier::DirNotifier; -use crate::notifiers::file_notifier::FileNotifier; #[cfg(feature = "http_notifier")] use crate::notifiers::http_notifier; -use crate::notifiers::{Hook, Notifier}; -use crate::RustusConf; +use crate::{ + errors::RustusResult, + notifiers::{dir_notifier::DirNotifier, file_notifier::FileNotifier, Hook, Notifier}, + RustusConf, +}; use actix_web::http::header::HeaderMap; use log::debug; diff --git a/src/protocol/core/get_info.rs b/src/protocol/core/get_info.rs index bc8cc8a..ddb014a 100644 --- a/src/protocol/core/get_info.rs +++ b/src/protocol/core/get_info.rs @@ -1,6 +1,6 @@ -use actix_web::{web, HttpRequest, HttpResponse}; - use crate::errors::RustusError; +use actix_web::{web, HttpRequest, HttpResponse}; +use futures::stream::empty; use crate::{RustusResult, State}; @@ -47,18 +47,20 @@ pub async fn get_file_info( } builder .no_chunking(file_info.offset as u64) - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .insert_header(("Content-Length", file_info.offset.to_string())); + .insert_header(("Upload-Offset", file_info.offset.to_string())); // Upload length is known. if let Some(upload_len) = file_info.length { - builder.insert_header(("Upload-Length", upload_len.to_string())); + builder + .no_chunking(upload_len as u64) + .insert_header(("Content-Length", file_info.offset.to_string())) + .insert_header(("Upload-Length", upload_len.to_string())); } else { builder.insert_header(("Upload-Defer-Length", "1")); } if let Some(meta) = file_info.get_metadata_string() { builder.insert_header(("Upload-Metadata", meta)); } - Ok(builder.finish()) + Ok(builder.streaming(empty::>())) } #[cfg(test)] @@ -66,8 +68,10 @@ mod tests { use actix_web::http::{Method, StatusCode}; use crate::{rustus_service, State}; - use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::{web, App}; + use actix_web::{ + test::{call_service, init_service, TestRequest}, + web, App, + }; #[actix_rt::test] async fn success() { diff --git a/src/protocol/core/server_info.rs b/src/protocol/core/server_info.rs index f5217cd..3648e84 100644 --- a/src/protocol/core/server_info.rs +++ b/src/protocol/core/server_info.rs @@ -19,12 +19,10 @@ pub async fn server_info(state: web::Data) -> HttpResponse { #[cfg(test)] mod tests { - use crate::protocol::extensions::Extensions; - use crate::{rustus_service, State}; + use crate::{protocol::extensions::Extensions, rustus_service, State}; use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::http::Method; - use actix_web::{web, App}; + use actix_web::{http::Method, web, App}; #[actix_rt::test] async fn test_server_info() { diff --git a/src/protocol/core/write_bytes.rs b/src/protocol/core/write_bytes.rs index 322b219..93cf900 100644 --- a/src/protocol/core/write_bytes.rs +++ b/src/protocol/core/write_bytes.rs @@ -1,10 +1,12 @@ use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; -use crate::errors::RustusError; -use crate::notifiers::Hook; -use crate::protocol::extensions::Extensions; -use crate::utils::headers::{check_header, parse_header}; -use crate::{RustusResult, State}; +use crate::{ + errors::RustusError, + notifiers::Hook, + protocol::extensions::Extensions, + utils::headers::{check_header, parse_header}, + RustusResult, State, +}; pub async fn write_bytes( request: HttpRequest, @@ -31,7 +33,7 @@ pub async fn write_bytes( // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. let updated_len = if state .config - .extensions_vec() + .tus_extensions .contains(&Extensions::CreationDeferLength) { parse_header(&request, "Upload-Length") @@ -82,20 +84,19 @@ pub async fn write_bytes( if Some(file_info.offset) == file_info.length { return Err(RustusError::FrozenFile); } - + let chunk_len = bytes.len(); // Appending bytes to file. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; + state.data_storage.add_bytes(&file_info, bytes).await?; // Updating offset. - file_info.offset += bytes.len(); + file_info.offset += chunk_len; // Saving info to info storage. state.info_storage.set_info(&file_info, false).await?; let mut hook = Hook::PostReceive; + let mut keep_alive = true; if file_info.length == Some(file_info.offset) { hook = Hook::PostFinish; + keep_alive = false; } if state.config.hook_is_active(hook) { let message = state @@ -104,24 +105,33 @@ pub async fn write_bytes( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, hook, &headers) .await }); } - Ok(HttpResponse::NoContent() - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .finish()) + if keep_alive { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .keep_alive() + .finish()) + } else { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .finish()) + } } #[cfg(test)] mod tests { use crate::{rustus_service, State}; - use actix_web::http::StatusCode; - use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::{web, App}; + use actix_web::{ + http::StatusCode, + test::{call_service, init_service, TestRequest}, + web, App, + }; #[actix_rt::test] /// Success test for writing bytes. diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 20df718..c11fe66 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -1,13 +1,14 @@ use std::collections::HashMap; -use actix_web::web::Bytes; -use actix_web::{web, HttpRequest, HttpResponse}; +use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; -use crate::info_storages::FileInfo; -use crate::notifiers::Hook; -use crate::protocol::extensions::Extensions; -use crate::utils::headers::{check_header, parse_header}; -use crate::State; +use crate::{ + info_storages::FileInfo, + notifiers::Hook, + protocol::extensions::Extensions, + utils::headers::{check_header, parse_header}, + State, +}; /// Get metadata info from request. /// @@ -180,11 +181,11 @@ pub async fn create_file( let octet_stream = |val: &str| val == "application/offset+octet-stream"; if check_header(&request, "Content-Type", octet_stream) { // Writing first bytes. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; - file_info.offset += bytes.len(); + let chunk_len = bytes.len(); + // Appending bytes to file. + state.data_storage.add_bytes(&file_info, bytes).await?; + // Updating offset. + file_info.offset += chunk_len; } } @@ -199,7 +200,7 @@ pub async fn create_file( let headers = request.headers().clone(); // Adding send_message task to tokio reactor. // Thin function would be executed in background. - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostCreate, &headers) @@ -218,11 +219,12 @@ pub async fn create_file( #[cfg(test)] mod tests { - use crate::server::rustus_service; - use crate::State; - use actix_web::http::StatusCode; - use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::{web, App}; + use crate::{server::rustus_service, State}; + use actix_web::{ + http::StatusCode, + test::{call_service, init_service, TestRequest}, + web, App, + }; #[actix_rt::test] async fn success() { diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index e3e7c0a..6371d6e 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,8 +1,7 @@ use actix_files::NamedFile; use actix_web::{web, HttpRequest}; -use crate::errors::RustusError; -use crate::{RustusResult, State}; +use crate::{errors::RustusError, RustusResult, State}; /// Retrieve actual file. /// @@ -21,12 +20,14 @@ pub async fn get_file(request: HttpRequest, state: web::Data) -> RustusRe } #[cfg(test)] -#[cfg_attr(coverage, no_coverage)] mod test { use crate::{rustus_service, State}; - use actix_web::http::StatusCode; - use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::{web, App}; + use actix_web::{ + http::StatusCode, + test::{call_service, init_service, TestRequest}, + web, App, + }; + use bytes::Bytes; #[actix_rt::test] async fn success() { @@ -38,7 +39,7 @@ mod test { let file_info = state.create_test_file().await; state .data_storage - .add_bytes(&file_info, "data".as_bytes()) + .add_bytes(&file_info, Bytes::from("testing")) .await .unwrap(); let request = TestRequest::get() diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 82ae88e..8da92a2 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -1,8 +1,10 @@ use actix_web::{web, HttpRequest, HttpResponse}; -use crate::errors::{RustusError, RustusResult}; -use crate::notifiers::Hook; -use crate::State; +use crate::{ + errors::{RustusError, RustusResult}, + notifiers::Hook, + State, +}; /// Terminate uploading. /// @@ -27,7 +29,7 @@ pub async fn terminate( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostTerminate, &headers) @@ -41,9 +43,11 @@ pub async fn terminate( #[cfg(test)] mod tests { use crate::{rustus_service, State}; - use actix_web::http::StatusCode; - use actix_web::test::{call_service, init_service, TestRequest}; - use actix_web::{web, App}; + use actix_web::{ + http::StatusCode, + test::{call_service, init_service, TestRequest}, + web, App, + }; use std::path::PathBuf; #[actix_rt::test] diff --git a/src/server.rs b/src/server.rs index b688e88..02496d3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,5 @@ use crate::{protocol, State}; -use actix_web::web::PayloadConfig; -use actix_web::{middleware, web}; +use actix_web::{middleware, web, web::PayloadConfig}; pub fn rustus_service(state: web::Data) -> Box { Box::new(move |web_app| { diff --git a/src/state.rs b/src/state.rs index 07de073..45fa622 100644 --- a/src/state.rs +++ b/src/state.rs @@ -31,6 +31,7 @@ impl State { data_storage: Box::new(crate::storages::file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), info_storage: Box::new( crate::info_storages::file_info_storage::FileInfoStorage::new( diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index 2653412..537378f 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,15 +1,20 @@ -use std::path::PathBuf; +use std::{io::Write, path::PathBuf}; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use log::error; -use tokio::fs::{remove_file, DirBuilder, OpenOptions}; -use tokio::io::{copy, BufReader}; +use std::{ + fs::{remove_file, DirBuilder, OpenOptions}, + io::{copy, BufReader, BufWriter}, +}; -use crate::errors::{RustusError, RustusResult}; -use crate::info_storages::FileInfo; -use crate::storages::Storage; -use crate::utils::dir_struct::dir_struct; +use crate::{ + errors::{RustusError, RustusResult}, + info_storages::FileInfo, + storages::Storage, + utils::dir_struct::dir_struct, +}; use derive_more::Display; #[derive(Display)] @@ -17,17 +22,19 @@ use derive_more::Display; pub struct FileStorage { data_dir: PathBuf, dir_struct: String, + force_fsync: bool, } impl FileStorage { - pub fn new(data_dir: PathBuf, dir_struct: String) -> FileStorage { + pub fn new(data_dir: PathBuf, dir_struct: String, force_fsync: bool) -> FileStorage { FileStorage { data_dir, dir_struct, + force_fsync, } } - pub async fn data_file_path(&self, file_id: &str) -> RustusResult { + pub fn data_file_path(&self, file_id: &str) -> RustusResult { let dir = self .data_dir // We're working wit absolute paths, because tus.io says so. @@ -40,7 +47,6 @@ impl FileStorage { DirBuilder::new() .recursive(true) .create(dir.as_path()) - .await .map_err(|err| { error!("{}", err); RustusError::UnableToWrite(err.to_string()) @@ -58,7 +64,6 @@ impl Storage for FileStorage { DirBuilder::new() .recursive(true) .create(self.data_dir.as_path()) - .await .map_err(|err| RustusError::UnableToPrepareStorage(err.to_string()))?; } Ok(()) @@ -76,47 +81,61 @@ impl Storage for FileStorage { }) } - async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> { + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { // In normal situation this `if` statement is not // gonna be called, but what if it is ... - if info.path.is_none() { + if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - // Opening file in w+a mode. - // It means that we're going to append some - // bytes to the end of a file. - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(false) - .open(info.path.as_ref().unwrap()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - let mut reader = BufReader::new(bytes); - copy(&mut reader, &mut file).await?; - file.sync_data().await?; - Ok(()) + let path = String::from(file_info.path.as_ref().unwrap()); + let force_sync = self.force_fsync; + actix_web::rt::task::spawn_blocking(move || { + // Opening file in w+a mode. + // It means that we're going to append some + // bytes to the end of a file. + let file = OpenOptions::new() + .write(true) + .append(true) + .create(false) + .read(false) + .truncate(false) + .open(path.as_str()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + { + let mut writer = BufWriter::new(file); + writer.write_all(bytes.as_ref())?; + writer.flush()?; + if force_sync { + writer.get_ref().sync_data()?; + } + } + Ok(()) + }) + .await? } async fn create_file(&self, file_info: &FileInfo) -> RustusResult { + let info = file_info.clone(); // New path to file. - let file_path = self.data_file_path(file_info.id.as_str()).await?; - // Creating new file. - OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .create_new(true) - .open(file_path.as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::FileAlreadyExists(file_info.id.clone()) - })?; - Ok(file_path.display().to_string()) + let file_path = self.data_file_path(info.id.as_str())?; + actix_web::rt::task::spawn_blocking(move || { + // Creating new file. + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .create_new(true) + .open(file_path.as_path()) + .map_err(|err| { + error!("{:?}", err); + RustusError::FileAlreadyExists(info.id.clone()) + })?; + Ok(file_path.display().to_string()) + }) + .await? } async fn concat_files( @@ -124,58 +143,67 @@ impl Storage for FileStorage { file_info: &FileInfo, parts_info: Vec, ) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(true) - .open(file_info.path.as_ref().unwrap().clone()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - for part in parts_info { - if part.path.is_none() { - return Err(RustusError::FileNotFound); + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(info.path.as_ref().unwrap().clone()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + for part in parts_info { + if part.path.is_none() { + return Err(RustusError::FileNotFound); + } + let part_file = OpenOptions::new() + .read(true) + .open(part.path.as_ref().unwrap())?; + let mut reader = BufReader::new(part_file); + copy(&mut reader, &mut file)?; } - let mut part_file = OpenOptions::new() - .read(true) - .open(part.path.as_ref().unwrap()) - .await?; - copy(&mut part_file, &mut file).await?; - } - file.sync_data().await?; - Ok(()) + file.sync_data()?; + Ok(()) + }) + .await? } async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { - // Let's remove the file itself. - let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); - if !data_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(data_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(file_info.id.clone()) - })?; - Ok(()) + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + // Let's remove the file itself. + let data_path = PathBuf::from(info.path.as_ref().unwrap().clone()); + if !data_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(data_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(info.id.clone()) + })?; + Ok(()) + }) + .await? } } #[cfg(test)] mod tests { use super::FileStorage; - use crate::info_storages::FileInfo; - use crate::Storage; - use std::fs::File; - use std::io::{Read, Write}; - use std::path::PathBuf; + use crate::{info_storages::FileInfo, Storage}; + use bytes::Bytes; + use std::{ + fs::File, + io::{Read, Write}, + path::PathBuf, + }; #[actix_rt::test] async fn preparation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let target_path = dir.into_path().join("not_exist"); - let mut storage = FileStorage::new(target_path.clone(), "".into()); + let mut storage = FileStorage::new(target_path.clone(), "".into(), false); assert_eq!(target_path.exists(), false); storage.prepare().await.unwrap(); assert_eq!(target_path.exists(), true); @@ -184,7 +212,7 @@ mod tests { #[actix_rt::test] async fn create_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); assert!(PathBuf::from(new_path).exists()); @@ -194,7 +222,7 @@ mod tests { async fn create_file_but_it_exists() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let base_path = dir.into_path().clone(); - let storage = FileStorage::new(base_path.clone(), "".into()); + let storage = FileStorage::new(base_path.clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); File::create(base_path.join("test_id")).unwrap(); let result = storage.create_file(&file_info).await; @@ -204,13 +232,13 @@ mod tests { #[actix_rt::test] async fn adding_bytes() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); let test_data = "MyTestData"; file_info.path = Some(new_path.clone()); storage - .add_bytes(&file_info, test_data.as_bytes()) + .add_bytes(&file_info, Bytes::from(test_data)) .await .unwrap(); let mut file = File::open(new_path).unwrap(); @@ -222,7 +250,7 @@ mod tests { #[actix_rt::test] async fn adding_bytes_to_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -231,14 +259,14 @@ mod tests { None, ); let test_data = "MyTestData"; - let result = storage.add_bytes(&file_info, test_data.as_bytes()).await; + let result = storage.add_bytes(&file_info, Bytes::from(test_data)).await; assert!(result.is_err()) } #[actix_rt::test] async fn get_contents_of_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -253,7 +281,7 @@ mod tests { #[actix_rt::test] async fn remove_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -268,7 +296,7 @@ mod tests { #[actix_rt::test] async fn success_concatenation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut parts = Vec::new(); let part1_path = storage.data_dir.as_path().join("part1"); diff --git a/src/storages/mod.rs b/src/storages/mod.rs index 083458b..eba4b63 100644 --- a/src/storages/mod.rs +++ b/src/storages/mod.rs @@ -1,5 +1,4 @@ pub mod file_storage; mod models; -pub use models::available_stores::AvailableStores; -pub use models::storage::Storage; +pub use models::{available_stores::AvailableStores, storage::Storage}; diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs index 7bde098..b22c7a9 100644 --- a/src/storages/models/available_stores.rs +++ b/src/storages/models/available_stores.rs @@ -1,5 +1,4 @@ -use crate::storages::file_storage; -use crate::{from_str, RustusConf, Storage}; +use crate::{from_str, storages::file_storage, RustusConf, Storage}; use derive_more::{Display, From}; use strum::EnumIter; @@ -26,6 +25,7 @@ impl AvailableStores { Self::FileStorage => Box::new(file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), } } diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index 0d790b9..2ef0f6b 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -1,7 +1,7 @@ -use crate::errors::RustusResult; -use crate::info_storages::FileInfo; +use crate::{errors::RustusResult, info_storages::FileInfo}; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use std::fmt::Display; #[async_trait] @@ -41,7 +41,7 @@ pub trait Storage: Display { /// # Params /// `file_info` - info about current file. /// `bytes` - bytes to append to the file. - async fn add_bytes(&self, file_info: &FileInfo, bytes: &[u8]) -> RustusResult<()>; + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()>; /// Create file in storage. /// diff --git a/src/utils/dir_struct.rs b/src/utils/dir_struct.rs index 9c18757..96bb690 100644 --- a/src/utils/dir_struct.rs +++ b/src/utils/dir_struct.rs @@ -1,8 +1,7 @@ use chrono::{Datelike, Timelike}; use lazy_static::lazy_static; use log::error; -use std::collections::HashMap; -use std::env; +use std::{collections::HashMap, env}; lazy_static! { /// Freezing ENVS on startup. diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 493b09a..1bec0b8 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -17,7 +17,7 @@ pub fn parse_header(request: &HttpRequest, header_name: &str) -> Opt .and_then(|value| // Parsing it to string. match value.to_str() { - Ok(header_str) => Some(String::from(header_str)), + Ok(header_str) => Some(header_str), Err(_) => None, }) .and_then(|val|