diff --git a/Cargo.lock b/Cargo.lock index 43afb17..a525893 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,7 +68,7 @@ dependencies = [ "actix-service", "actix-utils", "ahash", - "base64", + "base64 0.13.0", "bitflags", "brotli", "bytes", @@ -375,6 +375,22 @@ dependencies = [ "num-traits", ] +[[package]] +name = "attohttpc" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "262c3f7f5d61249d8c00e5546e2685cd15ebeeb1bc0f3cc5449350a1cb07319e" +dependencies = [ + "http", + "log", + "native-tls", + "openssl", + "serde", + "serde_json", + "url", + "wildmatch", +] + [[package]] name = "atty" version = "0.2.14" @@ -392,6 +408,31 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-creds" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeeee1a5defa63cba39097a510dfe63ef53658fc8995202a610f6a8a4d03639" +dependencies = [ + "attohttpc", + "dirs", + "rust-ini", + "serde", + "serde-xml-rs", + "thiserror", + "time 0.3.15", + "url", +] + +[[package]] +name = "aws-region" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92a8af5850d0ea0916ca3e015ab86951ded0bf4b70fd27896e81ae1dfb0af37" +dependencies = [ + "thiserror", +] + [[package]] name = "base-x" version = "0.2.11" @@ -404,6 +445,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" + [[package]] name = "base64ct" version = "1.5.3" @@ -492,9 +539,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "bytestring" @@ -824,6 +871,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + [[package]] name = "doc-comment" version = "0.3.3" @@ -1103,6 +1156,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] [[package]] name = "hashlink" @@ -1491,6 +1547,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "maybe-async" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "md-5" version = "0.10.5" @@ -1500,6 +1567,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.5.0" @@ -1531,6 +1604,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dddfe21863f8d600ed2bd1096cb9b5cd6ff984be6185cf9d563fb4a107bffc5" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1791,6 +1873,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -1851,7 +1943,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb" dependencies = [ - "base64", + "base64 0.13.0", "once_cell", "regex", ] @@ -2147,7 +2239,7 @@ version = "3.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6324adca27a8b4f1fb5e98b8cfd08d91cf3c5007d1c1c24651234004e585123" dependencies = [ - "base64", + "base64 0.13.0", "bigdecimal", "bit-vec", "chrono", @@ -2193,7 +2285,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "187141a7d66aea864ac042af881cd0618e4ae11fe86232600c0b172e35741b59" dependencies = [ "async-trait", - "base64", + "base64 0.13.0", "html_parser", "proc-macro2", "quote", @@ -2208,7 +2300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65bf0ecc799682a85ab76226c1b63fd2ef9dd0a0f5cd21d877b0af44dc8cc2bf" dependencies = [ "ahash", - "base64", + "base64 0.13.0", "chrono", "hex", "indexmap", @@ -2307,7 +2399,7 @@ version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" dependencies = [ - "base64", + "base64 0.13.0", "bytes", "encoding_rs", "futures-core", @@ -2330,6 +2422,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-util 0.7.4", "tower-service", "url", "wasm-bindgen", @@ -2373,6 +2466,47 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6009d9d4cf910505534d62d380a0aa305805a2af0b5c3ad59a3024a0715b847" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.13.0", + "cfg-if", + "hex", + "hmac", + "http", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "reqwest", + "serde", + "serde-xml-rs", + "serde_derive", + "sha2", + "thiserror", + "time 0.3.15", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.26.1" @@ -2408,7 +2542,7 @@ version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" dependencies = [ - "base64", + "base64 0.13.0", "log", "ring", "sct", @@ -2417,7 +2551,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.5.14" +version = "0.6.0" dependencies = [ 
"actix-cors", "actix-files", @@ -2426,7 +2560,7 @@ dependencies = [ "actix-web", "actix-web-prom", "async-trait", - "base64", + "base64 0.20.0", "bytes", "chrono", "derive_more", @@ -2446,6 +2580,7 @@ dependencies = [ "rbatis", "rbson", "reqwest", + "rust-s3", "serde", "serde_json", "sha1 0.10.5", @@ -2467,6 +2602,25 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" +[[package]] +name = "rxml" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a071866b8c681dc2cfffa77184adc32b57b0caad4e620b6292609703bceb804" +dependencies = [ + "bytes", + "pin-project-lite", + "rxml_validation", + "smartstring", + "tokio", +] + +[[package]] +name = "rxml_validation" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53bc79743f9a66c2fb1f951cd83735f275d46bfe466259fbc5897bb60a0d00ee" + [[package]] name = "ryu" version = "1.0.11" @@ -2558,6 +2712,18 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-xml-rs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa" +dependencies = [ + "log", + "serde", + "thiserror", + "xml-rs", +] + [[package]] name = "serde_bytes" version = "0.11.7" @@ -2674,6 +2840,15 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +[[package]] +name = "smartstring" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e714dff2b33f2321fdcd475b71cec79781a692d846f37f415fb395a1d2bcd48e" +dependencies = [ + "static_assertions", +] + [[package]] name = "socket2" version = "0.4.7" @@ -2728,7 +2903,7 @@ checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5" dependencies = [ "ahash", "atoi", - "base64", + "base64 0.13.0", "bigdecimal", "bit-vec", "bitflags", @@ -2806,6 +2981,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stdweb" version = "0.4.20" @@ -3044,6 +3225,7 @@ dependencies = [ "itoa 1.0.4", "libc", "num_threads", + "serde", "time-macros 0.2.4", ] @@ -3614,6 +3796,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "xml-rs" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" + [[package]] name = "zeroize" version = "1.5.7" diff --git a/Cargo.toml b/Cargo.toml index c506a3f..9816f0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,9 @@ [package] name = "rustus" -version = "0.5.14" +version = "0.6.0" edition = "2021" description = "TUS protocol implementation written in Rust." 
-keywords = [ - "tus", - "server", - "actix-web", -] +keywords = ["tus", "server", "actix-web"] license-file = "LICENSE" authors = [ "Pavel Kirilin ", ] @@ -21,9 +17,9 @@ readme = "README.md" name = "rustus" [dependencies] -bytes = "~1.2.1" +bytes = "~1.3.0" async-trait = "^0.1.52" -base64 = "^0.13.0" +base64 = "0.20.0" log = "^0.4.14" serde_json = "^1" thiserror = "^1.0" @@ -33,25 +29,18 @@ actix-web-prom = "^0.6.0" dyn-clone = "^1.0.5" actix-cors = "0.6.1" wildmatch = "2.1.0" +md-5 = "^0.10.1" +digest = "0.10.3" mimalloc = { version = "~0.1.30", default-features = false } -[dependencies.digest] -version = "0.10.3" -optional = true [dependencies.sha1] version = "^0.10.1" features = ["compress"] -optional = true [dependencies.sha2] version = "^0.10.1" features = ["compress"] -optional = true - -[dependencies.md-5] -version = "^0.10.1" -optional = true [dependencies.futures] version = "^0.3.21" @@ -107,7 +96,6 @@ version = "^2.0" [dependencies.reqwest] features = ["json"] -optional = true version = "^0.11.8" [dependencies.structopt] @@ -118,7 +106,17 @@ features = ["derive"] version = "0.24.0" [dependencies.tokio] -features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes", "rt", "macros"] +features = [ + "time", + "process", + "fs", + "io-std", + "io-util", + "rt-multi-thread", + "bytes", + "rt", + "macros", +] version = "^1.4.0" [dependencies.tokio-amqp] @@ -129,14 +127,19 @@ version = "^1.0.0" features = ["v4"] version = "^1.0.0-alpha.1" +[dependencies.rust-s3] +version = "~0.32.3" + [features] -all = ["redis_info_storage", "db_info_storage", "http_notifier", "amqp_notifier", "hashers"] +all = [ + "redis_info_storage", + "db_info_storage", + "amqp_notifier", +] amqp_notifier = ["lapin", "tokio-amqp", "mobc-lapin"] db_info_storage = ["rbatis", "rbson"] default = [] -http_notifier = ["reqwest"] redis_info_storage = ["mobc-redis"] -hashers = ["md-5", "sha1", "sha2", "digest"] ### For testing test_redis = [] diff --git a/README.md b/README.md index 4cf9342..9de42d5 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,8 @@ You can install rustus by 4 different ways. ### From source -To build it from source rust must be installed. -Preferred version is 1.59.0. +To build it from source, Rust must be installed. We don't rely on nightly features, +so you can use the latest stable release. ```bash git clone https://github.com/s3rius/rustus.git @@ -41,9 +41,7 @@ Available features: * `amqp_notifier` - adds amqp protocol support for notifying about upload status; * `db_info_storage` - adds support for storing information about upload in different databases (Postgres, MySQL, SQLite); -* `http_notifier` - adds support for notifying about upload status via http protocol; * `redis_info_storage` - adds support for storing information about upload in redis database; -* `hashers` - adds support for checksum verification; * `all` - enables all rustus features. All precompiled binaries have all features enabled. diff --git a/deploy/Dockerfile b/deploy/Dockerfile index f0b3314..545994f 100644 --- a/deploy/Dockerfile +++ b/deploy/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.64.0-bullseye AS builder +FROM rust:1.66.0-bullseye AS builder WORKDIR /app COPY Cargo.toml Cargo.lock ./ diff --git a/docs/configuration.md b/docs/configuration.md index 3ad560c..6bc769c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -27,6 +27,8 @@ You can define which hosts are allowed for your particular application.
For example if you add `--cors "*.staging.domain,*.prod.domain"`, it allows all origins like `my.staging.domain` or `my.prod.domain`, but it will refuse to serve other origins. +You can also disable the access log for the `/health` endpoint by using `--disable-health-access-log`. + === "CLI" ``` bash rustus --port 1081 \ --workers 8 \ --max-body-size 1000000 \ --url "/files" \ - --log-level "INFO" - --cors "my.*.domain.com,your.*.domain.com" + --log-level "INFO" \ + --cors "my.*.domain.com,your.*.domain.com" \ + --disable-health-access-log "yes" ``` === "ENV" @@ -49,22 +52,29 @@ like `my.staging.domain` or `my.prod.domain`, but it will refuse to serve other export RUSTUS_URL="/files" export RUSTUS_LOG_LEVEL="INFO" export RUSTUS_CORS="my.*.domain.com,your.*.domain.com" + export RUSTUS_DISABLE_HEALTH_ACCESS_LOG="yes" rustus ``` -## Configuring data storage -!!!info +## Configuring storage + +Storages are used to actually store your files. You can configure where you want +to store files. By default it uses `file-storage` and stores every upload on +your local file system. + +Available storages: + +* `file-storage` +* `hybrid-s3` - Currently only file storage is available, - so if you pass to `--storage` parameter other than `file-storage` you will get an error. +### File storage -Available parameters: +File storage parameters: -* `--storage` - type of data storage to be used; * `--data-dir` - path to the directory where all files are stored; * `--dir-structure` - pattern of a directory structure inside data dir; * `--force-fsync` - calls fsync system call after every write to disk. @@ -112,6 +122,78 @@ data rustus ``` +### Hybrid-S3 storage + +This storage stores files locally and uploads the resulting file to S3 when the upload is finished. +It has no restriction on chunk size, so you can use chunks smaller than 5MB. + +!!! Danger + When choosing this storage you still need to have a + shared directory connected between instances. + + This storage is not intended to be used for large files, + since it uploads files to S3 during the last request. + +Hybrid-S3 uses file-storage inside, so all parameters from file storage +are also applied to it. + +Parameters: + +* `--dir-structure` - pattern of a directory structure locally and on S3; + +* `--data-dir` - path to the local directory where all files are stored; +* `--force-fsync` - calls fsync system call after every write to disk in local storage; +* `--s3-url` - S3 endpoint URL; +* `--s3-bucket` - name of a bucket to use; +* `--s3-region` - AWS region to use; +* `--s3-access-key` - S3 access key; +* `--s3-secret-key` - S3 secret key; +* `--s3-security-token` - S3 security token; +* `--s3-session-token` - S3 session token; +* `--s3-profile` - name of the section from the `~/.aws/credentials` file; +* `--s3-headers` - JSON object with additional headers added to every S3 request (useful for setting ACLs); +* `--s3-force-path-style` - use path-style URLs. It appends the bucket name at the end of the URL; + +The only required parameters are `--s3-url` and `--s3-bucket`.
+ +=== "CLI" + + ``` bash + rustus --storage "hybrid-s3" \ + --s3-url "https://localhost:9000" \ + --s3-bucket "bucket" \ + --s3-region "eu-central1" \ + --s3-access-key "fJljHcXo07rqIOzh" \ + --s3-secret-key "6BJfBUL18nLiGmF5zKW0NKrdxQVxNYWB" \ + --s3-profile "my_profile" \ + --s3-security-token "token" \ + --s3-session-token "token" \ + --s3-force-path-style "yes" \ + --s3-headers '{"x-amz-acl": "public-read"}' \ + --force-fsync "yes" \ + --data-dir "./data/" \ + --dir-structure "{year}/{month}/{day}" + ``` + +=== "ENV" + + ``` bash + export RUSTUS_STORAGE="hybrid-s3" + export RUSTUS_S3_URL="https://localhost:9000" + export RUSTUS_S3_BUCKET="bucket" + export RUSTUS_S3_REGION="eu-central1" + export RUSTUS_S3_ACCESS_KEY="fJljHcXo07rqIOzh" + export RUSTUS_S3_SECRET_KEY="6BJfBUL18nLiGmF5zKW0NKrdxQVxNYWB" + export RUSTUS_S3_SECURITY_TOKEN="token" + export RUSTUS_S3_SESSION_TOKEN="token" + export RUSTUS_S3_PROFILE="my_profile" + export RUSTUS_S3_HEADERS='{"x-amz-acl": "public-read"}' + export RUSTUS_DATA_DIR="./data/" + export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}" + export RUSTUS_FORCE_FSYNC="yes" + + rustus + ``` + ## Configuring info storage Info storages are used to store information diff --git a/docs/index.md b/docs/index.md index e0adab0..4c2c2fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -24,13 +24,13 @@ You can install rustus in four different ways. ### From source -To build it from source rust must be installed. -Preferred version is 1.59.0. +To build it from source, Rust must be installed. We don't rely on nightly features, +so you can use the latest stable release. ```bash git clone https://github.com/s3rius/rustus.git cd rustus -cargo install --path . --features=all,metrics +cargo install --path . --features=all ``` Also, you can speedup build by disabling some features. @@ -39,9 +39,7 @@ Available features: * `amqp_notifier` - adds `AMQP` protocol support for notifying about upload status; * `db_info_storage` - adds support for storing information about upload in different databases (`Postgres`, `MySQL`, `SQLite`); -* `http_notifier` - adds support for notifying about upload status via `HTTP` protocol; * `redis_info_storage` - adds support for storing information about upload in `Redis` database; -* `hashers` - adds support for checksum verification; * `all` - enables all rustus features. All precompiled binaries have all features enabled. diff --git a/src/config.rs b/src/config.rs index 9aeebae..95136bf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,6 +43,68 @@ pub struct StorageOptions { /// In most cases this parameter is redundant. #[structopt(long, env = "RUSTUS_FORCE_FSYNC")] pub force_fsync: bool, + + /// S3 bucket to upload files to. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, required_if("storage", "hybrid-s3"), env = "RUSTUS_S3_BUCKET")] + pub s3_bucket: Option<String>, + + /// S3 region. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, required_if("storage", "hybrid-s3"), env = "RUSTUS_S3_REGION")] + pub s3_region: Option<String>, + + /// S3 access key. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_ACCESS_KEY")] + pub s3_access_key: Option<String>, + + /// S3 secret key. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_SECRET_KEY")] + pub s3_secret_key: Option<String>, + + /// S3 URL. + /// + /// This parameter is required for s3-based storages.
+ #[structopt(long, required_if("storage", "hybrid-s3"), env = "RUSTUS_S3_URL")] + pub s3_url: Option<String>, + + /// S3 force path style. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_FORCE_PATH_STYLE")] + pub s3_force_path_style: bool, + + /// S3 security token. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_SECURITY_TOKEN")] + pub s3_security_token: Option<String>, + + /// S3 session token. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_SESSION_TOKEN")] + pub s3_session_token: Option<String>, + + /// S3 profile. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_PROFILE")] + pub s3_profile: Option<String>, + + /// Additional S3 headers. + /// These headers are passed to every request to S3. + /// Useful for configuring ACLs. + /// + /// This parameter is required for s3-based storages. + #[structopt(long, env = "RUSTUS_S3_HEADERS")] + pub s3_headers: Option<String>, } #[derive(StructOpt, Debug, Clone)] @@ -112,12 +174,10 @@ pub struct NotificationsOptions { pub behind_proxy: bool, /// List of URLS to send webhooks to. - #[cfg(feature = "http_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_delimiter = true)] pub hooks_http_urls: Vec<String>, // List of headers to forward from client. - #[cfg(feature = "http_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", use_delimiter = true)] pub hooks_http_proxy_headers: Vec<String>, @@ -205,6 +265,9 @@ pub struct RustusConf { #[structopt(short, long, default_value = "1081", env = "RUSTUS_SERVER_PORT")] pub port: u16, + #[structopt(long, env = "RUSTUS_DISABLE_HEALTH_ACCESS_LOG")] + pub disable_health_access_log: bool, + /// Rustus base API url #[structopt(long, default_value = "/files", env = "RUSTUS_URL")] pub url: String, diff --git a/src/errors.rs b/src/errors.rs index 8719a87..ae94bdf 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -7,6 +7,8 @@ pub type RustusResult<T> = Result<T, RustusError>; #[derive(thiserror::Error, Debug)] pub enum RustusError { + #[error("{0}")] + Unimplemented(String), #[error("Not found")] FileNotFound, #[error("File already exists")] @@ -42,7 +44,6 @@ pub enum RustusError { UnableToPrepareStorage(String), #[error("Unknown extension: {0}")] UnknownExtension(String), - #[cfg(feature = "http_notifier")] #[error("Http request failed: {0}")] HttpRequestError(#[from] reqwest::Error), #[error("Hook invocation failed. Reason: {0}")] @@ -71,6 +72,8 @@ pub enum RustusError { BlockingError(#[from] actix_web::error::BlockingError), #[error("HTTP hook error. Returned status: {0}")] HTTPHookError(u16, String, Option<String>), + #[error("Found S3 error: {0}")] + S3Error(#[from] s3::error::S3Error), } /// This conversion allows us to use `RustusError` in the `main` function.
@@ -99,7 +102,7 @@ impl ResponseError for RustusError { } _ => HttpResponseBuilder::new(self.status_code()) .insert_header(("Content-Type", "text/html; charset=utf-8")) - .body(format!("{}", self)), + .body(format!("{self}")), } } diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index f899c1d..15f58d2 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -27,7 +27,7 @@ impl FileInfoStorage { } pub fn info_file_path(&self, file_id: &str) -> PathBuf { - self.info_dir.join(format!("{}.info", file_id)) + self.info_dir.join(format!("{file_id}.info")) } } diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index e1dbc45..6d55f1d 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -78,7 +78,7 @@ impl FileInfo { for (key, val) in &self.metadata { let encoded_value = base64::encode(val); // Adding metadata entry to the list. - result.push(format!("{} {}", key, encoded_value)); + result.push(format!("{key} {encoded_value}")); } if result.is_empty() { diff --git a/src/main.rs b/src/main.rs index 7c6cfaf..b3a6fd3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -60,11 +60,11 @@ fn greeting(app_conf: &RustusConf) { .collect::>() .join(", "); let rustus_logo = include_str!("../imgs/rustus_startup_logo.txt"); - eprintln!("\n\n{}", rustus_logo); + eprintln!("\n\n{rustus_logo}"); eprintln!("Welcome to rustus!"); eprintln!("Base URL: /{}", app_conf.base_url()); - eprintln!("Available extensions: {}", extensions); - eprintln!("Enabled hooks: {}", hooks); + eprintln!("Available extensions: {extensions}"); + eprintln!("Enabled hooks: {hooks}"); eprintln!(); eprintln!(); } @@ -147,16 +147,14 @@ fn create_cors(origins: Vec, additional_headers: Vec) -> Cors { pub fn create_server(state: State) -> RustusResult { let host = state.config.host.clone(); let port = state.config.port; + let disable_health_log = state.config.disable_health_access_log; let cors_hosts = state.config.cors.clone(); let workers = state.config.workers; - #[cfg(feature = "http_notifier")] let proxy_headers = state .config .notification_opts .hooks_http_proxy_headers .clone(); - #[cfg(not(feature = "http_notifier"))] - let proxy_headers = vec![]; let metrics = actix_web_prom::PrometheusMetricsBuilder::new("") .endpoint("/metrics") .build() @@ -171,8 +169,8 @@ pub fn create_server(state: State) -> RustusResult { let terminated_uploads = TerminatedUploads::new()?; let found_errors = prometheus::IntCounterVec::new( prometheus::Opts { - namespace: "".into(), - subsystem: "".into(), + namespace: String::new(), + subsystem: String::new(), name: "errors".into(), help: "Found errors".into(), const_labels: HashMap::new(), @@ -197,6 +195,10 @@ pub fn create_server(state: State) -> RustusResult { .register(Box::new(terminated_uploads.counter.clone()))?; metrics.registry.register(Box::new(found_errors.clone()))?; let mut server = HttpServer::new(move || { + let mut logger = middleware::Logger::new("\"%r\" \"-\" \"%s\" \"%a\" \"%D\""); + if disable_health_log { + logger = logger.exclude("/health"); + } let error_metrics = found_errors.clone(); App::new() .app_data(web::Data::new(active_uploads.clone())) @@ -207,7 +209,7 @@ pub fn create_server(state: State) -> RustusResult { .route("/health", web::get().to(routes::health_check)) .configure(rustus_service(state.clone())) .wrap(metrics.clone()) - .wrap(middleware::Logger::new("\"%r\" \"-\" \"%s\" \"%a\" \"%D\"")) + .wrap(logger) 
.wrap(create_cors(cors_hosts.clone(), proxy_headers.clone())) // Middleware that overrides method of a request if // "X-HTTP-Method-Override" header is provided. @@ -232,9 +234,9 @@ pub fn create_server(state: State) -> RustusResult { if let Some(err) = srv_response.response().error() { let url = match srv_response.request().match_pattern() { Some(pattern) => pattern, - None => "".into(), + None => String::new(), }; - let err_desc = format!("{}", err); + let err_desc = format!("{err}"); error_counter .clone() .with_label_values(&[url.as_str(), err_desc.as_str()]) diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index d53fbd3..f5f7727 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -68,7 +68,7 @@ impl AMQPNotifier { if let Some(routing_key) = self.routing_key.as_ref() { routing_key.into() } else { - format!("{}.{}", self.queues_prefix.as_str(), hook) + format!("{}.{hook}", self.queues_prefix.as_str()) } } } @@ -124,7 +124,7 @@ impl Notifier for AMQPNotifier { let queue = self.get_queue_name(hook); let routing_key = self.routing_key.as_ref().unwrap_or(&queue); let payload = if self.celery { - format!("[[{}], {{}}, {{}}]", message).as_bytes().to_vec() + format!("[[{message}], {{}}, {{}}]").as_bytes().to_vec() } else { message.as_bytes().to_vec() }; @@ -136,7 +136,7 @@ impl Notifier for AMQPNotifier { ); headers.insert( "task".into(), - AMQPValue::LongString(LongString::from(format!("rustus.{}", hook))), + AMQPValue::LongString(LongString::from(format!("rustus.{hook}"))), ); } chan.basic_publish( diff --git a/src/notifiers/http_notifier.rs b/src/notifiers/http_notifier.rs index 89d59c4..a5c861d 100644 --- a/src/notifiers/http_notifier.rs +++ b/src/notifiers/http_notifier.rs @@ -47,12 +47,12 @@ impl Notifier for HttpNotifier { .client .post(url.as_str()) .header("Idempotency-Key", idempotency_key.as_str()) - .header("Hook-Name", hook.clone().to_string()) + .header("Hook-Name", hook.to_string()) .header("Content-Type", "application/json") .timeout(Duration::from_secs(2)); for item in &self.forward_headers { - if let Some(value) = header_map.get(item.clone()) { - request = request.header(item.clone(), value.as_bytes()); + if let Some(value) = header_map.get(item.as_str()) { + request = request.header(item.as_str(), value.as_bytes()); } } request.body(message.clone()).send() diff --git a/src/notifiers/mod.rs b/src/notifiers/mod.rs index d9957a1..211329f 100644 --- a/src/notifiers/mod.rs +++ b/src/notifiers/mod.rs @@ -2,7 +2,6 @@ pub mod amqp_notifier; pub mod dir_notifier; mod file_notifier; -#[cfg(feature = "http_notifier")] pub mod http_notifier; pub mod models; diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 8687fd2..51c5777 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -1,10 +1,10 @@ #[cfg(feature = "amqp_notifier")] use crate::notifiers::amqp_notifier; -#[cfg(feature = "http_notifier")] -use crate::notifiers::http_notifier; use crate::{ errors::RustusResult, - notifiers::{dir_notifier::DirNotifier, file_notifier::FileNotifier, Hook, Notifier}, + notifiers::{ + dir_notifier::DirNotifier, file_notifier::FileNotifier, http_notifier, Hook, Notifier, + }, RustusConf, }; use actix_web::http::header::HeaderMap; @@ -33,7 +33,6 @@ impl NotificationManager { rustus_config.notification_opts.hooks_dir.clone().unwrap(), ))); } - #[cfg(feature = "http_notifier")] if 
!rustus_config.notification_opts.hooks_http_urls.is_empty() { debug!("Found http hook urls."); manager diff --git a/src/protocol/core/get_info.rs b/src/protocol/core/get_info.rs index bd1874b..bd735b7 100644 --- a/src/protocol/core/get_info.rs +++ b/src/protocol/core/get_info.rs @@ -36,7 +36,7 @@ pub async fn get_file_info( .map(|file| format!("/{}/{}", state.config.base_url(), file.as_str())) .collect::>() .join(" "); - builder.insert_header(("Upload-Concat", format!("final; {}", parts))); + builder.insert_header(("Upload-Concat", format!("final; {parts}"))); } builder .no_chunking(file_info.offset as u64) @@ -53,6 +53,7 @@ pub async fn get_file_info( if let Some(meta) = file_info.get_metadata_string() { builder.insert_header(("Upload-Metadata", meta)); } + builder.insert_header(("Upload-Created", file_info.created_at.timestamp())); builder.insert_header(CacheControl(vec![CacheDirective::NoCache])); Ok(builder.streaming(empty::>())) } diff --git a/src/protocol/core/server_info.rs b/src/protocol/core/server_info.rs index dc2561b..677c44e 100644 --- a/src/protocol/core/server_info.rs +++ b/src/protocol/core/server_info.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "hashers")] use crate::protocol::extensions::Extensions; use actix_web::{http::StatusCode, web, HttpResponse, HttpResponseBuilder}; @@ -16,7 +15,6 @@ pub async fn server_info(state: web::Data) -> HttpResponse { .join(","); let mut response_builder = HttpResponseBuilder::new(StatusCode::OK); response_builder.insert_header(("Tus-Extension", ext_str.as_str())); - #[cfg(feature = "hashers")] if state.config.tus_extensions.contains(&Extensions::Checksum) { response_builder.insert_header(("Tus-Checksum-Algorithm", "md5,sha1,sha256,sha512")); } diff --git a/src/protocol/core/write_bytes.rs b/src/protocol/core/write_bytes.rs index 3bf68a5..5ff6444 100644 --- a/src/protocol/core/write_bytes.rs +++ b/src/protocol/core/write_bytes.rs @@ -5,14 +5,15 @@ use actix_web::{ HttpRequest, HttpResponse, }; -#[cfg(feature = "hashers")] -use crate::utils::hashes::verify_chunk_checksum; use crate::{ errors::RustusError, metrics, notifiers::Hook, protocol::extensions::Extensions, - utils::headers::{check_header, parse_header}, + utils::{ + hashes::verify_chunk_checksum, + headers::{check_header, parse_header}, + }, RustusResult, State, }; @@ -39,7 +40,6 @@ pub async fn write_bytes( return Err(RustusError::FileNotFound); } - #[cfg(feature = "hashers")] if state.config.tus_extensions.contains(&Extensions::Checksum) { if let Some(header) = request.headers().get("Upload-Checksum").cloned() { let cloned_bytes = bytes.clone(); diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 1225330..48c6e52 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -236,7 +236,7 @@ pub async fn create_file( } // Create upload URL for this file. - let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?; + let upload_url = request.url_for("core:write_bytes", [file_info.id.clone()])?; Ok(HttpResponse::Created() .insert_header(( diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index 10577f5..999f412 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,19 +1,18 @@ -use actix_files::NamedFile; -use actix_web::{web, HttpRequest}; +use actix_web::{web, HttpRequest, HttpResponse}; use crate::{errors::RustusError, RustusResult, State}; /// Retrieve actual file. /// /// This method allows you to download files directly from storage. 
-pub async fn get_file(request: HttpRequest, state: web::Data) -> RustusResult { +pub async fn get_file(request: HttpRequest, state: web::Data) -> RustusResult { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { let file_info = state.info_storage.get_info(file_id.as_str()).await?; if file_info.storage != state.data_storage.to_string() { return Err(RustusError::FileNotFound); } - state.data_storage.get_contents(&file_info).await + state.data_storage.get_contents(&file_info, &request).await } else { Err(RustusError::FileNotFound) } diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index 8733352..cf96a26 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,6 +1,7 @@ use std::{io::Write, path::PathBuf}; use actix_files::NamedFile; +use actix_web::{HttpRequest, HttpResponse}; use async_trait::async_trait; use bytes::Bytes; use log::error; @@ -13,7 +14,7 @@ use crate::{ errors::{RustusError, RustusResult}, info_storages::FileInfo, storages::Storage, - utils::dir_struct::dir_struct, + utils::dir_struct::substr_now, }; use derive_more::Display; @@ -43,7 +44,7 @@ impl FileStorage { error!("{}", err); RustusError::UnableToWrite(err.to_string()) })? - .join(dir_struct(self.dir_struct.as_str())); + .join(substr_now(self.dir_struct.as_str())); DirBuilder::new() .recursive(true) .create(dir.as_path()) @@ -69,16 +70,23 @@ impl Storage for FileStorage { Ok(()) } - async fn get_contents(&self, file_info: &FileInfo) -> RustusResult { + async fn get_contents( + &self, + file_info: &FileInfo, + request: &HttpRequest, + ) -> RustusResult { if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - NamedFile::open_async(file_info.path.clone().unwrap().as_str()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::FileNotFound - }) + Ok( + NamedFile::open_async(file_info.path.clone().unwrap().as_str()) + .await + .map_err(|err| { + error!("{:?}", err); + RustusError::FileNotFound + })? 
+ .into_response(request), + ) } async fn add_bytes(&self, file_info: &FileInfo, mut bytes: Bytes) -> RustusResult<()> { @@ -198,6 +206,7 @@ impl Storage for FileStorage { mod tests { use super::FileStorage; use crate::{info_storages::FileInfo, Storage}; + use actix_web::test::TestRequest; use bytes::Bytes; use std::{ fs::File, @@ -209,7 +218,7 @@ mod tests { async fn preparation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let target_path = dir.into_path().join("not_exist"); - let mut storage = FileStorage::new(target_path.clone(), "".into(), false); + let mut storage = FileStorage::new(target_path.clone(), String::new(), false); assert_eq!(target_path.exists(), false); storage.prepare().await.unwrap(); assert_eq!(target_path.exists(), true); @@ -218,7 +227,7 @@ mod tests { #[actix_rt::test] async fn create_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); assert!(PathBuf::from(new_path).exists()); @@ -228,7 +237,7 @@ mod tests { async fn create_file_but_it_exists() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let base_path = dir.into_path().clone(); - let storage = FileStorage::new(base_path.clone(), "".into(), false); + let storage = FileStorage::new(base_path.clone(), String::new(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); File::create(base_path.join("test_id")).unwrap(); let result = storage.create_file(&file_info).await; @@ -238,7 +247,7 @@ mod tests { #[actix_rt::test] async fn adding_bytes() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let mut file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); let test_data = "MyTestData"; @@ -256,7 +265,7 @@ mod tests { #[actix_rt::test] async fn adding_bytes_to_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -272,7 +281,7 @@ mod tests { #[actix_rt::test] async fn get_contents_of_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -280,14 +289,15 @@ mod tests { storage.to_string(), None, ); - let file_info = storage.get_contents(&file_info).await; + let request = TestRequest::get().to_http_request(); + let file_info = storage.get_contents(&file_info, &request).await; assert!(file_info.is_err()); } #[actix_rt::test] async fn remove_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -302,7 +312,7 
@@ mod tests { #[actix_rt::test] async fn success_concatenation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); + let storage = FileStorage::new(dir.into_path().clone(), String::new(), false); let mut parts = Vec::new(); let part1_path = storage.data_dir.as_path().join("part1"); diff --git a/src/storages/mod.rs b/src/storages/mod.rs index eba4b63..e0421eb 100644 --- a/src/storages/mod.rs +++ b/src/storages/mod.rs @@ -1,4 +1,5 @@ pub mod file_storage; mod models; +pub mod s3_hybrid_storage; pub use models::{available_stores::AvailableStores, storage::Storage}; diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs index 00ba972..508cc3c 100644 --- a/src/storages/models/available_stores.rs +++ b/src/storages/models/available_stores.rs @@ -1,4 +1,8 @@ -use crate::{from_str, storages::file_storage, RustusConf, Storage}; +use crate::{ + from_str, + storages::{file_storage, s3_hybrid_storage}, + RustusConf, Storage, +}; use derive_more::{Display, From}; use strum::EnumIter; @@ -7,6 +11,8 @@ use strum::EnumIter; pub enum AvailableStores { #[display(fmt = "file-storage")] FileStorage, + #[display(fmt = "hybrid-s3")] + HybridS3, } from_str!(AvailableStores, "storage"); @@ -27,6 +33,24 @@ impl AvailableStores { config.storage_opts.dir_structure.clone(), config.storage_opts.force_fsync, )), + Self::HybridS3 => { + log::warn!("Hybrid S3 is an unstable feature. If you encounter a problem, please raise an issue: https://github.com/s3rius/rustus/issues."); + Box::new(s3_hybrid_storage::S3HybridStorage::new( + config.storage_opts.s3_url.clone().unwrap(), + config.storage_opts.s3_region.clone().unwrap(), + &config.storage_opts.s3_access_key, + &config.storage_opts.s3_secret_key, + &config.storage_opts.s3_security_token, + &config.storage_opts.s3_session_token, + &config.storage_opts.s3_profile, + &config.storage_opts.s3_headers, + config.storage_opts.s3_bucket.clone().unwrap().as_str(), + config.storage_opts.s3_force_path_style, + config.storage_opts.data_dir.clone(), + config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, + )) + } } } } diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index 7802e98..56d0563 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -1,5 +1,5 @@ use crate::{errors::RustusResult, info_storages::FileInfo}; -use actix_files::NamedFile; +use actix_web::{HttpRequest, HttpResponse}; use async_trait::async_trait; use bytes::Bytes; use dyn_clone::DynClone; @@ -20,13 +20,17 @@ pub trait Storage: Display + DynClone { /// Get contents of a file. /// - /// This method must return NamedFile since it - /// is compatible with ActixWeb files interface. - /// FIXME: change return type to stream. + /// This method must return HttpResponse. + /// This response will be sent directly. /// /// # Params /// `file_info` - info about current file. - async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<NamedFile>; + /// `request` - this parameter is needed to construct responses in some cases + async fn get_contents( + &self, + file_info: &FileInfo, + request: &HttpRequest, + ) -> RustusResult<HttpResponse>; /// Add bytes to the file.
/// diff --git a/src/storages/s3_hybrid_storage.rs b/src/storages/s3_hybrid_storage.rs new file mode 100644 index 0000000..17a2cf1 --- /dev/null +++ b/src/storages/s3_hybrid_storage.rs @@ -0,0 +1,177 @@ +use std::{collections::HashMap, path::PathBuf}; + +use crate::{ + errors::{RustusError, RustusResult}, + info_storages::FileInfo, +}; + +use super::Storage; +use crate::{storages::file_storage::FileStorage, utils::dir_struct::substr_time}; +use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder}; +use async_trait::async_trait; +use bytes::Bytes; +use derive_more::Display; +use s3::{command::Command, request::Reqwest, request_trait::Request, Bucket}; + +/// This storage is useful for small files when you have chunks less than 5MB. +/// This restriction is based on the S3 API limitations. +/// +/// It handles uploads locally, and after the upload is +/// complete, it uploads the file to S3. +/// +/// It's not intended to be used for large files. +#[derive(Display, Clone)] +#[display(fmt = "s3_storage")] +pub struct S3HybridStorage { + bucket: Bucket, + local_storage: FileStorage, + dir_struct: String, +} + +impl S3HybridStorage { + #[allow(clippy::too_many_arguments)] + pub fn new( + endpoint: String, + region: String, + access_key: &Option<String>, + secret_key: &Option<String>, + security_token: &Option<String>, + session_token: &Option<String>, + profile: &Option<String>, + custom_headers: &Option<String>, + bucket_name: &str, + force_path_style: bool, + data_dir: PathBuf, + dir_struct: String, + force_fsync: bool, + ) -> Self { + let local_storage = FileStorage::new(data_dir, dir_struct.clone(), force_fsync); + let creds = s3::creds::Credentials::new( + access_key.as_deref(), + secret_key.as_deref(), + security_token.as_deref(), + session_token.as_deref(), + profile.as_deref(), + ); + if let Err(err) = creds { + panic!("Cannot build credentials: {err}") + } + log::debug!("Parsed credentials"); + let credentials = creds.unwrap(); + let bucket = Bucket::new( + bucket_name, + s3::Region::Custom { region, endpoint }, + credentials, + ); + if let Err(error) = bucket { + panic!("Cannot create bucket instance {error}"); + } + let mut bucket = bucket.unwrap(); + if let Some(raw_s3_headers) = custom_headers { + let headers_map = serde_json::from_str::<HashMap<String, String>>(raw_s3_headers) + .expect("Cannot parse s3 headers. Please provide valid JSON object."); + log::debug!("Found extra s3 headers."); + for (key, value) in &headers_map { + log::debug!("Adding header `{key}` with value `{value}`."); + bucket.add_header(key, value); + } + } + + if force_path_style { + bucket = bucket.with_path_style(); + } + + Self { + bucket, + local_storage, + dir_struct, + } + } + + /// Upload file to S3. + /// + /// This function is called to upload the file to S3 completely. + /// It streams the file directly from disk to S3. + async fn upload_file(&self, file_info: &FileInfo) -> RustusResult<()> { + if file_info.path.is_none() { + return Err(RustusError::UnableToWrite("Cannot get upload path.".into())); + } + let s3_path = self.get_s3_key(file_info); + log::debug!( + "Starting uploading {} to S3 with key `{}`", + file_info.id, + s3_path, + ); + let file = tokio::fs::File::open(file_info.path.clone().unwrap()).await?; + let mut reader = tokio::io::BufReader::new(file); + self.bucket.put_object_stream(&mut reader, s3_path).await?; + Ok(()) + } + + // Construct an S3 key which is used to upload files.
+ fn get_s3_key(&self, file_info: &FileInfo) -> String { + let base_path = substr_time(self.dir_struct.as_str(), file_info.created_at); + let trimmed_path = base_path.trim_end_matches(|c: char| c == '/'); + format!("{trimmed_path}/{}", file_info.id) + } +} + +#[async_trait(?Send)] +impl Storage for S3HybridStorage { + async fn prepare(&mut self) -> RustusResult<()> { + Ok(()) + } + + async fn get_contents( + &self, + file_info: &FileInfo, + request: &HttpRequest, + ) -> RustusResult<HttpResponse> { + if file_info.length != Some(file_info.offset) { + log::debug!("File isn't uploaded. Returning from local storage."); + return self.local_storage.get_contents(file_info, request).await; + } + let key = self.get_s3_key(file_info); + let command = Command::GetObject; + let s3_request = Reqwest::new(&self.bucket, &key, command); + let s3_response = s3_request.response().await?; + let mut response = HttpResponseBuilder::new(actix_web::http::StatusCode::OK); + Ok(response.streaming(s3_response.bytes_stream())) + } + + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { + let part_len = bytes.len(); + self.local_storage.add_bytes(file_info, bytes).await?; + // If the upload is complete, upload the resulting file to S3. + if Some(file_info.offset + part_len) == file_info.length { + self.upload_file(file_info).await?; + self.local_storage.remove_file(file_info).await?; + } + Ok(()) + } + + async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> { + self.local_storage.create_file(file_info).await + } + + async fn concat_files( + &self, + _file_info: &FileInfo, + _parts_info: Vec<FileInfo>, + ) -> RustusResult<()> { + Err(RustusError::Unimplemented( + "Hybrid s3 cannot concat files.".into(), + )) + } + + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { + if Some(file_info.offset) == file_info.length { + self.bucket + .delete_object(self.get_s3_key(file_info)) + .await?; + } else { + self.local_storage.remove_file(file_info).await?; + } + Ok(()) + } +} diff --git a/src/utils/dir_struct.rs b/src/utils/dir_struct.rs index b1c3bef..c9142d7 100644 --- a/src/utils/dir_struct.rs +++ b/src/utils/dir_struct.rs @@ -1,31 +1,35 @@ use chrono::{Datelike, Timelike}; /// Generate directory name with user template.
-pub fn dir_struct(dir_structure: &str) -> String { +pub fn substr_now(dir_structure: &str) -> String { let now = chrono::Utc::now(); + substr_time(dir_structure, now) +} + +pub fn substr_time(dir_structure: &str, time: chrono::DateTime) -> String { dir_structure - .replace("{day}", now.day().to_string().as_str()) - .replace("{month}", now.month().to_string().as_str()) - .replace("{year}", now.year().to_string().as_str()) - .replace("{hour}", now.hour().to_string().as_str()) - .replace("{minute}", now.minute().to_string().as_str()) + .replace("{day}", time.day().to_string().as_str()) + .replace("{month}", time.month().to_string().as_str()) + .replace("{year}", time.year().to_string().as_str()) + .replace("{hour}", time.hour().to_string().as_str()) + .replace("{minute}", time.minute().to_string().as_str()) } #[cfg(test)] mod tests { - use super::dir_struct; + use super::substr_now; use chrono::Datelike; #[test] pub fn test_time() { let now = chrono::Utc::now(); - let dir = dir_struct("{day}/{month}"); + let dir = substr_now("{day}/{month}"); assert_eq!(dir, format!("{}/{}", now.day(), now.month())); } #[test] pub fn test_unknown_var() { - let dir = dir_struct("test/{quake}"); + let dir = substr_now("test/{quake}"); assert_eq!(dir, String::from("test/{quake}")); } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 458aa2c..1b0e194 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,4 @@ pub mod dir_struct; pub mod enums; -#[cfg(feature = "hashers")] pub mod hashes; pub mod headers;