From e77986e4dca5924c67b7221c43678a259bba2d73 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 10 Jan 2022 10:31:02 +0400 Subject: [PATCH 1/9] Bumped version to 0.4.0. Signed-off-by: Pavel Kirilin --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 425ae94..3913587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2482,7 +2482,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.3.1" +version = "0.4.0" dependencies = [ "actix-files", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index b6abc7d..e94783a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.3.1" +version = "0.4.0" edition = "2021" description = "TUS protocol implementation written in Rust." From 781bb04a5c6c84e21b96926a317c6bdc3910a46b Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 11 Jan 2022 01:25:42 +0400 Subject: [PATCH 2/9] Updated application state. Signed-off-by: Pavel Kirilin --- src/main.rs | 40 +++++++++++++----------------- src/protocol/core/routes.rs | 36 ++++++++++++++------------- src/protocol/creation/routes.rs | 40 +++++++++++++++++------------- src/protocol/getting/routes.rs | 14 ++++------- src/protocol/termination/routes.rs | 23 ++++++++--------- src/state.rs | 24 ++++++++++++++++++ 6 files changed, 99 insertions(+), 78 deletions(-) create mode 100644 src/state.rs diff --git a/src/main.rs b/src/main.rs index 80a3b61..b05a51b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use log::LevelFilter; use crate::errors::RustusResult; use crate::info_storages::InfoStorage; use crate::notifiers::models::notification_manager::NotificationManager; +use crate::state::State; use config::RustusConf; use crate::storages::Storage; @@ -23,6 +24,7 @@ mod info_storages; mod notifiers; mod protocol; mod routes; +mod state; mod storages; mod utils; @@ -64,35 +66,22 @@ fn greeting(app_conf: &RustusConf) { /// This function may throw an error /// if the server can't be bound to the /// given address. -pub fn create_server( - storage: Box, - info_storage: Box, - app_conf: RustusConf, - notification_manager: NotificationManager, -) -> Result { - let host = app_conf.host.clone(); - let port = app_conf.port; - let workers = app_conf.workers; - let app_conf_data = web::Data::new(app_conf.clone()); - let info_storage_data: web::Data> = - web::Data::from(Arc::new(info_storage)); - let storage_data: web::Data> = - web::Data::from(Arc::new(storage)); - let manager_data: web::Data> = - web::Data::from(Arc::new(Box::new(notification_manager))); +pub fn create_server(state: State) -> Result { + let host = state.config.host.clone(); + let port = state.config.port; + let config = state.config.clone(); + let workers = state.config.workers; + let state_data: web::Data = web::Data::from(Arc::new(state)); let mut server = HttpServer::new(move || { App::new() - .app_data(app_conf_data.clone()) - .app_data(storage_data.clone()) - .app_data(manager_data.clone()) - .app_data(info_storage_data.clone()) + .app_data(state_data.clone()) // Adds all routes. - .configure(protocol::setup(app_conf.clone())) + .configure(protocol::setup(config.clone())) // Main middleware that appends TUS headers. .wrap( middleware::DefaultHeaders::new() .add(("Tus-Resumable", "1.0.0")) - .add(("Tus-Max-Size", app_conf.max_body_size.to_string())) + .add(("Tus-Max-Size", config.max_body_size.to_string())) .add(("Tus-Version", "1.0.0")), ) .wrap(middleware::Logger::new("\"%r\" \"-\" \"%s\" \"%a\" \"%D\"")) @@ -176,6 +165,11 @@ async fn main() -> std::io::Result<()> { let notification_manager = NotificationManager::new(&app_conf).await?; // Creating actual server and running it. - let server = create_server(storage, info_storage, app_conf, notification_manager)?; + let server = create_server(State::new( + app_conf.clone(), + storage, + info_storage, + notification_manager, + ))?; server.await } diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index f9f282d..72fb39e 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -4,7 +4,7 @@ use crate::errors::RustusError; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::{RustusConf, State}; #[allow(clippy::needless_pass_by_value)] pub fn server_info(app_conf: web::Data) -> HttpResponse { @@ -20,8 +20,7 @@ pub fn server_info(app_conf: web::Data) -> HttpResponse { } pub async fn get_file_info( - info_storage: web::Data>, - storage: web::Data>, + state: web::Data, request: HttpRequest, ) -> actix_web::Result { // Getting file id from URL. @@ -31,8 +30,8 @@ pub async fn get_file_info( let file_id = request.match_info().get("file_id").unwrap(); // Getting file info from info_storage. - let file_info = info_storage.get_info(file_id).await?; - if file_info.storage != storage.to_string() { + let file_info = state.info_storage.get_info(file_id).await?; + if file_info.storage != state.data_storage.to_string() { return Ok(HttpResponse::NotFound().body("")); } let mut builder = HttpResponse::Ok(); @@ -54,10 +53,7 @@ pub async fn get_file_info( pub async fn write_bytes( request: HttpRequest, bytes: Bytes, - storage: web::Data>, - info_storage: web::Data>, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, ) -> actix_web::Result { // Checking if request has required headers. if !check_header(&request, "Content-Type", "application/offset+octet-stream") { @@ -76,7 +72,8 @@ pub async fn write_bytes( // New upload length. // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. - let updated_len = if app_conf + let updated_len = if state + .config .extensions_vec() .contains(&Extensions::CreationDeferLength) { @@ -87,10 +84,10 @@ pub async fn write_bytes( let file_id = request.match_info().get("file_id").unwrap(); // Getting file info. - let mut file_info = info_storage.get_info(file_id).await?; + let mut file_info = state.info_storage.get_info(file_id).await?; // Checking if file was stored in the same storage. - if file_info.storage != storage.to_string() { + if file_info.storage != state.data_storage.to_string() { return Ok(HttpResponse::NotFound().body("")); } // Checking if offset from request is the same as the real offset. @@ -125,24 +122,29 @@ pub async fn write_bytes( } // Appending bytes to file. - storage.add_bytes(&file_info, bytes.as_ref()).await?; + state + .data_storage + .add_bytes(&file_info, bytes.as_ref()) + .await?; // Updating offset. file_info.offset += bytes.len(); // Saving info to info storage. - info_storage.set_info(&file_info, false).await?; + state.info_storage.set_info(&file_info, false).await?; let mut hook = Hook::PostReceive; if file_info.length == Some(file_info.offset) { hook = Hook::PostFinish; } - if app_conf.hook_is_active(hook) { - let message = app_conf + if state.config.hook_is_active(hook) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, hook, &headers) .await }); diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 61bbdca..0e8a30a 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -7,7 +7,7 @@ use crate::info_storages::FileInfo; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::State; /// Get metadata info from request. /// @@ -58,10 +58,7 @@ fn get_metadata(request: &HttpRequest) -> Option> { /// you can upload first bytes if creation-with-upload /// extension is enabled. pub async fn create_file( - storage: web::Data>, - info_storage: web::Data>, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, request: HttpRequest, bytes: Bytes, ) -> actix_web::Result { @@ -71,7 +68,8 @@ pub async fn create_file( let defer_size = check_header(&request, "Upload-Defer-Length", "1"); // Indicator that creation-defer-length is enabled. - let defer_ext = app_conf + let defer_ext = state + .config .extensions_vec() .contains(&Extensions::CreationDeferLength); @@ -89,29 +87,32 @@ pub async fn create_file( file_id.as_str(), length, None, - storage.to_string(), + state.data_storage.to_string(), meta.clone(), ); - if app_conf.hook_is_active(Hook::PreCreate) { - let message = app_conf + if state.config.hook_is_active(Hook::PreCreate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers(); - notification_manager + state + .notification_manager .send_message(message, Hook::PreCreate, headers) .await?; } // Create file and get the it's path. - file_info.path = Some(storage.create_file(&file_info).await?); + file_info.path = Some(state.data_storage.create_file(&file_info).await?); // Create upload URL for this file. let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?; // Checking if creation-with-upload extension is enabled. - let with_upload = app_conf + let with_upload = state + .config .extensions_vec() .contains(&Extensions::CreationWithUpload); if with_upload && !bytes.is_empty() { @@ -119,14 +120,18 @@ pub async fn create_file( return Ok(HttpResponse::BadRequest().body("")); } // Writing first bytes. - storage.add_bytes(&file_info, bytes.as_ref()).await?; + state + .data_storage + .add_bytes(&file_info, bytes.as_ref()) + .await?; file_info.offset += bytes.len(); } - info_storage.set_info(&file_info, true).await?; + state.info_storage.set_info(&file_info, true).await?; - if app_conf.hook_is_active(Hook::PostCreate) { - let message = app_conf + if state.config.hook_is_active(Hook::PostCreate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; @@ -134,7 +139,8 @@ pub async fn create_file( // Adding send_message task to tokio reactor. // Thin function would be executed in background. tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, Hook::PostCreate, &headers) .await }); diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index 44186c8..97af31a 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,23 +1,19 @@ use actix_web::{web, HttpRequest, Responder}; use crate::errors::RustusError; -use crate::{InfoStorage, Storage}; +use crate::State; /// Retrieve actual file. /// /// This method allows you to download files directly from storage. -pub async fn get_file( - request: HttpRequest, - storage: web::Data>, - info_storage: web::Data>, -) -> impl Responder { +pub async fn get_file(request: HttpRequest, state: web::Data) -> impl Responder { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - let file_info = info_storage.get_info(file_id.as_str()).await?; - if file_info.storage != storage.to_string() { + let file_info = state.info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != state.data_storage.to_string() { return Err(RustusError::FileNotFound); } - storage.get_contents(&file_info).await + state.data_storage.get_contents(&file_info).await } else { Err(RustusError::FileNotFound) } diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 16d8094..dd302e8 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -2,35 +2,34 @@ use actix_web::{web, HttpRequest, HttpResponse}; use crate::errors::RustusResult; use crate::notifiers::Hook; -use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; +use crate::State; /// Terminate uploading. /// /// This method will remove all data by id. /// It removes info and actual data. pub async fn terminate( - storage: web::Data>, - info_storage: web::Data>, request: HttpRequest, - notification_manager: web::Data>, - app_conf: web::Data, + state: web::Data, ) -> RustusResult { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - let file_info = info_storage.get_info(file_id.as_str()).await?; - if file_info.storage != storage.to_string() { + let file_info = state.info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != state.data_storage.to_string() { return Ok(HttpResponse::NotFound().body("")); } - info_storage.remove_info(file_id.as_str()).await?; - storage.remove_file(&file_info).await?; - if app_conf.hook_is_active(Hook::PostTerminate) { - let message = app_conf + state.info_storage.remove_info(file_id.as_str()).await?; + state.data_storage.remove_file(&file_info).await?; + if state.config.hook_is_active(Hook::PostTerminate) { + let message = state + .config .notification_opts .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); tokio::spawn(async move { - notification_manager + state + .notification_manager .send_message(message, Hook::PostTerminate, &headers) .await }); diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..16bc4f2 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,24 @@ +use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; + +pub struct State { + pub config: RustusConf, + pub data_storage: Box, + pub info_storage: Box, + pub notification_manager: NotificationManager, +} + +impl State { + pub fn new( + config: RustusConf, + data_storage: Box, + info_storage: Box, + notification_manager: NotificationManager, + ) -> Self { + Self { + config, + data_storage, + info_storage, + notification_manager, + } + } +} From 9f42b33fa97f76e3d640b15db6f0176c71d2fd56 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 20 Jan 2022 00:35:43 +0400 Subject: [PATCH 3/9] Fixed missing cfg macro. Signed-off-by: Pavel Kirilin --- src/notifiers/models/notification_manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 79b538b..84509da 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -22,6 +22,7 @@ impl NotificationManager { notifiers: Vec::new(), }; debug!("Initializing notification manager."); + #[cfg(feature = "file_notifiers")] if tus_config.notification_opts.hooks_file.is_some() { debug!("Found hooks file"); manager.notifiers.push(Box::new(FileNotifier::new( From 46d199419568d668198c353e6cb151ba09bdd8d3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 17 Jan 2022 13:21:09 +0000 Subject: [PATCH 4/9] Bump rbatis from 3.0.29 to 3.0.30 Bumps [rbatis](https://github.com/rbatis/rbatis) from 3.0.29 to 3.0.30. - [Release notes](https://github.com/rbatis/rbatis/releases) - [Commits](https://github.com/rbatis/rbatis/compare/v3.0.29...v3.0.30) --- updated-dependencies: - dependency-name: rbatis dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3913587..5308e96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,16 +2186,15 @@ dependencies = [ [[package]] name = "rbatis" -version = "3.0.29" +version = "3.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c605c2852c546aa377cb7c672679c6e4e345fb938b8407fcb0bfc149d56999" +checksum = "1b8c968bd15606738295d8b71dca35cfef3bb1a124af89dab45a18ed83b19975" dependencies = [ "async-trait", "chrono", "futures", "futures-core", "hex", - "lazy_static", "log", "once_cell", "rand", @@ -2209,9 +2208,9 @@ dependencies = [ [[package]] name = "rbatis-core" -version = "3.0.19" +version = "3.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2c87fa8e1ff3d9ae75b26188d1f7eca3222edbdd65883eff1ae885a7e12c2e" +checksum = "2cd0da1efe628da94506a15dc8e628a6c25b6d8de77fb2d51149ac9c7893153d" dependencies = [ "base64", "bigdecimal", From dfad64faac8b9461ed65bc51c94bdd05fe8f2eb7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 17 Jan 2022 13:20:58 +0000 Subject: [PATCH 5/9] Bump serde_json from 1.0.74 to 1.0.75 Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.74 to 1.0.75. - [Release notes](https://github.com/serde-rs/json/releases) - [Commits](https://github.com/serde-rs/json/compare/v1.0.74...v1.0.75) --- updated-dependencies: - dependency-name: serde_json dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5308e96..1163a4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2627,9 +2627,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142" +checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" dependencies = [ "indexmap", "itoa 1.0.1", From 002755eb889d77e2a8f9be41a379d515a0eb6afd Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 20 Jan 2022 18:19:01 +0400 Subject: [PATCH 6/9] Initial concat protocol. Signed-off-by: Pavel Kirilin --- src/config.rs | 2 +- src/info_storages/models/file_info.rs | 6 +++ src/notifiers/models/message_format.rs | 6 +-- src/protocol/core/routes.rs | 3 +- src/protocol/creation/routes.rs | 68 ++++++++++++++++++++++++-- src/protocol/extensions.rs | 2 + src/storages/file_storage.rs | 30 +++++++++++- src/storages/models/storage.rs | 15 ++++++ src/utils/headers.rs | 8 +-- 9 files changed, 125 insertions(+), 15 deletions(-) diff --git a/src/config.rs b/src/config.rs index 15c1ab7..2a22ab3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -177,7 +177,7 @@ pub struct RustusConf { /// Enabled extensions for TUS protocol. #[structopt( long, - default_value = "getting,creation,termination,creation-with-upload,creation-defer-length", + default_value = "getting,creation,termination,creation-with-upload,creation-defer-length,concatenation", env = "RUSTUS_TUS_EXTENSIONS", use_delimiter = true )] diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index c593df4..44c55a1 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -15,6 +15,9 @@ pub struct FileInfo { #[serde(with = "ts_seconds")] pub created_at: DateTime, pub deferred_size: bool, + pub is_partial: bool, + pub is_final: bool, + pub parts: Option>, pub storage: String, pub metadata: HashMap, } @@ -55,6 +58,9 @@ impl FileInfo { metadata, deferred_size, offset: 0, + is_final: false, + is_partial: false, + parts: None, created_at: chrono::Utc::now(), } } diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 6c9fb9c..6d80200 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -63,9 +63,9 @@ impl From for TusdFileInfo { offset: file_info.offset, size: file_info.length, size_is_deferred: deferred_size, - is_final: true, - is_partial: false, - partial_uploads: None, + is_final: file_info.is_final, + is_partial: file_info.is_partial, + partial_uploads: file_info.parts, metadata: file_info.metadata, storage: TusdStorageInfo { storage_type: file_info.storage, diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index 72fb39e..de51352 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -56,7 +56,8 @@ pub async fn write_bytes( state: web::Data, ) -> actix_web::Result { // Checking if request has required headers. - if !check_header(&request, "Content-Type", "application/offset+octet-stream") { + let check_content_type = |val: &str| val == "application/offset+octet-stream"; + if !check_header(&request, "Content-Type", check_content_type) { return Ok(HttpResponse::UnsupportedMediaType().body("")); } // Getting current offset. diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 0e8a30a..54d5181 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -49,6 +49,17 @@ fn get_metadata(request: &HttpRequest) -> Option> { }) } +fn get_upload_parts(request: &HttpRequest) -> Vec { + let concat_header = request.headers().get("Upload-Concat").unwrap(); + let header_str = concat_header.to_str().unwrap(); + let urls = header_str.strip_prefix("final;").unwrap(); + + urls.split(' ') + .filter_map(|val: &str| val.split('/').last().map(String::from)) + .filter(|val| val.trim() != "") + .collect() +} + /// Create file. /// /// This method allows you to create file to start uploading. @@ -57,6 +68,7 @@ fn get_metadata(request: &HttpRequest) -> Option> { /// you don't know actual file length and /// you can upload first bytes if creation-with-upload /// extension is enabled. +#[allow(clippy::too_many_lines)] pub async fn create_file( state: web::Data, request: HttpRequest, @@ -65,7 +77,7 @@ pub async fn create_file( // Getting Upload-Length header value as usize. let length = parse_header(&request, "Upload-Length"); // Checking Upload-Defer-Length header. - let defer_size = check_header(&request, "Upload-Defer-Length", "1"); + let defer_size = check_header(&request, "Upload-Defer-Length", |val| val == "1"); // Indicator that creation-defer-length is enabled. let defer_ext = state @@ -73,11 +85,18 @@ pub async fn create_file( .extensions_vec() .contains(&Extensions::CreationDeferLength); + let is_final = check_header(&request, "Upload-Concat", |val| val.starts_with("final;")); + + let concat_ext = state + .config + .extensions_vec() + .contains(&Extensions::Concatenation); + // Check that Upload-Length header is provided. // Otherwise checking that defer-size feature is enabled // and header provided. - if length.is_none() && (defer_ext && !defer_size) { - return Ok(HttpResponse::BadRequest().body("")); + if length.is_none() && !((defer_ext && defer_size) || (concat_ext && is_final)) { + return Ok(HttpResponse::BadRequest().body("Upload-Length header is required")); } let meta = get_metadata(&request); @@ -91,6 +110,18 @@ pub async fn create_file( meta.clone(), ); + let is_partial = check_header(&request, "Upload-Concat", |val| val == "partial"); + + if concat_ext { + if is_final { + file_info.is_final = true; + file_info.parts = Some(get_upload_parts(&request)); + } + if is_partial { + file_info.is_partial = true; + } + } + if state.config.hook_is_active(Hook::PreCreate) { let message = state .config @@ -107,6 +138,32 @@ pub async fn create_file( // Create file and get the it's path. file_info.path = Some(state.data_storage.create_file(&file_info).await?); + if file_info.is_final { + let mut final_size = 0; + let mut parts_info = Vec::new(); + for part_id in file_info.clone().parts.unwrap() { + let part = state.info_storage.get_info(part_id.as_str()).await?; + if part.length != Some(part.offset) { + return Ok( + HttpResponse::BadRequest().body(format!("{} upload is not complete.", part.id)) + ); + } + if !part.is_partial { + return Ok( + HttpResponse::BadRequest().body(format!("{} upload is not partial.", part.id)) + ); + } + final_size += &part.length.unwrap(); + parts_info.push(part.clone()); + } + state + .data_storage + .concat_files(&file_info, parts_info) + .await?; + file_info.offset = final_size; + file_info.length = Some(final_size); + } + // Create upload URL for this file. let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?; @@ -115,8 +172,9 @@ pub async fn create_file( .config .extensions_vec() .contains(&Extensions::CreationWithUpload); - if with_upload && !bytes.is_empty() { - if !check_header(&request, "Content-Type", "application/offset+octet-stream") { + if with_upload && !bytes.is_empty() && !(concat_ext && is_final) { + let octet_stream = |val: &str| val == "application/offset+octet-stream"; + if !check_header(&request, "Content-Type", octet_stream) { return Ok(HttpResponse::BadRequest().body("")); } // Writing first bytes. diff --git a/src/protocol/extensions.rs b/src/protocol/extensions.rs index 5624b83..4eba162 100644 --- a/src/protocol/extensions.rs +++ b/src/protocol/extensions.rs @@ -14,6 +14,8 @@ pub enum Extensions { Creation, #[display(fmt = "termination")] Termination, + #[display(fmt = "concatenation")] + Concatenation, #[display(fmt = "getting")] Getting, } diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index 8d8c989..12d8b15 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,7 +1,8 @@ use std::path::PathBuf; use actix_files::NamedFile; -use async_std::fs::{remove_file, DirBuilder, OpenOptions}; +use async_std::fs::{remove_file, DirBuilder, File, OpenOptions}; +use async_std::io::copy; use async_std::prelude::*; use async_trait::async_trait; use log::error; @@ -128,6 +129,33 @@ impl Storage for FileStorage { Ok(file_path.display().to_string()) } + async fn concat_files( + &self, + file_info: &FileInfo, + parts_info: Vec, + ) -> RustusResult<()> { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(false) + .create_new(false) + .open(file_info.path.as_ref().unwrap().clone()) + .await + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + for part in parts_info { + if part.path.is_none() { + return Err(RustusError::FileNotFound); + } + let mut part_file = File::open(part.path.as_ref().unwrap()).await?; + copy(&mut part_file, &mut file).await?; + } + file.sync_data().await?; + Ok(()) + } + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { // Let's remove the file itself. let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index e129443..0d790b9 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -53,6 +53,21 @@ pub trait Storage: Display { /// `file_info` - info about current file. async fn create_file(&self, file_info: &FileInfo) -> RustusResult; + /// Concatenate files. + /// + /// This method is used to merge multiple files together. + /// + /// This function is used by concat extension of the protocol. + /// + /// # Params + /// `file_info` - info about current file. + /// `parts_info` - info about merged files. + async fn concat_files( + &self, + file_info: &FileInfo, + parts_info: Vec, + ) -> RustusResult<()>; + /// Remove file from storage /// /// This method removes file and all associated diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 4a5b678..f2b0c42 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -28,15 +28,15 @@ pub fn parse_header(request: &HttpRequest, header_name: &str) -> Opt }) } -/// Check that header's value is equal to some value. +/// Check that header value satisfies some predicate. /// -/// Returns false if header is not present or values don't match. -pub fn check_header(request: &HttpRequest, header_name: &str, value: &str) -> bool { +/// Passes header as a parameter to expr if header is present. +pub fn check_header(request: &HttpRequest, header_name: &str, expr: fn(&str) -> bool) -> bool { request .headers() .get(header_name) .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(val == value), + Ok(val) => Some(expr(val)), Err(_) => None, }) .unwrap_or(false) From c8e193f19a80a22c35d2e05457cb3fb71e255400 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sun, 23 Jan 2022 17:18:29 +0400 Subject: [PATCH 7/9] Fixed concatenation ext. Description: * Fixed HEAD request to file; * Added option to remove part files; Signed-off-by: Pavel Kirilin --- src/config.rs | 7 ++++ src/protocol/core/routes.rs | 51 ++++++++++++++++++++++++------ src/protocol/creation/routes.rs | 13 ++++++-- src/protocol/termination/routes.rs | 4 +-- 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/config.rs b/src/config.rs index 2a22ab3..4d5f54d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -183,6 +183,13 @@ pub struct RustusConf { )] pub tus_extensions: Vec, + /// Remove part files after concatenation is done. + /// By default rustus does nothing with part files after concatenation. + /// + /// This parameter is only needed if concatenation extension is enabled. + #[structopt(long, parse(from_flag))] + pub remove_parts: bool, + #[structopt(flatten)] pub storage_opts: StorageOptions, diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index de51352..d910f67 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -16,7 +16,7 @@ pub fn server_info(app_conf: web::Data) -> HttpResponse { .join(","); HttpResponse::Ok() .insert_header(("Tus-Extension", ext_str.as_str())) - .body("") + .finish() } pub async fn get_file_info( @@ -25,17 +25,43 @@ pub async fn get_file_info( ) -> actix_web::Result { // Getting file id from URL. if request.match_info().get("file_id").is_none() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().body("No file id provided.")); } let file_id = request.match_info().get("file_id").unwrap(); // Getting file info from info_storage. let file_info = state.info_storage.get_info(file_id).await?; if file_info.storage != state.data_storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().body("File not found.")); } let mut builder = HttpResponse::Ok(); + if file_info.is_partial { + builder.insert_header(("Upload-Concat", "partial")); + } + if file_info.is_final && file_info.parts.is_some() { + #[allow(clippy::or_fun_call)] + let parts = file_info + .parts + .clone() + .unwrap() + .iter() + .map(|file| { + format!( + "{}/{}", + state + .config + .base_url() + .strip_suffix('/') + .unwrap_or(state.config.base_url().as_str()), + file.as_str() + ) + }) + .collect::>() + .join(" "); + builder.insert_header(("Upload-Concat", format!("final; {}", parts))); + } builder + .no_chunking(file_info.offset as u64) .insert_header(("Upload-Offset", file_info.offset.to_string())) .insert_header(("Content-Length", file_info.offset.to_string())); // Upload length is known. @@ -47,7 +73,7 @@ pub async fn get_file_info( if let Some(meta) = file_info.get_metadata_string() { builder.insert_header(("Upload-Metadata", meta)); } - Ok(builder.body("")) + Ok(builder.finish()) } pub async fn write_bytes( @@ -58,17 +84,17 @@ pub async fn write_bytes( // Checking if request has required headers. let check_content_type = |val: &str| val == "application/offset+octet-stream"; if !check_header(&request, "Content-Type", check_content_type) { - return Ok(HttpResponse::UnsupportedMediaType().body("")); + return Ok(HttpResponse::UnsupportedMediaType().body("Unknown content-type.")); } // Getting current offset. let offset: Option = parse_header(&request, "Upload-Offset"); if offset.is_none() { - return Ok(HttpResponse::UnsupportedMediaType().body("")); + return Ok(HttpResponse::UnsupportedMediaType().body("No offset provided.")); } if request.match_info().get("file_id").is_none() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().body("No file id provided.")); } // New upload length. @@ -87,13 +113,18 @@ pub async fn write_bytes( // Getting file info. let mut file_info = state.info_storage.get_info(file_id).await?; + // According to TUS protocol you can't update final uploads. + if file_info.is_final { + return Ok(HttpResponse::Forbidden().finish()); + } + // Checking if file was stored in the same storage. if file_info.storage != state.data_storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().finish()); } // Checking if offset from request is the same as the real offset. if offset.unwrap() != file_info.offset { - return Ok(HttpResponse::Conflict().body("")); + return Ok(HttpResponse::Conflict().finish()); } // If someone want to update file length. @@ -152,5 +183,5 @@ pub async fn write_bytes( } Ok(HttpResponse::NoContent() .insert_header(("Upload-Offset", file_info.offset.to_string())) - .body("")) + .finish()) } diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 54d5181..21c710d 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -116,6 +116,7 @@ pub async fn create_file( if is_final { file_info.is_final = true; file_info.parts = Some(get_upload_parts(&request)); + file_info.deferred_size = false; } if is_partial { file_info.is_partial = true; @@ -158,10 +159,16 @@ pub async fn create_file( } state .data_storage - .concat_files(&file_info, parts_info) + .concat_files(&file_info, parts_info.clone()) .await?; file_info.offset = final_size; file_info.length = Some(final_size); + if state.config.remove_parts { + for part in parts_info { + state.data_storage.remove_file(&part).await?; + state.info_storage.remove_info(part.id.as_str()).await?; + } + } } // Create upload URL for this file. @@ -175,7 +182,7 @@ pub async fn create_file( if with_upload && !bytes.is_empty() && !(concat_ext && is_final) { let octet_stream = |val: &str| val == "application/offset+octet-stream"; if !check_header(&request, "Content-Type", octet_stream) { - return Ok(HttpResponse::BadRequest().body("")); + return Ok(HttpResponse::BadRequest().finish()); } // Writing first bytes. state @@ -207,5 +214,5 @@ pub async fn create_file( Ok(HttpResponse::Created() .insert_header(("Location", upload_url.as_str())) .insert_header(("Upload-Offset", file_info.offset.to_string())) - .body("")) + .finish()) } diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index dd302e8..8e70f9d 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -16,7 +16,7 @@ pub async fn terminate( if let Some(file_id) = file_id_opt { let file_info = state.info_storage.get_info(file_id.as_str()).await?; if file_info.storage != state.data_storage.to_string() { - return Ok(HttpResponse::NotFound().body("")); + return Ok(HttpResponse::NotFound().finish()); } state.info_storage.remove_info(file_id.as_str()).await?; state.data_storage.remove_file(&file_info).await?; @@ -35,5 +35,5 @@ pub async fn terminate( }); } } - Ok(HttpResponse::NoContent().body("")) + Ok(HttpResponse::NoContent().finish()) } From b71ebc92727ad1ab8d27ffdcc13342eb9ed69268 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 24 Jan 2022 02:51:48 +0400 Subject: [PATCH 8/9] Updated ENVs for server. Signed-off-by: Pavel Kirilin --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 4d5f54d..2adf14d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -144,11 +144,11 @@ pub struct NotificationsOptions { /// [here](https://tus.io/). pub struct RustusConf { /// Rustus server host - #[structopt(short, long, default_value = "0.0.0.0", env = "RUSTUS_HOST")] + #[structopt(short, long, default_value = "0.0.0.0", env = "RUSTUS_SERVER_HOST")] pub host: String, /// Rustus server port - #[structopt(short, long, default_value = "1081", env = "RUSTUS_PORT")] + #[structopt(short, long, default_value = "1081", env = "RUSTUS_SERVER_PORT")] pub port: u16, /// Rustus base API url From 1f5cf77da4c469f7e55271bac3b3534283daa7c2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 24 Jan 2022 13:20:35 +0000 Subject: [PATCH 9/9] Bump actix-web from 4.0.0-beta.19 to 4.0.0-beta.20 Bumps [actix-web](https://github.com/actix/actix-web) from 4.0.0-beta.19 to 4.0.0-beta.20. - [Release notes](https://github.com/actix/actix-web/releases) - [Changelog](https://github.com/actix/actix-web/blob/master/CHANGES.md) - [Commits](https://github.com/actix/actix-web/compare/web-v4.0.0-beta.19...web-v4.0.0-beta.20) --- updated-dependencies: - dependency-name: actix-web dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- Cargo.lock | 16 ++++++++-------- Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1163a4a..b1636ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "actix-router" -version = "0.5.0-beta.4" +version = "0.5.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b53c1deabdbf3a8a8b9a949123edd3cafb873abd75da96b5933a8b590f9d6dc2" +checksum = "5e0b59ad08167ffbb686ddb495846707231e96908b829b1fc218198ec581e2ad" dependencies = [ "bytestring", "firestorm", @@ -105,9 +105,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.5.1" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82cf33e04d9911b39bfb7be3c01309568b4315895d3358372dce64ed2c2bf32d" +checksum = "cdf3f2183be1241ed4dd22611850b85d38de0b08a09f1f7bcccbd0809084b359" dependencies = [ "actix-macros", "futures-core", @@ -116,9 +116,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.0.0-rc.3" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9259b4f3cc9ca96d7d91a7da66b7b01c47653a0da5b0ba3f7f45a344480443b" +checksum = "d9e7472ac180abb0a8e592b653744345983a7a14f44691c8394a799d0df4dbbf" dependencies = [ "actix-rt", "actix-service", @@ -155,9 +155,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.0.0-beta.19" +version = "4.0.0-beta.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "229d0d2c11d6c734327cf1fbc15dd644b351b440bcb4608349258f5e00605bdc" +checksum = "aa8ba5081e9f8d0016cf34df516c699198158fd8c77990aa284115b055ead61b" dependencies = [ "actix-codec", "actix-http", diff --git a/Cargo.toml b/Cargo.toml index e94783a..fcbb70d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ features = ["vendored"] version = "0.6.0-beta.13" [dependencies.actix-web] -version = "^4.0.0-beta.15" +version = "^4.0.0-beta.20" [dependencies.async-std] features = ["tokio1"]