diff --git a/Cargo.lock b/Cargo.lock index 425ae94..b1636ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "actix-router" -version = "0.5.0-beta.4" +version = "0.5.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b53c1deabdbf3a8a8b9a949123edd3cafb873abd75da96b5933a8b590f9d6dc2" +checksum = "5e0b59ad08167ffbb686ddb495846707231e96908b829b1fc218198ec581e2ad" dependencies = [ "bytestring", "firestorm", @@ -105,9 +105,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.5.1" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82cf33e04d9911b39bfb7be3c01309568b4315895d3358372dce64ed2c2bf32d" +checksum = "cdf3f2183be1241ed4dd22611850b85d38de0b08a09f1f7bcccbd0809084b359" dependencies = [ "actix-macros", "futures-core", @@ -116,9 +116,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.0.0-rc.3" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9259b4f3cc9ca96d7d91a7da66b7b01c47653a0da5b0ba3f7f45a344480443b" +checksum = "d9e7472ac180abb0a8e592b653744345983a7a14f44691c8394a799d0df4dbbf" dependencies = [ "actix-rt", "actix-service", @@ -155,9 +155,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.0.0-beta.19" +version = "4.0.0-beta.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "229d0d2c11d6c734327cf1fbc15dd644b351b440bcb4608349258f5e00605bdc" +checksum = "aa8ba5081e9f8d0016cf34df516c699198158fd8c77990aa284115b055ead61b" dependencies = [ "actix-codec", "actix-http", @@ -2186,16 +2186,15 @@ dependencies = [ [[package]] name = "rbatis" -version = "3.0.29" +version = "3.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c605c2852c546aa377cb7c672679c6e4e345fb938b8407fcb0bfc149d56999" +checksum = "1b8c968bd15606738295d8b71dca35cfef3bb1a124af89dab45a18ed83b19975" dependencies = [ "async-trait", "chrono", "futures", "futures-core", "hex", - "lazy_static", "log", "once_cell", "rand", @@ -2209,9 +2208,9 @@ dependencies = [ [[package]] name = "rbatis-core" -version = "3.0.19" +version = "3.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2c87fa8e1ff3d9ae75b26188d1f7eca3222edbdd65883eff1ae885a7e12c2e" +checksum = "2cd0da1efe628da94506a15dc8e628a6c25b6d8de77fb2d51149ac9c7893153d" dependencies = [ "base64", "bigdecimal", @@ -2482,7 +2481,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.3.1" +version = "0.4.0" dependencies = [ "actix-files", "actix-web", @@ -2628,9 +2627,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142" +checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" dependencies = [ "indexmap", "itoa 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index b6abc7d..fcbb70d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.3.1" +version = "0.4.0" edition = "2021" description = "TUS protocol implementation written in Rust." @@ -26,7 +26,7 @@ features = ["vendored"] version = "0.6.0-beta.13" [dependencies.actix-web] -version = "^4.0.0-beta.15" +version = "^4.0.0-beta.20" [dependencies.async-std] features = ["tokio1"] diff --git a/src/config.rs b/src/config.rs index 15c1ab7..2adf14d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -144,11 +144,11 @@ pub struct NotificationsOptions { /// [here](https://tus.io/). pub struct RustusConf { /// Rustus server host - #[structopt(short, long, default_value = "0.0.0.0", env = "RUSTUS_HOST")] + #[structopt(short, long, default_value = "0.0.0.0", env = "RUSTUS_SERVER_HOST")] pub host: String, /// Rustus server port - #[structopt(short, long, default_value = "1081", env = "RUSTUS_PORT")] + #[structopt(short, long, default_value = "1081", env = "RUSTUS_SERVER_PORT")] pub port: u16, /// Rustus base API url @@ -177,12 +177,19 @@ pub struct RustusConf { /// Enabled extensions for TUS protocol. #[structopt( long, - default_value = "getting,creation,termination,creation-with-upload,creation-defer-length", + default_value = "getting,creation,termination,creation-with-upload,creation-defer-length,concatenation", env = "RUSTUS_TUS_EXTENSIONS", use_delimiter = true )] pub tus_extensions: Vec, + /// Remove part files after concatenation is done. + /// By default rustus does nothing with part files after concatenation. + /// + /// This parameter is only needed if concatenation extension is enabled. + #[structopt(long, parse(from_flag))] + pub remove_parts: bool, + #[structopt(flatten)] pub storage_opts: StorageOptions, diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index c593df4..44c55a1 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -15,6 +15,9 @@ pub struct FileInfo { #[serde(with = "ts_seconds")] pub created_at: DateTime, pub deferred_size: bool, + pub is_partial: bool, + pub is_final: bool, + pub parts: Option>, pub storage: String, pub metadata: HashMap, } @@ -55,6 +58,9 @@ impl FileInfo { metadata, deferred_size, offset: 0, + is_final: false, + is_partial: false, + parts: None, created_at: chrono::Utc::now(), } } diff --git a/src/main.rs b/src/main.rs index 80a3b61..b05a51b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use log::LevelFilter; use crate::errors::RustusResult; use crate::info_storages::InfoStorage; use crate::notifiers::models::notification_manager::NotificationManager; +use crate::state::State; use config::RustusConf; use crate::storages::Storage; @@ -23,6 +24,7 @@ mod info_storages; mod notifiers; mod protocol; mod routes; +mod state; mod storages; mod utils; @@ -64,35 +66,22 @@ fn greeting(app_conf: &RustusConf) { /// This function may throw an error /// if the server can't be bound to the /// given address. -pub fn create_server( - storage: Box, - info_storage: Box, - app_conf: RustusConf, - notification_manager: NotificationManager, -) -> Result { - let host = app_conf.host.clone(); - let port = app_conf.port; - let workers = app_conf.workers; - let app_conf_data = web::Data::new(app_conf.clone()); - let info_storage_data: web::Data> = - web::Data::from(Arc::new(info_storage)); - let storage_data: web::Data> = - web::Data::from(Arc::new(storage)); - let manager_data: web::Data> = - web::Data::from(Arc::new(Box::new(notification_manager))); +pub fn create_server(state: State) -> Result { + let host = state.config.host.clone(); + let port = state.config.port; + let config = state.config.clone(); + let workers = state.config.workers; + let state_data: web::Data = web::Data::from(Arc::new(state)); let mut server = HttpServer::new(move || { App::new() - .app_data(app_conf_data.clone()) - .app_data(storage_data.clone()) - .app_data(manager_data.clone()) - .app_data(info_storage_data.clone()) + .app_data(state_data.clone()) // Adds all routes. - .configure(protocol::setup(app_conf.clone())) + .configure(protocol::setup(config.clone())) // Main middleware that appends TUS headers. .wrap( middleware::DefaultHeaders::new() .add(("Tus-Resumable", "1.0.0")) - .add(("Tus-Max-Size", app_conf.max_body_size.to_string())) + .add(("Tus-Max-Size", config.max_body_size.to_string())) .add(("Tus-Version", "1.0.0")), ) .wrap(middleware::Logger::new("\"%r\" \"-\" \"%s\" \"%a\" \"%D\"")) @@ -176,6 +165,11 @@ async fn main() -> std::io::Result<()> { let notification_manager = NotificationManager::new(&app_conf).await?; // Creating actual server and running it. - let server = create_server(storage, info_storage, app_conf, notification_manager)?; + let server = create_server(State::new( + app_conf.clone(), + storage, + info_storage, + notification_manager, + ))?; server.await } diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 6c9fb9c..6d80200 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -63,9 +63,9 @@ impl From for TusdFileInfo { offset: file_info.offset, size: file_info.length, size_is_deferred: deferred_size, - is_final: true, - is_partial: false, - partial_uploads: None, + is_final: file_info.is_final, + is_partial: file_info.is_partial, + partial_uploads: file_info.parts, metadata: file_info.metadata, storage: TusdStorageInfo { storage_type: file_info.storage, diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 79b538b..84509da 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -22,6 +22,7 @@ impl NotificationManager { notifiers: Vec::new(), }; debug!("Initializing notification manager."); + #[cfg(feature = "file_notifiers")] if tus_config.notification_opts.hooks_file.is_some() { debug!("Found hooks file"); manager.notifiers.push(Box::new(FileNotifier::new( diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index f9f282d..d910f67 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -4,7 +4,7 @@ use crate::errors::RustusError; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::{RustusConf, State}; #[allow(clippy::needless_pass_by_value)] pub fn server_info(app_conf: web::Data) -> HttpResponse { @@ -16,27 +16,52 @@ pub fn server_info(app_conf: web::Data) -> HttpResponse { .join(","); HttpResponse::Ok() .insert_header(("Tus-Extension", ext_str.as_str())) - .body("") + .finish() } pub async fn get_file_info( - info_storage: web::Data>, - storage: web::Data>, + state: web::Data, request: HttpRequest, ) -> actix_web::Result { // Getting file id from URL. if request.match_info().get("file_id").is_none() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().body("No file id provided.")); } let file_id = request.match_info().get("file_id").unwrap(); // Getting file info from info_storage. - let file_info = info_storage.get_info(file_id).await?; - if file_info.storage != storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + let file_info = state.info_storage.get_info(file_id).await?; + if file_info.storage != state.data_storage.to_string() { + return Ok(HttpResponse::NotFound().body("File not found.")); } let mut builder = HttpResponse::Ok(); + if file_info.is_partial { + builder.insert_header(("Upload-Concat", "partial")); + } + if file_info.is_final && file_info.parts.is_some() { + #[allow(clippy::or_fun_call)] + let parts = file_info + .parts + .clone() + .unwrap() + .iter() + .map(|file| { + format!( + "{}/{}", + state + .config + .base_url() + .strip_suffix('/') + .unwrap_or(state.config.base_url().as_str()), + file.as_str() + ) + }) + .collect::>() + .join(" "); + builder.insert_header(("Upload-Concat", format!("final; {}", parts))); + } builder + .no_chunking(file_info.offset as u64) .insert_header(("Upload-Offset", file_info.offset.to_string())) .insert_header(("Content-Length", file_info.offset.to_string())); // Upload length is known. @@ -48,35 +73,34 @@ pub async fn get_file_info( if let Some(meta) = file_info.get_metadata_string() { builder.insert_header(("Upload-Metadata", meta)); } - Ok(builder.body("")) + Ok(builder.finish()) } pub async fn write_bytes( request: HttpRequest, bytes: Bytes, - storage: web::Data>, - info_storage: web::Data>, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, ) -> actix_web::Result { // Checking if request has required headers. - if !check_header(&request, "Content-Type", "application/offset+octet-stream") { - return Ok(HttpResponse::UnsupportedMediaType().body("")); + let check_content_type = |val: &str| val == "application/offset+octet-stream"; + if !check_header(&request, "Content-Type", check_content_type) { + return Ok(HttpResponse::UnsupportedMediaType().body("Unknown content-type.")); } // Getting current offset. let offset: Option = parse_header(&request, "Upload-Offset"); if offset.is_none() { - return Ok(HttpResponse::UnsupportedMediaType().body("")); + return Ok(HttpResponse::UnsupportedMediaType().body("No offset provided.")); } if request.match_info().get("file_id").is_none() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().body("No file id provided.")); } // New upload length. // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. - let updated_len = if app_conf + let updated_len = if state + .config .extensions_vec() .contains(&Extensions::CreationDeferLength) { @@ -87,15 +111,20 @@ pub async fn write_bytes( let file_id = request.match_info().get("file_id").unwrap(); // Getting file info. - let mut file_info = info_storage.get_info(file_id).await?; + let mut file_info = state.info_storage.get_info(file_id).await?; + + // According to TUS protocol you can't update final uploads. + if file_info.is_final { + return Ok(HttpResponse::Forbidden().finish()); + } // Checking if file was stored in the same storage. - if file_info.storage != storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + if file_info.storage != state.data_storage.to_string() { + return Ok(HttpResponse::NotFound().finish()); } // Checking if offset from request is the same as the real offset. if offset.unwrap() != file_info.offset { - return Ok(HttpResponse::Conflict().body("")); + return Ok(HttpResponse::Conflict().finish()); } // If someone want to update file length. @@ -125,29 +154,34 @@ pub async fn write_bytes( } // Appending bytes to file. - storage.add_bytes(&file_info, bytes.as_ref()).await?; + state + .data_storage + .add_bytes(&file_info, bytes.as_ref()) + .await?; // Updating offset. file_info.offset += bytes.len(); // Saving info to info storage. - info_storage.set_info(&file_info, false).await?; + state.info_storage.set_info(&file_info, false).await?; let mut hook = Hook::PostReceive; if file_info.length == Some(file_info.offset) { hook = Hook::PostFinish; } - if app_conf.hook_is_active(hook) { - let message = app_conf + if state.config.hook_is_active(hook) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, hook, &headers) .await }); } Ok(HttpResponse::NoContent() .insert_header(("Upload-Offset", file_info.offset.to_string())) - .body("")) + .finish()) } diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 61bbdca..21c710d 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -7,7 +7,7 @@ use crate::info_storages::FileInfo; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::State; /// Get metadata info from request. /// @@ -49,6 +49,17 @@ fn get_metadata(request: &HttpRequest) -> Option> { }) } +fn get_upload_parts(request: &HttpRequest) -> Vec { + let concat_header = request.headers().get("Upload-Concat").unwrap(); + let header_str = concat_header.to_str().unwrap(); + let urls = header_str.strip_prefix("final;").unwrap(); + + urls.split(' ') + .filter_map(|val: &str| val.split('/').last().map(String::from)) + .filter(|val| val.trim() != "") + .collect() +} + /// Create file. /// /// This method allows you to create file to start uploading. @@ -57,29 +68,35 @@ fn get_metadata(request: &HttpRequest) -> Option> { /// you don't know actual file length and /// you can upload first bytes if creation-with-upload /// extension is enabled. +#[allow(clippy::too_many_lines)] pub async fn create_file( - storage: web::Data>, - info_storage: web::Data>, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, request: HttpRequest, bytes: Bytes, ) -> actix_web::Result { // Getting Upload-Length header value as usize. let length = parse_header(&request, "Upload-Length"); // Checking Upload-Defer-Length header. - let defer_size = check_header(&request, "Upload-Defer-Length", "1"); + let defer_size = check_header(&request, "Upload-Defer-Length", |val| val == "1"); // Indicator that creation-defer-length is enabled. - let defer_ext = app_conf + let defer_ext = state + .config .extensions_vec() .contains(&Extensions::CreationDeferLength); + let is_final = check_header(&request, "Upload-Concat", |val| val.starts_with("final;")); + + let concat_ext = state + .config + .extensions_vec() + .contains(&Extensions::Concatenation); + // Check that Upload-Length header is provided. // Otherwise checking that defer-size feature is enabled // and header provided. - if length.is_none() && (defer_ext && !defer_size) { - return Ok(HttpResponse::BadRequest().body("")); + if length.is_none() && !((defer_ext && defer_size) || (concat_ext && is_final)) { + return Ok(HttpResponse::BadRequest().body("Upload-Length header is required")); } let meta = get_metadata(&request); @@ -89,44 +106,97 @@ pub async fn create_file( file_id.as_str(), length, None, - storage.to_string(), + state.data_storage.to_string(), meta.clone(), ); - if app_conf.hook_is_active(Hook::PreCreate) { - let message = app_conf + let is_partial = check_header(&request, "Upload-Concat", |val| val == "partial"); + + if concat_ext { + if is_final { + file_info.is_final = true; + file_info.parts = Some(get_upload_parts(&request)); + file_info.deferred_size = false; + } + if is_partial { + file_info.is_partial = true; + } + } + + if state.config.hook_is_active(Hook::PreCreate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers(); - notification_manager + state + .notification_manager .send_message(message, Hook::PreCreate, headers) .await?; } // Create file and get the it's path. - file_info.path = Some(storage.create_file(&file_info).await?); + file_info.path = Some(state.data_storage.create_file(&file_info).await?); + + if file_info.is_final { + let mut final_size = 0; + let mut parts_info = Vec::new(); + for part_id in file_info.clone().parts.unwrap() { + let part = state.info_storage.get_info(part_id.as_str()).await?; + if part.length != Some(part.offset) { + return Ok( + HttpResponse::BadRequest().body(format!("{} upload is not complete.", part.id)) + ); + } + if !part.is_partial { + return Ok( + HttpResponse::BadRequest().body(format!("{} upload is not partial.", part.id)) + ); + } + final_size += &part.length.unwrap(); + parts_info.push(part.clone()); + } + state + .data_storage + .concat_files(&file_info, parts_info.clone()) + .await?; + file_info.offset = final_size; + file_info.length = Some(final_size); + if state.config.remove_parts { + for part in parts_info { + state.data_storage.remove_file(&part).await?; + state.info_storage.remove_info(part.id.as_str()).await?; + } + } + } // Create upload URL for this file. let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?; // Checking if creation-with-upload extension is enabled. - let with_upload = app_conf + let with_upload = state + .config .extensions_vec() .contains(&Extensions::CreationWithUpload); - if with_upload && !bytes.is_empty() { - if !check_header(&request, "Content-Type", "application/offset+octet-stream") { - return Ok(HttpResponse::BadRequest().body("")); + if with_upload && !bytes.is_empty() && !(concat_ext && is_final) { + let octet_stream = |val: &str| val == "application/offset+octet-stream"; + if !check_header(&request, "Content-Type", octet_stream) { + return Ok(HttpResponse::BadRequest().finish()); } // Writing first bytes. - storage.add_bytes(&file_info, bytes.as_ref()).await?; + state + .data_storage + .add_bytes(&file_info, bytes.as_ref()) + .await?; file_info.offset += bytes.len(); } - info_storage.set_info(&file_info, true).await?; + state.info_storage.set_info(&file_info, true).await?; - if app_conf.hook_is_active(Hook::PostCreate) { - let message = app_conf + if state.config.hook_is_active(Hook::PostCreate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; @@ -134,7 +204,8 @@ pub async fn create_file( // Adding send_message task to tokio reactor. // Thin function would be executed in background. tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, Hook::PostCreate, &headers) .await }); @@ -143,5 +214,5 @@ pub async fn create_file( Ok(HttpResponse::Created() .insert_header(("Location", upload_url.as_str())) .insert_header(("Upload-Offset", file_info.offset.to_string())) - .body("")) + .finish()) } diff --git a/src/protocol/extensions.rs b/src/protocol/extensions.rs index 5624b83..4eba162 100644 --- a/src/protocol/extensions.rs +++ b/src/protocol/extensions.rs @@ -14,6 +14,8 @@ pub enum Extensions { Creation, #[display(fmt = "termination")] Termination, + #[display(fmt = "concatenation")] + Concatenation, #[display(fmt = "getting")] Getting, } diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index 44186c8..97af31a 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,23 +1,19 @@ use actix_web::{web, HttpRequest, Responder}; use crate::errors::RustusError; -use crate::{InfoStorage, Storage}; +use crate::State; /// Retrieve actual file. /// /// This method allows you to download files directly from storage. -pub async fn get_file( - request: HttpRequest, - storage: web::Data>, - info_storage: web::Data>, -) -> impl Responder { +pub async fn get_file(request: HttpRequest, state: web::Data) -> impl Responder { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - let file_info = info_storage.get_info(file_id.as_str()).await?; - if file_info.storage != storage.to_string() { + let file_info = state.info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != state.data_storage.to_string() { return Err(RustusError::FileNotFound); } - storage.get_contents(&file_info).await + state.data_storage.get_contents(&file_info).await } else { Err(RustusError::FileNotFound) } diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 16d8094..8e70f9d 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -2,39 +2,38 @@ use actix_web::{web, HttpRequest, HttpResponse}; use crate::errors::RustusResult; use crate::notifiers::Hook; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::State; /// Terminate uploading. /// /// This method will remove all data by id. /// It removes info and actual data. pub async fn terminate( - storage: web::Data>, - info_storage: web::Data>, request: HttpRequest, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, ) -> RustusResult { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - let file_info = info_storage.get_info(file_id.as_str()).await?; - if file_info.storage != storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + let file_info = state.info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != state.data_storage.to_string() { + return Ok(HttpResponse::NotFound().finish()); } - info_storage.remove_info(file_id.as_str()).await?; - storage.remove_file(&file_info).await?; - if app_conf.hook_is_active(Hook::PostTerminate) { - let message = app_conf + state.info_storage.remove_info(file_id.as_str()).await?; + state.data_storage.remove_file(&file_info).await?; + if state.config.hook_is_active(Hook::PostTerminate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, Hook::PostTerminate, &headers) .await }); } } - Ok(HttpResponse::NoContent().body("")) + Ok(HttpResponse::NoContent().finish()) } diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..16bc4f2 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,24 @@ +use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; + +pub struct State { + pub config: RustusConf, + pub data_storage: Box, + pub info_storage: Box, + pub notification_manager: NotificationManager, +} + +impl State { + pub fn new( + config: RustusConf, + data_storage: Box, + info_storage: Box, + notification_manager: NotificationManager, + ) -> Self { + Self { + config, + data_storage, + info_storage, + notification_manager, + } + } +} diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index 8d8c989..12d8b15 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,7 +1,8 @@ use std::path::PathBuf; use actix_files::NamedFile; -use async_std::fs::{remove_file, DirBuilder, OpenOptions}; +use async_std::fs::{remove_file, DirBuilder, File, OpenOptions}; +use async_std::io::copy; use async_std::prelude::*; use async_trait::async_trait; use log::error; @@ -128,6 +129,33 @@ impl Storage for FileStorage { Ok(file_path.display().to_string()) } + async fn concat_files( + &self, + file_info: &FileInfo, + parts_info: Vec, + ) -> RustusResult<()> { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(false) + .create_new(false) + .open(file_info.path.as_ref().unwrap().clone()) + .await + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + for part in parts_info { + if part.path.is_none() { + return Err(RustusError::FileNotFound); + } + let mut part_file = File::open(part.path.as_ref().unwrap()).await?; + copy(&mut part_file, &mut file).await?; + } + file.sync_data().await?; + Ok(()) + } + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { // Let's remove the file itself. let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index e129443..0d790b9 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -53,6 +53,21 @@ pub trait Storage: Display { /// `file_info` - info about current file. async fn create_file(&self, file_info: &FileInfo) -> RustusResult; + /// Concatenate files. + /// + /// This method is used to merge multiple files together. + /// + /// This function is used by concat extension of the protocol. + /// + /// # Params + /// `file_info` - info about current file. + /// `parts_info` - info about merged files. + async fn concat_files( + &self, + file_info: &FileInfo, + parts_info: Vec, + ) -> RustusResult<()>; + /// Remove file from storage /// /// This method removes file and all associated diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 4a5b678..f2b0c42 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -28,15 +28,15 @@ pub fn parse_header(request: &HttpRequest, header_name: &str) -> Opt }) } -/// Check that header's value is equal to some value. +/// Check that header value satisfies some predicate. /// -/// Returns false if header is not present or values don't match. -pub fn check_header(request: &HttpRequest, header_name: &str, value: &str) -> bool { +/// Passes header as a parameter to expr if header is present. +pub fn check_header(request: &HttpRequest, header_name: &str, expr: fn(&str) -> bool) -> bool { request .headers() .get(header_name) .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(val == value), + Ok(val) => Some(expr(val)), Err(_) => None, }) .unwrap_or(false)