diff --git a/crates/backend/Cargo.toml b/crates/backend/Cargo.toml index 1daf5886..48fadbbe 100644 --- a/crates/backend/Cargo.toml +++ b/crates/backend/Cargo.toml @@ -85,6 +85,8 @@ semver = { version = "1.0.23", optional = true } bytesize = "1.3.0" rayon = { version = "1.10.0", optional = true } tokio = { version = "1.39.3", optional = true, default-features = false } +futures = { version = "0.3", optional = true, default-features = false } +async-trait = "0.1.81" [target.'cfg(not(windows))'.dependencies] # opendal backend - sftp is not supported on windows, see https://github.com/apache/incubator-opendal/issues/2963 diff --git a/crates/backend/src/choose.rs b/crates/backend/src/choose.rs index f1466c1e..931f4991 100644 --- a/crates/backend/src/choose.rs +++ b/crates/backend/src/choose.rs @@ -1,6 +1,7 @@ //! This module contains [`BackendOptions`] and helpers to choose a backend from a given url. use anyhow::{anyhow, Result}; use derive_setters::Setters; +use rustic_core::{AsyncRepositoryBackends, AsyncWriteBackend}; use std::{collections::HashMap, sync::Arc}; use strum_macros::{Display, EnumString}; @@ -10,6 +11,7 @@ use rustic_core::{RepositoryBackends, WriteBackend}; use crate::{ error::BackendAccessErrorKind, local::LocalBackend, + opendal::AsyncOpenDALBackend, util::{location_to_type_and_path, BackendLocation}, }; @@ -95,6 +97,19 @@ impl BackendOptions { Ok(RepositoryBackends::new(be, be_hot)) } + pub fn to_async_backends(&self) -> Result { + let mut options = self.options.clone(); + options.extend(self.options_cold.clone()); + let be = self + .get_async_backed(self.repository.as_ref(), options)? + .ok_or_else(|| anyhow!("No repository given."))?; + let mut options = self.options.clone(); + options.extend(self.options_hot.clone()); + let be_hot = self.get_async_backed(self.repo_hot.as_ref(), options)?; + + Ok(AsyncRepositoryBackends::new(be, be_hot)) + } + /// Get the backend for the given repository. /// /// # Arguments @@ -125,6 +140,25 @@ impl BackendOptions { }) .transpose() } + + fn get_async_backed( + &self, + repo_string: Option<&String>, + options: HashMap, + ) -> Result>> { + repo_string + .map(|string| { + let (be_type, location) = location_to_type_and_path(string)?; + match be_type.to_async_backends(location, options.into()) { + Ok(e) => Ok(e), + Err(e) if e.downcast_ref::().is_some() => Err(e.into()), + Err(e) => { + Err(BackendAccessErrorKind::BackendLoadError(be_type.to_string(), e).into()) + } + } + }) + .transpose() + } } /// Trait which can be implemented to choose a backend from a backend type, a backend path and options given as `HashMap`. @@ -146,6 +180,12 @@ pub trait BackendChoice { location: BackendLocation, options: Option>, ) -> Result>; + + fn to_async_backends( + &self, + location: BackendLocation, + options: Option>, + ) -> Result>; } /// The supported backend types. @@ -196,6 +236,18 @@ impl BackendChoice for SupportedBackend { Self::OpenDAL => Arc::new(OpenDALBackend::new(location, options)?), }) } + + fn to_async_backends( + &self, + location: BackendLocation, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + match self { + Self::OpenDAL => Ok(Arc::new(AsyncOpenDALBackend::new(location, options)?)), + _ => Err(BackendAccessErrorKind::BackendNoAsync(location.to_string()).into()), + } + } } #[cfg(test)] diff --git a/crates/backend/src/error.rs b/crates/backend/src/error.rs index ef89efe4..152f32f5 100644 --- a/crates/backend/src/error.rs +++ b/crates/backend/src/error.rs @@ -7,6 +7,8 @@ use thiserror::Error; /// [`BackendAccessErrorKind`] describes the errors that can be returned by the various Backends #[derive(Error, Debug, Display)] pub enum BackendAccessErrorKind { + /// no async variant implemented for backend {0:1} + BackendNoAsync(String), /// backend {0:?} is not supported! BackendNotSupported(String), /// backend {0} cannot be loaded: {1:?} diff --git a/crates/backend/src/opendal.rs b/crates/backend/src/opendal.rs index 66f8a9cd..65ed82ec 100644 --- a/crates/backend/src/opendal.rs +++ b/crates/backend/src/opendal.rs @@ -1,18 +1,21 @@ /// `OpenDAL` backend for rustic. -use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock}; +use std::{borrow::Borrow, collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock}; use anyhow::{anyhow, Error, Result}; +use async_trait::async_trait; use bytes::Bytes; use bytesize::ByteSize; -use log::trace; +use log::{debug, trace}; use opendal::{ layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer, ThrottleLayer}, BlockingOperator, ErrorKind, Metakey, Operator, Scheme, }; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; -use tokio::runtime::Runtime; +use tokio::runtime::{EnterGuard, Handle, Runtime}; -use rustic_core::{FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES}; +use rustic_core::{ + AsyncReadBackend, AsyncWriteBackend, FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES, +}; mod constants { /// Default number of retries @@ -20,15 +23,21 @@ mod constants { } /// `OpenDALBackend` contains a wrapper around an blocking operator of the `OpenDAL` library. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct OpenDALBackend { operator: BlockingOperator, } +/// Async implementation of [OpenDALBackend]. +#[derive(Debug)] +pub struct AsyncOpenDALBackend { + operator: Operator, +} + fn runtime() -> &'static Runtime { static RUNTIME: OnceLock = OnceLock::new(); RUNTIME.get_or_init(|| { - tokio::runtime::Builder::new_multi_thread() + tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() @@ -59,7 +68,226 @@ impl FromStr for Throttle { } } +impl AsyncOpenDALBackend { + // TODO - factorize code with OpenDALBackend::new() + pub fn new(path: impl AsRef, options: HashMap) -> Result { + let max_retries = match options.get("retry").map(String::as_str) { + Some("false" | "off") => 0, + None | Some("default") => constants::DEFAULT_RETRY, + Some(value) => usize::from_str(value)?, + }; + let connections = options + .get("connections") + .map(|c| usize::from_str(c)) + .transpose()?; + + let throttle = options + .get("throttle") + .map(|t| Throttle::from_str(t)) + .transpose()?; + + let schema = Scheme::from_str(path.as_ref())?; + let mut operator = Operator::via_iter(schema, options)? + .layer(RetryLayer::new().with_max_times(max_retries).with_jitter()); + + if let Some(Throttle { bandwidth, burst }) = throttle { + operator = operator.layer(ThrottleLayer::new(bandwidth, burst)); + } + + if let Some(connections) = connections { + operator = operator.layer(ConcurrentLimitLayer::new(connections)); + } + + Ok(Self { operator }) + } + + /// Return a path for the given file type and id. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// + /// # Returns + /// + /// The path for the given file type and id. + // Let's keep this for now, as it's being used in the trait implementations. + #[allow(clippy::unused_self)] + fn path(&self, tpe: FileType, id: &Id) -> String { + let hex_id = id.to_hex(); + match tpe { + FileType::Config => PathBuf::from("config"), + FileType::Pack => PathBuf::from("data").join(&hex_id[0..2]).join(hex_id), + _ => PathBuf::from(tpe.dirname()).join(hex_id), + } + .to_string_lossy() + .to_string() + } +} + +#[async_trait] +impl AsyncReadBackend for AsyncOpenDALBackend { + /// Returns the location of the backend. + /// + /// This is `local:`. + fn location(&self) -> String { + let mut location = "opendal:".to_string(); + location.push_str(self.operator.info().name()); + location + } + + /// Lists all files of the given type. + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list. + /// + /// # Notes + /// + /// If the file type is `FileType::Config`, this will return a list with a single default id. + async fn list(&self, tpe: FileType) -> Result> { + trace!("listing tpe: {tpe:?}"); + if tpe == FileType::Config { + return Ok(if self.operator.is_exist("config").await? { + vec![Id::default()] + } else { + Vec::new() + }); + } + + Ok(self + .operator + .list_with(&(tpe.dirname().to_string() + "/")) + .recursive(true) + .await? + .into_iter() + .filter(|e| e.metadata().is_file()) + .map(|e| Id::from_hex(e.name())) + .filter_map(Result::ok) + .collect()) + } + + /// Lists all files with their size of the given type. + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list. + /// + async fn list_with_size(&self, tpe: FileType) -> Result> { + trace!("listing tpe: {tpe:?}"); + if tpe == FileType::Config { + return match self.operator.stat("config").await { + Ok(entry) => Ok(vec![(Id::default(), entry.content_length().try_into()?)]), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(Vec::new()), + Err(err) => Err(err.into()), + }; + } + + Ok(self + .operator + .list_with(&(tpe.dirname().to_string() + "/")) + .recursive(true) + .metakey(Metakey::ContentLength) + .await? + .into_iter() + .filter(|e| e.metadata().is_file()) + .map(|e| -> Result<(Id, u32)> { + Ok(( + Id::from_hex(e.name())?, + e.metadata().content_length().try_into()?, + )) + }) + .filter_map(Result::ok) + .collect()) + } + + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + trace!("reading tpe: {tpe:?}, id: {id}"); + + Ok(self.operator.read(&self.path(tpe, id)).await?.to_bytes()) + } + + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + _cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + trace!("reading tpe: {tpe:?}, id: {id}, offset: {offset}, length: {length}"); + let range = u64::from(offset)..u64::from(offset + length); + Ok(self + .operator + .read_with(&self.path(tpe, id)) + .range(range) + .await? + .to_bytes()) + } +} + +#[async_trait] +impl AsyncWriteBackend for AsyncOpenDALBackend { + async fn create(&self) -> Result<()> { + trace!("creating repo at {:?}", self.location()); + + for tpe in ALL_FILE_TYPES { + self.operator + .create_dir(&(tpe.dirname().to_string() + "/")) + .await?; + } + + // TODO - use futures::stream::Stream; + for i in 0u8..=255 { + self.operator + .create_dir( + &(PathBuf::from("data") + .join(hex::encode([i])) + .to_string_lossy() + .to_string() + + "/"), + ) + .await? + } + + Ok(()) + } + + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + _cacheable: bool, + buf: Bytes, + ) -> Result<()> { + trace!("writing tpe: {:?}, id: {}", &tpe, &id); + let filename = self.path(tpe, id); + self.operator.write(&filename, buf).await?; + Ok(()) + } + + /// Remove the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the file is cacheable. + async fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> Result<()> { + trace!("removing tpe: {:?}, id: {}", &tpe, &id); + let filename = self.path(tpe, id); + self.operator.delete(&filename).await?; + Ok(()) + } +} + impl OpenDALBackend { + /// TODO have some shared trait with such a method + /// otherwise the knowledge of this async safety could be in this match method in choose.rs + fn safe_in_async_context() -> bool { + false + } + /// Create a new openDAL backend. /// /// # Arguments diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 22e299ba..c593e88d 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -109,6 +109,7 @@ itertools = "0.13.0" quick_cache = "0.6.2" strum = { version = "0.26.3", features = ["derive"] } zstd = "0.13.2" +async-trait = "0.1.81" [target.'cfg(not(windows))'.dependencies] sha2 = { version = "0.10.8", features = ["asm"] } diff --git a/crates/core/src/backend.rs b/crates/core/src/backend.rs index 50a4c911..fedc304d 100644 --- a/crates/core/src/backend.rs +++ b/crates/core/src/backend.rs @@ -12,6 +12,7 @@ pub(crate) mod warm_up; use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc}; use anyhow::Result; +use async_trait::async_trait; use bytes::Bytes; use enum_map::Enum; use log::trace; @@ -167,6 +168,101 @@ pub trait ReadBackend: Send + Sync + 'static { } } +// impl ReadBackend for T where T: AsyncReadBackend { +// fn list(&self, tpe: FileType) -> Result> { + +// } +// } + +/// TODO +#[async_trait] +pub trait AsyncReadBackend: Send + Sync + 'static { + /// Returns the location of the backend. + fn location(&self) -> String; + + /// Lists all files with their size of the given type. + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list. + /// + /// # Errors + /// + /// If the files could not be listed. + async fn list_with_size(&self, tpe: FileType) -> Result>; + + /// Lists all files of the given type. + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list. + /// + /// # Errors + /// + /// If the files could not be listed. + async fn list(&self, tpe: FileType) -> Result> { + Ok(self + .list_with_size(tpe) + .await? + .into_iter() + .map(|(id, _)| id) + .collect()) + } + + /// Reads full data of the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// + /// # Errors + /// + /// If the file could not be read. + async fn read_full(&self, tpe: FileType, id: &Id) -> Result; + + /// Reads partial data of the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the file should be cached. + /// * `offset` - The offset to read from. + /// * `length` - The length to read. + /// + /// # Errors + /// + /// If the file could not be read. + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result; + + /// Specify if the backend needs a warming-up of files before accessing them. + async fn needs_warm_up(&self) -> bool { + false + } + + /// Warm-up the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// + /// # Errors + /// + /// If the file could not be read. + async fn warm_up(&self, _tpe: FileType, _id: &Id) -> Result<()> { + Ok(()) + } +} + /// Trait for Searching in a backend. /// /// This trait is implemented by all backends that can be searched in. @@ -286,6 +382,131 @@ pub trait FindInBackend: ReadBackend { impl FindInBackend for T {} +impl AsyncFindInBackend for T {} + +#[async_trait] +pub trait AsyncFindInBackend: AsyncReadBackend { + /// Finds the id of the file starting with the given string. + /// + /// # Type Parameters + /// + /// * `T` - The type of the strings. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `vec` - The strings to search for. + /// + /// # Errors + /// + /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. + /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// + /// # Note + /// + /// This function is used to find the id of a snapshot. + /// + /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound + /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique + async fn find_starts_with + Send + Sync>( + &self, + tpe: FileType, + vec: &[T], + ) -> RusticResult> { + #[derive(Clone, Copy, PartialEq, Eq)] + enum MapResult { + None, + Some(T), + NonUnique, + } + let mut results = vec![MapResult::None; vec.len()]; + for id in self.list(tpe).await.map_err(RusticErrorKind::Backend)? { + let id_hex = id.to_hex(); + for (i, v) in vec.iter().enumerate() { + if id_hex.starts_with(v.as_ref()) { + if results[i] == MapResult::None { + results[i] = MapResult::Some(id); + } else { + results[i] = MapResult::NonUnique; + } + } + } + } + + results + .into_iter() + .enumerate() + .map(|(i, id)| match id { + MapResult::Some(id) => Ok(id), + MapResult::None => Err(BackendAccessErrorKind::NoSuitableIdFound( + (vec[i]).as_ref().to_string(), + ) + .into()), + MapResult::NonUnique => { + Err(BackendAccessErrorKind::IdNotUnique((vec[i]).as_ref().to_string()).into()) + } + }) + .collect() + } + + /// Finds the id of the file starting with the given string. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The string to search for. + /// + /// # Errors + /// + /// * [`IdErrorKind::HexError`] - If the string is not a valid hexadecimal string + /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. + /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// + /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError + /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound + /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique + async fn find_id(&self, tpe: FileType, id: &str) -> RusticResult { + Ok(self.find_ids(tpe, &[id.to_string()]).await?.remove(0)) + } + + /// Finds the ids of the files starting with the given strings. + /// + /// # Type Parameters + /// + /// * `T` - The type of the strings. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `ids` - The strings to search for. + /// + /// # Errors + /// + /// * [`IdErrorKind::HexError`] - If the string is not a valid hexadecimal string + /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. + /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// + /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError + /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound + /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique + async fn find_ids + Send + Sync>( + &self, + tpe: FileType, + ids: &[T], + ) -> RusticResult> { + match ids + .iter() + .map(|id| Id::from_hex(id.as_ref())) + .collect::>>() + { + Ok(id_vec) => Ok(id_vec), + Err(err) => { + trace!("no valid IDs given: {err}, searching for ID starting with given strings instead"); + self.find_starts_with(tpe, ids).await + } + } + } +} /// Trait for backends that can write. /// This trait is implemented by all backends that can write data. pub trait WriteBackend: ReadBackend { @@ -338,6 +559,57 @@ pub trait WriteBackend: ReadBackend { fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>; } +#[async_trait] +pub trait AsyncWriteBackend: AsyncReadBackend { + // Creates a new backend. + /// + /// # Errors + /// + /// If the backend could not be created. + /// + /// # Returns + /// + /// The result of the creation. + async fn create(&self) -> Result<()> { + Ok(()) + } + + /// Writes bytes to the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the data can be cached. + /// * `buf` - The data to write. + /// + /// # Errors + /// + /// If the data could not be written. + /// + /// # Returns + /// + /// The result of the write. + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()>; + + /// Removes the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the file is cacheable. + /// + /// # Errors + /// + /// If the file could not be removed. + /// + /// # Returns + /// + /// The result of the removal. + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>; +} + #[cfg(test)] mock! { Backend {} @@ -401,6 +673,66 @@ impl ReadBackend for Arc { } } +#[async_trait] +impl AsyncReadBackend for Arc { + fn location(&self) -> String { + self.deref().location() + } + async fn list(&self, tpe: FileType) -> Result> { + self.deref().list(tpe).await + } + + async fn list_with_size(&self, tpe: FileType) -> Result> { + self.deref().list_with_size(tpe).await + } + + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + self.deref().read_full(tpe, id).await + } + + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + self.deref() + .read_partial(tpe, id, cacheable, offset, length) + .await + } + + async fn needs_warm_up(&self) -> bool { + self.deref().needs_warm_up().await + } + + async fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { + self.deref().warm_up(tpe, id).await + } +} + +#[async_trait] +impl AsyncWriteBackend for Arc { + async fn create(&self) -> Result<()> { + self.deref().create().await + } + + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + self.deref().remove(tpe, id, cacheable).await + } + + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + self.deref().write_bytes(tpe, id, cacheable, buf).await + } +} + +impl std::fmt::Debug for dyn AsyncWriteBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "WriteBackend{{{}}}", self.location()) + } +} + impl std::fmt::Debug for dyn WriteBackend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "WriteBackend{{{}}}", self.location()) @@ -524,6 +856,12 @@ pub struct RepositoryBackends { repo_hot: Option>, } +#[derive(Debug, Clone)] +pub struct AsyncRepositoryBackends { + repository: Arc, + repo_hot: Option>, +} + impl RepositoryBackends { /// Creates a new [`RepositoryBackends`]. /// @@ -550,3 +888,27 @@ impl RepositoryBackends { self.repo_hot.clone() } } + +impl AsyncRepositoryBackends { + pub fn new( + repository: Arc, + repo_hot: Option>, + ) -> Self { + Self { + repository, + repo_hot, + } + } + + /// Returns the repository of this [`RepositoryBackends`]. + #[must_use] + pub fn repository(&self) -> Arc { + self.repository.clone() + } + + /// Returns the hot repository of this [`RepositoryBackends`]. + #[must_use] + pub fn repo_hot(&self) -> Option> { + self.repo_hot.clone() + } +} diff --git a/crates/core/src/backend/cache.rs b/crates/core/src/backend/cache.rs index 1f4a8088..dea01ac4 100644 --- a/crates/core/src/backend/cache.rs +++ b/crates/core/src/backend/cache.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::Result; +use async_trait::async_trait; use bytes::Bytes; use dirs::cache_dir; use log::{trace, warn}; @@ -18,6 +19,8 @@ use crate::{ id::Id, }; +use super::{AsyncReadBackend, AsyncWriteBackend}; + /// Backend that caches data. /// /// This backend caches data in a directory. @@ -34,6 +37,14 @@ pub struct CachedBackend { cache: Cache, } +pub struct AsyncCachedBackend { + /// The backend to cache. + be: Arc, + // TODO - AsyncCache + /// The cache. + cache: Cache, +} + impl CachedBackend { /// Create a new [`CachedBackend`] from a given backend. /// @@ -45,6 +56,17 @@ impl CachedBackend { } } +impl AsyncCachedBackend { + /// Create a new [`CachedBackend`] from a given backend. + /// + /// # Type Parameters + /// + /// * `BE` - The backend to cache. + pub fn new_cache(be: Arc, cache: Cache) -> Arc { + Arc::new(Self { be, cache }) + } +} + impl ReadBackend for CachedBackend { /// Returns the location of the backend as a String. fn location(&self) -> String { @@ -168,6 +190,132 @@ impl ReadBackend for CachedBackend { } } +#[async_trait] +impl AsyncReadBackend for AsyncCachedBackend { + /// Returns the location of the backend as a String. + fn location(&self) -> String { + self.be.location() + } + + /// Lists all files with their size of the given type. + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list. + /// + /// # Errors + /// + /// If the backend does not support listing files. + /// + /// # Returns + /// + /// A vector of tuples containing the id and size of the files. + async fn list_with_size(&self, tpe: FileType) -> Result> { + let list = self.be.list_with_size(tpe).await?; + + if tpe.is_cacheable() { + if let Err(err) = self.cache.remove_not_in_list(tpe, &list) { + warn!("Error in cache backend removing files {tpe:?}: {err}"); + } + } + + Ok(list) + } + + /// Reads full data of the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// + /// # Errors + /// + /// * [`CacheBackendErrorKind::FromIoError`] - If the file could not be read. + /// + /// # Returns + /// + /// The data read. + /// + /// [`CacheBackendErrorKind::FromIoError`]: crate::error::CacheBackendErrorKind::FromIoError + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + if tpe.is_cacheable() { + match self.cache.read_full(tpe, id) { + Ok(Some(data)) => return Ok(data), + Ok(None) => {} + Err(err) => warn!("Error in cache backend reading {tpe:?},{id}: {err}"), + } + let res = self.be.read_full(tpe, id).await; + if let Ok(data) = &res { + if let Err(err) = self.cache.write_bytes(tpe, id, data) { + warn!("Error in cache backend writing {tpe:?},{id}: {err}"); + } + } + res + } else { + self.be.read_full(tpe, id).await + } + } + + /// Reads partial data of the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the file is cacheable. + /// * `offset` - The offset to read from. + /// * `length` - The length to read. + /// + /// # Errors + /// + /// * [`CacheBackendErrorKind::FromIoError`] - If the file could not be read. + /// + /// # Returns + /// + /// The data read. + /// + /// [`CacheBackendErrorKind::FromIoError`]: crate::error::CacheBackendErrorKind::FromIoError + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + if cacheable || tpe.is_cacheable() { + match self.cache.read_partial(tpe, id, offset, length) { + Ok(Some(data)) => return Ok(data), + Ok(None) => {} + Err(err) => warn!("Error in cache backend reading {tpe:?},{id}: {err}"), + }; + // read full file, save to cache and return partial content + match self.be.read_full(tpe, id).await { + Ok(data) => { + let range = offset as usize..(offset + length) as usize; + if let Err(err) = self.cache.write_bytes(tpe, id, &data) { + warn!("Error in cache backend writing {tpe:?},{id}: {err}"); + } + Ok(Bytes::copy_from_slice(&data.slice(range))) + } + error => error, + } + } else { + self.be + .read_partial(tpe, id, cacheable, offset, length) + .await + } + } + async fn needs_warm_up(&self) -> bool { + self.be.needs_warm_up().await + } + + async fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { + self.be.warm_up(tpe, id).await + } +} + impl WriteBackend for CachedBackend { /// Creates the backend. fn create(&self) -> Result<()> { @@ -211,6 +359,50 @@ impl WriteBackend for CachedBackend { } } +#[async_trait] +impl AsyncWriteBackend for AsyncCachedBackend { + /// Creates the backend. + async fn create(&self) -> Result<()> { + self.be.create().await + } + + /// Writes the given data to the given file. + /// + /// If the file is cacheable, it will also be written to the cache. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `cacheable` - Whether the file is cacheable. + /// * `buf` - The data to write. + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + if cacheable || tpe.is_cacheable() { + if let Err(err) = self.cache.write_bytes(tpe, id, &buf) { + warn!("Error in cache backend writing {tpe:?},{id}: {err}"); + } + } + self.be.write_bytes(tpe, id, cacheable, buf).await + } + + /// Removes the given file. + /// + /// If the file is cacheable, it will also be removed from the cache. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + if cacheable || tpe.is_cacheable() { + if let Err(err) = self.cache.remove(tpe, id) { + warn!("Error in cache backend removing {tpe:?},{id}: {err}"); + } + } + self.be.remove(tpe, id, cacheable).await + } +} + /// Backend that caches data in a directory. #[derive(Clone, Debug)] pub struct Cache { diff --git a/crates/core/src/backend/decrypt.rs b/crates/core/src/backend/decrypt.rs index c9ce8dc0..d265a9bd 100644 --- a/crates/core/src/backend/decrypt.rs +++ b/crates/core/src/backend/decrypt.rs @@ -1,6 +1,7 @@ use std::{num::NonZeroU32, sync::Arc}; use anyhow::Result; +use async_trait::async_trait; use bytes::Bytes; use crossbeam_channel::{unbounded, Receiver}; use rayon::prelude::*; @@ -20,17 +21,23 @@ use crate::{ error::{CryptBackendErrorKind, RusticErrorKind}, id::Id, repofile::RepoFile, - Progress, RusticResult, + Progress, RusticError, RusticResult, }; +use super::{AsyncReadBackend, AsyncWriteBackend}; + /// A backend that can decrypt data. /// This is a trait that is implemented by all backends that can decrypt data. /// It is implemented for all backends that implement `DecryptWriteBackend` and `DecryptReadBackend`. /// This trait is used by the `Repository` to decrypt data. pub trait DecryptFullBackend: DecryptWriteBackend + DecryptReadBackend {} +pub trait AsyncDecryptFullBackend: AsyncDecryptWriteBackend + AsyncDecryptReadBackend {} + impl DecryptFullBackend for T {} +impl AsyncDecryptFullBackend for T {} + pub trait DecryptReadBackend: ReadBackend + Clone + 'static { /// Decrypts the given data. /// @@ -180,6 +187,71 @@ pub trait DecryptReadBackend: ReadBackend + Clone + 'static { } } +pub trait AsyncDecryptReadBackend: AsyncReadBackend + Clone + 'static { + fn decrypt(&self, data: &[u8]) -> RusticResult>; + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> RusticResult; + fn read_encrypted_from_partial( + &self, + data: &[u8], + uncompressed_length: Option, + ) -> RusticResult { + let mut data = self.decrypt(data)?; + if let Some(length) = uncompressed_length { + data = decode_all(&*data) + .map_err(CryptBackendErrorKind::DecodingZstdCompressedDataFailed)?; + if data.len() != length.get() as usize { + return Err(CryptBackendErrorKind::LengthOfUncompressedDataDoesNotMatch.into()); + } + } + Ok(data.into()) + } + async fn read_encrypted_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + uncompressed_length: Option, + ) -> RusticResult { + self.read_encrypted_from_partial( + &self + .read_partial(tpe, id, cacheable, offset, length) + .await + .map_err(RusticErrorKind::Backend)?, + uncompressed_length, + ) + } + async fn get_file(&self, id: &Id) -> RusticResult { + let data = self.read_encrypted_full(F::TYPE, id).await?; + Ok(serde_json::from_slice(&data) + .map_err(CryptBackendErrorKind::DeserializingFromBytesOfJsonTextFailed)?) + } + async fn stream_all( + &self, + p: &impl Progress, + ) -> RusticResult>> { + let list = self.list(F::TYPE).await.map_err(RusticErrorKind::Backend)?; + self.stream_list(&list, p).await + } + async fn stream_list( + &self, + list: &[Id], + p: &impl Progress, + ) -> RusticResult>> { + p.set_length(list.len() as u64); + let (tx, rx) = unbounded(); + + // TODO - use futures::stream::Stream; + for id in list { + let file = self.get_file::(id).await.map(|file| (*id, file)); + p.inc(1); + tx.send(file).unwrap(); + } + Ok(rx) + } +} + pub trait DecryptWriteBackend: WriteBackend + Clone + 'static { /// The type of the key. type Key: CryptoKey; @@ -339,6 +411,64 @@ pub trait DecryptWriteBackend: WriteBackend + Clone + 'static { fn set_extra_verify(&mut self, extra_check: bool); } +pub trait AsyncDecryptWriteBackend: AsyncWriteBackend + Clone + 'static { + type Key: CryptoKey; + fn key(&self) -> &Self::Key; + async fn hash_write_full(&self, tpe: FileType, data: &[u8]) -> RusticResult; + async fn process_data(&self, data: &[u8]) -> RusticResult<(Vec, u32, Option)>; + async fn hash_write_full_uncompressed(&self, tpe: FileType, data: &[u8]) -> RusticResult { + let data = self.key().encrypt_data(data)?; + let id = hash(&data); + self.write_bytes(tpe, &id, false, data.into()) + .await + .map_err(RusticErrorKind::Backend)?; + Ok(id) + } + async fn save_file(&self, file: &F) -> RusticResult { + let data = serde_json::to_vec(file) + .map_err(CryptBackendErrorKind::SerializingToJsonByteVectorFailed)?; + self.hash_write_full(F::TYPE, &data).await + } + async fn save_file_uncompressed(&self, file: &F) -> RusticResult { + let data = serde_json::to_vec(file) + .map_err(CryptBackendErrorKind::SerializingToJsonByteVectorFailed)?; + self.hash_write_full_uncompressed(F::TYPE, &data).await + } + async fn save_list<'a, F: RepoFile, I: ExactSizeIterator + Send>( + &self, + list: I, + p: impl Progress, + ) -> RusticResult<()> { + p.set_length(list.len() as u64); + + // TODO - use futures::stream::Stream; + for file in list { + _ = self.save_file(file).await?; + p.inc(1); + } + p.finish(); + Ok(()) + } + async fn delete_list<'a, I: ExactSizeIterator + Send>( + &self, + tpe: FileType, + cacheable: bool, + list: I, + p: impl Progress, + ) -> RusticResult<()> { + // TODO - use futures::stream::Stream; + p.set_length(list.len() as u64); + for id in list { + self.remove(tpe, id, cacheable).await.unwrap(); + p.inc(1); + } + p.finish(); + Ok(()) + } + fn set_zstd(&mut self, zstd: Option); + fn set_extra_verify(&mut self, extra_check: bool); +} + /// A backend that can decrypt data. /// /// # Type Parameters @@ -356,6 +486,18 @@ pub struct DecryptBackend { extra_verify: bool, } +#[derive(Debug, Clone)] +pub struct AsyncDecryptBackend { + /// The backend to decrypt. + be: Arc, + /// The key to decrypt the backend with. + key: C, + /// The compression level to use for zstd. + zstd: Option, + /// Whether to do an extra verification by decompressing and decrypting the data + extra_verify: bool, +} + impl DecryptBackend { /// Creates a new decrypt backend. /// @@ -450,6 +592,100 @@ impl DecryptBackend { } } +impl AsyncDecryptBackend { + /// Creates a new decrypt backend. + /// + /// # Type Parameters + /// + /// * `C` - The type of the key to decrypt the backend with. + /// + /// # Arguments + /// + /// * `be` - The backend to decrypt. + /// * `key` - The key to decrypt the backend with. + /// + /// # Returns + /// + /// The new decrypt backend. + pub fn new(be: Arc, key: C) -> Self { + Self { + be, + key, + // zstd and extra_verify are directly set, where needed. + zstd: None, + extra_verify: false, + } + } + + /// Decrypt and potentially decompress an already read repository file + fn decrypt_file(&self, data: &[u8]) -> RusticResult> { + let decrypted = self.decrypt(data)?; + Ok(match decrypted.first() { + Some(b'{' | b'[') => decrypted, // not compressed + Some(2) => decode_all(&decrypted[1..]) + .map_err(CryptBackendErrorKind::DecodingZstdCompressedDataFailed)?, // 2 indicates compressed data following + _ => return Err(CryptBackendErrorKind::DecryptionNotSupportedForBackend)?, + }) + } + + /// encrypt and potentially compress a repository file + fn encrypt_file(&self, data: &[u8]) -> RusticResult> { + let data_encrypted = match self.zstd { + Some(level) => { + let mut out = vec![2_u8]; + copy_encode(data, &mut out, level) + .map_err(CryptBackendErrorKind::CopyEncodingDataFailed)?; + self.key().encrypt_data(&out)? + } + None => self.key().encrypt_data(data)?, + }; + Ok(data_encrypted) + } + + fn very_file(&self, data_encrypted: &[u8], data: &[u8]) -> RusticResult<()> { + if self.extra_verify { + let check_data = self.decrypt_file(data_encrypted)?; + if data != check_data { + return Err(CryptBackendErrorKind::ExtraVerificationFailed.into()); + } + } + Ok(()) + } + + /// encrypt and potentially compress some data + fn encrypt_data(&self, data: &[u8]) -> RusticResult<(Vec, u32, Option)> { + let data_len: u32 = data + .len() + .try_into() + .map_err(CryptBackendErrorKind::IntConversionFailed)?; + let (data_encrypted, uncompressed_length) = match self.zstd { + None => (self.key.encrypt_data(data)?, None), + // compress if requested + Some(level) => ( + self.key.encrypt_data(&encode_all(data, level)?)?, + NonZeroU32::new(data_len), + ), + }; + Ok((data_encrypted, data_len, uncompressed_length)) + } + + fn very_data( + &self, + data_encrypted: &[u8], + uncompressed_length: Option, + data: &[u8], + ) -> RusticResult<()> { + if self.extra_verify { + let data_check = + self.read_encrypted_from_partial(data_encrypted, uncompressed_length)?; + if data != data_check { + return Err(CryptBackendErrorKind::ExtraVerificationFailed.into()); + } + } + Ok(()) + } +} + impl DecryptWriteBackend for DecryptBackend { /// The type of the key. type Key = C; @@ -509,6 +745,66 @@ impl DecryptWriteBackend for DecryptBackend { } } +impl AsyncDecryptWriteBackend for AsyncDecryptBackend { + /// The type of the key. + type Key = C; + + /// Gets the key. + fn key(&self) -> &Self::Key { + &self.key + } + + /// Writes the given data to the backend and returns the id of the data. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `data` - The data to write. + /// + /// # Errors + /// + /// * [`CryptBackendErrorKind::CopyEncodingDataFailed`] - If the data could not be encoded. + /// + /// # Returns + /// + /// The id of the data. + /// + /// [`CryptBackendErrorKind::CopyEncodingDataFailed`]: crate::error::CryptBackendErrorKind::CopyEncodingDataFailed + async fn hash_write_full(&self, tpe: FileType, data: &[u8]) -> RusticResult { + let data_encrypted = self.encrypt_file(data)?; + self.very_file(&data_encrypted, data)?; + let id = hash(&data_encrypted); + self.write_bytes(tpe, &id, false, data_encrypted.into()) + .await + .map_err(RusticErrorKind::Backend)?; + Ok(id) + } + + async fn process_data(&self, data: &[u8]) -> RusticResult<(Vec, u32, Option)> { + let (data_encrypted, data_len, uncompressed_length) = self.encrypt_data(data)?; + self.very_data(&data_encrypted, uncompressed_length, data)?; + Ok((data_encrypted, data_len, uncompressed_length)) + } + + /// Sets the compression level to use for zstd. + /// + /// # Arguments + /// + /// * `zstd` - The compression level to use for zstd. + fn set_zstd(&mut self, zstd: Option) { + self.zstd = zstd; + } + + /// Sets `extra_check`, i.e. whether to do an extra check after compressing/encrypting + /// + /// # Arguments + /// + /// * `extra_echeck` - The compression level to use for zstd. + fn set_extra_verify(&mut self, extra_verify: bool) { + self.extra_verify = extra_verify; + } +} + impl DecryptReadBackend for DecryptBackend { /// Decrypts the given data. /// @@ -543,6 +839,22 @@ impl DecryptReadBackend for DecryptBackend { } } +impl AsyncDecryptReadBackend for AsyncDecryptBackend { + fn decrypt(&self, data: &[u8]) -> RusticResult> { + self.key.decrypt_data(data) + } + + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> RusticResult { + self.decrypt_file( + &self + .read_full(tpe, id) + .await + .map_err(RusticErrorKind::Backend)?, + ) + .map(Into::into) + } +} + impl ReadBackend for DecryptBackend { fn location(&self) -> String { self.be.location() @@ -572,6 +884,38 @@ impl ReadBackend for DecryptBackend { } } +#[async_trait] +impl AsyncReadBackend for AsyncDecryptBackend { + fn location(&self) -> String { + self.be.location() + } + + async fn list(&self, tpe: FileType) -> Result> { + self.be.list(tpe).await + } + + async fn list_with_size(&self, tpe: FileType) -> Result> { + self.be.list_with_size(tpe).await + } + + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + self.be.read_full(tpe, id).await + } + + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + self.be + .read_partial(tpe, id, cacheable, offset, length) + .await + } +} + impl WriteBackend for DecryptBackend { fn create(&self) -> Result<()> { self.be.create() @@ -586,6 +930,21 @@ impl WriteBackend for DecryptBackend { } } +#[async_trait] +impl AsyncWriteBackend for AsyncDecryptBackend { + async fn create(&self) -> Result<()> { + self.be.create().await + } + + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + self.be.write_bytes(tpe, id, cacheable, buf).await + } + + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + self.be.remove(tpe, id, cacheable).await + } +} + #[cfg(test)] mod tests { use crate::{backend::MockBackend, crypto::aespoly1305::Key}; diff --git a/crates/core/src/backend/hotcold.rs b/crates/core/src/backend/hotcold.rs index 75a1f750..f09eac46 100644 --- a/crates/core/src/backend/hotcold.rs +++ b/crates/core/src/backend/hotcold.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use async_trait::async_trait; use bytes::Bytes; use crate::{ @@ -8,6 +9,8 @@ use crate::{ id::Id, }; +use super::{AsyncReadBackend, AsyncWriteBackend}; + /// A hot/cold backend implementation. /// /// # Type Parameters @@ -21,6 +24,14 @@ pub struct HotColdBackend { be_hot: Arc, } +#[derive(Clone, Debug)] +pub struct AsyncHotColdBackend { + /// The backend to use. + be: Arc, + /// The backend to use for hot files. + be_hot: Arc, +} + impl HotColdBackend { /// Creates a new `HotColdBackend`. /// @@ -40,6 +51,25 @@ impl HotColdBackend { } } +impl AsyncHotColdBackend { + /// Creates a new `HotColdBackend`. + /// + /// # Type Parameters + /// + /// * `BE` - The backend to use. + /// + /// # Arguments + /// + /// * `be` - The backend to use. + /// * `hot_be` - The backend to use for hot files. + pub fn new(be: BE, hot_be: BE) -> Self { + Self { + be: Arc::new(be), + be_hot: Arc::new(hot_be), + } + } +} + impl ReadBackend for HotColdBackend { fn location(&self) -> String { self.be.location() @@ -77,6 +107,47 @@ impl ReadBackend for HotColdBackend { } } +#[async_trait] +impl AsyncReadBackend for AsyncHotColdBackend { + fn location(&self) -> String { + self.be.location() + } + + async fn list_with_size(&self, tpe: FileType) -> Result> { + self.be.list_with_size(tpe).await + } + + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + self.be_hot.read_full(tpe, id).await + } + + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + if cacheable || tpe != FileType::Pack { + self.be_hot + .read_partial(tpe, id, cacheable, offset, length) + .await + } else { + self.be + .read_partial(tpe, id, cacheable, offset, length) + .await + } + } + + async fn needs_warm_up(&self) -> bool { + self.be.needs_warm_up().await + } + + async fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { + self.be.warm_up(tpe, id).await + } +} impl WriteBackend for HotColdBackend { fn create(&self) -> Result<()> { self.be.create()?; @@ -99,3 +170,29 @@ impl WriteBackend for HotColdBackend { Ok(()) } } + +#[async_trait] +impl AsyncWriteBackend for AsyncHotColdBackend { + async fn create(&self) -> Result<()> { + self.be.create().await?; + self.be_hot.create().await + } + + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + if tpe != FileType::Config && (cacheable || tpe != FileType::Pack) { + self.be_hot + .write_bytes(tpe, id, cacheable, buf.clone()) + .await?; + } + self.be.write_bytes(tpe, id, cacheable, buf).await + } + + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + // First remove cold file + self.be.remove(tpe, id, cacheable).await?; + if cacheable || tpe != FileType::Pack { + self.be_hot.remove(tpe, id, cacheable).await?; + } + Ok(()) + } +} diff --git a/crates/core/src/backend/warm_up.rs b/crates/core/src/backend/warm_up.rs index 87e9e009..bab1a1bb 100644 --- a/crates/core/src/backend/warm_up.rs +++ b/crates/core/src/backend/warm_up.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use async_trait::async_trait; use bytes::Bytes; use crate::{ @@ -8,6 +9,8 @@ use crate::{ id::Id, }; +use super::{AsyncReadBackend, AsyncWriteBackend}; + /// A backend which warms up files by simply accessing them. #[derive(Clone, Debug)] pub struct WarmUpAccessBackend { @@ -15,6 +18,18 @@ pub struct WarmUpAccessBackend { be: Arc, } +#[derive(Clone, Debug)] +pub struct AsyncWarmUpAccessBackend { + /// The backend to use. + be: Arc, +} + +impl AsyncWarmUpAccessBackend { + pub fn new_warm_up(be: Arc) -> Arc { + Arc::new(Self { be }) + } +} + impl WarmUpAccessBackend { /// Creates a new `WarmUpAccessBackend`. /// @@ -61,6 +76,44 @@ impl ReadBackend for WarmUpAccessBackend { } } +#[async_trait] +impl AsyncReadBackend for AsyncWarmUpAccessBackend { + fn location(&self) -> String { + self.be.location() + } + + async fn list_with_size(&self, tpe: FileType) -> Result> { + self.be.list_with_size(tpe).await + } + + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + self.be.read_full(tpe, id).await + } + + async fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + self.be + .read_partial(tpe, id, cacheable, offset, length) + .await + } + + async fn needs_warm_up(&self) -> bool { + true + } + + async fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { + // warm up files by accessing them - error is ignored as we expect this to error out! + _ = self.be.read_partial(tpe, id, false, 0, 1); + Ok(()) + } +} + impl WriteBackend for WarmUpAccessBackend { fn create(&self) -> Result<()> { self.be.create() @@ -75,3 +128,19 @@ impl WriteBackend for WarmUpAccessBackend { self.be.remove(tpe, id, cacheable) } } + +#[async_trait] +impl AsyncWriteBackend for AsyncWarmUpAccessBackend { + async fn create(&self) -> Result<()> { + self.be.create().await + } + + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + self.be.write_bytes(tpe, id, cacheable, buf).await + } + + async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + // First remove cold file + self.be.remove(tpe, id, cacheable).await + } +} diff --git a/crates/core/src/commands/config.rs b/crates/core/src/commands/config.rs index df99a0a0..3d117af5 100644 --- a/crates/core/src/commands/config.rs +++ b/crates/core/src/commands/config.rs @@ -3,11 +3,14 @@ use bytesize::ByteSize; use derive_setters::Setters; use crate::{ - backend::decrypt::{DecryptBackend, DecryptWriteBackend}, + backend::decrypt::{ + AsyncDecryptBackend, AsyncDecryptWriteBackend, DecryptBackend, DecryptWriteBackend, + }, crypto::CryptoKey, error::{CommandErrorKind, RusticResult}, repofile::ConfigFile, repository::{Open, Repository}, + AsyncRepository, }; /// Apply the [`ConfigOptions`] to a given [`ConfigFile`] @@ -99,6 +102,25 @@ pub(crate) fn save_config( Ok(()) } +pub(crate) async fn save_config_async( + repo: &AsyncRepository, + mut new_config: ConfigFile, + key: impl CryptoKey, +) -> RusticResult<()> { + new_config.is_hot = None; + let dbe = AsyncDecryptBackend::new(repo.be.clone(), key); + // for hot/cold backend, this only saves the config to the cold repo. + _ = dbe.save_file_uncompressed(&new_config).await?; + + if let Some(hot_be) = repo.be_hot.clone() { + // save config to hot repo + let dbe = AsyncDecryptBackend::new(hot_be, key); + new_config.is_hot = Some(true); + _ = dbe.save_file_uncompressed(&new_config).await?; + } + Ok(()) +} + #[cfg_attr(feature = "clap", derive(clap::Parser))] #[derive(Debug, Clone, Copy, Default, Setters)] #[setters(into)] diff --git a/crates/core/src/commands/copy.rs b/crates/core/src/commands/copy.rs index 1adfe29d..d197c693 100644 --- a/crates/core/src/commands/copy.rs +++ b/crates/core/src/commands/copy.rs @@ -10,7 +10,8 @@ use crate::{ index::{indexer::Indexer, ReadIndex}, progress::{Progress, ProgressBars}, repofile::SnapshotFile, - repository::{IndexedFull, IndexedIds, IndexedTree, Open, Repository}, + repository::{AsyncOpen, IndexedFull, IndexedIds, IndexedTree, Open, Repository}, + AsyncRepository, }; /// This struct enhances `[SnapshotFile]` with the attribute `relevant` @@ -183,3 +184,33 @@ where Ok(relevant) } + +pub(crate) async fn relevant_snapshots_async( + snaps: &[SnapshotFile], + dest_repo: &AsyncRepository, + filter: F, +) -> RusticResult> +where + F: FnMut(&SnapshotFile) -> bool, +{ + let p = dest_repo + .pb + .progress_counter("finding relevant snapshots..."); + // save snapshots in destination in BTreeSet, as we want to efficiently search within to filter out already existing snapshots before copying. + let snapshots_dest: BTreeSet<_> = + SnapshotFile::all_from_backend_async(dest_repo.dbe(), filter, &p) + .await? + .into_iter() + .collect(); + + let relevant = snaps + .iter() + .cloned() + .map(|sn| CopySnapshot { + relevant: !snapshots_dest.contains(&sn), + sn, + }) + .collect(); + + Ok(relevant) +} diff --git a/crates/core/src/commands/forget.rs b/crates/core/src/commands/forget.rs index 797acfe4..eef58c24 100644 --- a/crates/core/src/commands/forget.rs +++ b/crates/core/src/commands/forget.rs @@ -13,7 +13,8 @@ use crate::{ snapshotfile::{SnapshotGroup, SnapshotGroupCriterion}, SnapshotFile, StringList, }, - repository::{Open, Repository}, + repository::{AsyncOpen, Open, Repository}, + AsyncRepository, }; type CheckFunction = fn(&SnapshotFile, &SnapshotFile) -> bool; @@ -100,6 +101,29 @@ pub(crate) fn get_forget_snapshots( Ok(ForgetGroups(groups)) } +pub(crate) async fn get_forget_snapshots_async( + repo: &AsyncRepository, + keep: &KeepOptions, + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, +) -> RusticResult { + let now = Local::now(); + + let groups = repo + .get_snapshot_group(&[], group_by, filter) + .await? + .into_iter() + .map(|(group, snapshots)| -> RusticResult<_> { + Ok(ForgetGroup { + group, + snapshots: keep.apply(snapshots, now)?, + }) + }) + .collect::>()?; + + Ok(ForgetGroups(groups)) +} + #[cfg_attr(feature = "clap", derive(clap::Parser))] #[cfg_attr(feature = "merge", derive(merge::Merge))] #[skip_serializing_none] diff --git a/crates/core/src/commands/init.rs b/crates/core/src/commands/init.rs index 6e198591..2be281e5 100644 --- a/crates/core/src/commands/init.rs +++ b/crates/core/src/commands/init.rs @@ -6,7 +6,7 @@ use crate::{ backend::WriteBackend, chunker::random_poly, commands::{ - config::{save_config, ConfigOptions}, + config::{save_config, save_config_async, ConfigOptions}, key::KeyOptions, }, crypto::aespoly1305::Key, @@ -14,6 +14,7 @@ use crate::{ id::Id, repofile::ConfigFile, repository::Repository, + AsyncRepository, }; /// Initialize a new repository. @@ -62,6 +63,29 @@ pub(crate) fn init( Ok((key, config)) } +pub(crate) async fn init_async( + repo: &AsyncRepository, + pass: &str, + key_opts: &KeyOptions, + config_opts: &ConfigOptions, +) -> RusticResult<(Key, ConfigFile)> { + // Create config first to allow catching errors from here without writing anything + let repo_id = Id::random(); + let chunker_poly = random_poly()?; + let mut config = ConfigFile::new(2, repo_id, chunker_poly); + if repo.be_hot.is_some() { + // for hot/cold repository, `config` must be identical to thee config file which is read by the backend, i.e. the one saved in the hot repo. + // Note: init_with_config does handle the is_hot config correctly for the hot and the cold repo. + config.is_hot = Some(true); + } + config_opts.apply(&mut config)?; + + let key = init_with_config_async(repo, pass, key_opts, &config).await?; + info!("repository {} successfully created.", repo_id); + + Ok((key, config)) +} + /// Initialize a new repository with a given config. /// /// # Type Parameters @@ -92,3 +116,17 @@ pub(crate) fn init_with_config( Ok(key) } + +pub(crate) async fn init_with_config_async( + repo: &AsyncRepository, + pass: &str, + key_opts: &KeyOptions, + config: &ConfigFile, +) -> RusticResult { + repo.be.create().await.map_err(RusticErrorKind::Backend)?; + let (key, id) = key_opts.init_key_async(repo, pass).await?; + info!("key {id} successfully added."); + save_config_async(repo, config.clone(), key).await?; + + Ok(key) +} diff --git a/crates/core/src/commands/key.rs b/crates/core/src/commands/key.rs index 4a4d195a..f38ef84f 100644 --- a/crates/core/src/commands/key.rs +++ b/crates/core/src/commands/key.rs @@ -8,6 +8,7 @@ use crate::{ id::Id, repofile::KeyFile, repository::{Open, Repository}, + AsyncRepository, }; #[cfg_attr(feature = "clap", derive(clap::Parser))] @@ -84,6 +85,16 @@ impl KeyOptions { Ok((key, self.add(repo, pass, key)?)) } + pub(crate) async fn init_key_async( + &self, + repo: &AsyncRepository, + pass: &str, + ) -> RusticResult<(Key, Id)> { + // generate key + let key = Key::new(); + Ok((key, self.add_async(repo, pass, key).await?)) + } + /// Add a key to the repository. /// /// # Arguments @@ -112,4 +123,22 @@ impl KeyOptions { .map_err(RusticErrorKind::Backend)?; Ok(id) } + + async fn add_async( + &self, + repo: &AsyncRepository, + pass: &str, + key: Key, + ) -> RusticResult { + let ko = self.clone(); + let keyfile = KeyFile::generate(key, &pass, ko.hostname, ko.username, ko.with_created)?; + + let data = serde_json::to_vec(&keyfile).map_err(CommandErrorKind::FromJsonError)?; + let id = hash(&data); + repo.be + .write_bytes(FileType::Key, &id, false, data.into()) + .await + .map_err(RusticErrorKind::Backend)?; + Ok(id) + } } diff --git a/crates/core/src/commands/snapshots.rs b/crates/core/src/commands/snapshots.rs index 37b11e0c..1bdec7d7 100644 --- a/crates/core/src/commands/snapshots.rs +++ b/crates/core/src/commands/snapshots.rs @@ -7,7 +7,7 @@ use crate::{ snapshotfile::{SnapshotGroup, SnapshotGroupCriterion}, SnapshotFile, }, - repository::{Open, Repository}, + repository::{AsyncOpen, AsyncRepository, Open, Repository}, }; /// Get the snapshots from the repository. @@ -59,3 +59,37 @@ pub(crate) fn get_snapshot_group( Ok(groups) } + +pub(crate) async fn get_snapshot_group_async( + repo: &AsyncRepository, + ids: &[String], + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, +) -> RusticResult)>> { + let pb = &repo.pb; + let dbe = repo.dbe(); + let p = pb.progress_counter("getting snapshots..."); + let groups = match ids { + [] => SnapshotFile::group_from_backend_async(dbe, filter, group_by, &p).await?, + [id] if id == "latest" => SnapshotFile::group_from_backend_async(dbe, filter, group_by, &p) + .await? + .into_iter() + .map(|(group, mut snaps)| { + snaps.sort_unstable(); + let last_idx = snaps.len() - 1; + snaps.swap(0, last_idx); + snaps.truncate(1); + (group, snaps) + }) + .collect::>(), + _ => { + let item = ( + SnapshotGroup::default(), + SnapshotFile::from_ids_async(dbe, ids, &p).await?, + ); + vec![item] + } + }; + + Ok(groups) +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d2584b04..0fb6f1ba 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -126,8 +126,9 @@ pub use crate::{ ignore::{LocalSource, LocalSourceFilterOptions, LocalSourceSaveOptions}, local_destination::LocalDestination, node::last_modified_node, - FileType, ReadBackend, ReadSource, ReadSourceEntry, ReadSourceOpen, RepositoryBackends, - WriteBackend, ALL_FILE_TYPES, + AsyncReadBackend, AsyncRepositoryBackends, AsyncWriteBackend, FileType, ReadBackend, + ReadSource, ReadSourceEntry, ReadSourceOpen, RepositoryBackends, WriteBackend, + ALL_FILE_TYPES, }, blob::tree::{FindMatches, FindNode, TreeStreamerOptions as LsOptions}, commands::{ @@ -149,6 +150,7 @@ pub use crate::{ PathList, SnapshotGroup, SnapshotGroupCriterion, SnapshotOptions, StringList, }, repository::{ - FullIndex, IndexedFull, IndexedStatus, OpenStatus, Repository, RepositoryOptions, + AsyncOpenStatus, AsyncRepository, FullIndex, IndexedFull, IndexedStatus, OpenStatus, + Repository, RepositoryOptions, }, }; diff --git a/crates/core/src/repofile/keyfile.rs b/crates/core/src/repofile/keyfile.rs index 5837b524..6e54d7ea 100644 --- a/crates/core/src/repofile/keyfile.rs +++ b/crates/core/src/repofile/keyfile.rs @@ -9,6 +9,7 @@ use crate::{ crypto::{aespoly1305::Key, CryptoKey}, error::{KeyFileErrorKind, RusticErrorKind, RusticResult}, id::Id, + AsyncReadBackend, }; pub(super) mod constants { @@ -209,6 +210,17 @@ impl KeyFile { .map_err(KeyFileErrorKind::DeserializingFromSliceFailed)?, ) } + + async fn from_async_backend(be: &B, id: &Id) -> RusticResult { + let data = be + .read_full(FileType::Key, id) + .await + .map_err(RusticErrorKind::Backend)?; + Ok( + serde_json::from_slice(&data) + .map_err(KeyFileErrorKind::DeserializingFromSliceFailed)?, + ) + } } /// Calculate the logarithm to base 2 of the given number @@ -305,6 +317,16 @@ pub(crate) fn key_from_backend( KeyFile::from_backend(be, id)?.key_from_password(passwd) } +pub(crate) async fn key_from_async_backend( + be: &B, + id: &Id, + passwd: &impl AsRef<[u8]>, +) -> RusticResult { + KeyFile::from_async_backend(be, id) + .await? + .key_from_password(passwd) +} + /// Find a [`KeyFile`] in the backend that fits to the given password and return the contained key. /// If a key hint is given, only this key is tested. /// This is recommended for a large number of keys. @@ -340,3 +362,24 @@ pub(crate) fn find_key_in_backend( Err(KeyFileErrorKind::NoSuitableKeyFound.into()) } } + +pub(crate) async fn find_key_in_async_backend( + be: &B, + passwd: &impl AsRef<[u8]>, + hint: Option<&Id>, +) -> RusticResult { + if let Some(id) = hint { + key_from_async_backend(be, id, passwd).await + } else { + for id in be + .list(FileType::Key) + .await + .map_err(RusticErrorKind::Backend)? + { + if let Ok(key) = key_from_async_backend(be, &id, passwd).await { + return Ok(key); + } + } + Err(KeyFileErrorKind::NoSuitableKeyFound.into()) + } +} diff --git a/crates/core/src/repofile/snapshotfile.rs b/crates/core/src/repofile/snapshotfile.rs index e4224afd..c1ad6903 100644 --- a/crates/core/src/repofile/snapshotfile.rs +++ b/crates/core/src/repofile/snapshotfile.rs @@ -18,7 +18,10 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr, OneOrMany}; use crate::{ - backend::{decrypt::DecryptReadBackend, FileType, FindInBackend}, + backend::{ + decrypt::{AsyncDecryptReadBackend, DecryptReadBackend}, + AsyncFindInBackend, FileType, FindInBackend, + }, error::{RusticError, RusticResult, SnapshotFileErrorKind}, id::Id, progress::Progress, @@ -436,6 +439,10 @@ impl SnapshotFile { Ok(Self::set_id((*id, be.get_file(id)?))) } + async fn from_backend_async(be: &B, id: &Id) -> RusticResult { + Ok(Self::set_id((*id, be.get_file(id).await?))) + } + /// Get a [`SnapshotFile`] from the backend by (part of the) Id /// /// # Arguments @@ -466,6 +473,18 @@ impl SnapshotFile { } } + pub(crate) async fn from_str_async( + be: &B, + string: &str, + predicate: impl FnMut(&Self) -> bool + Send + Sync, + p: &impl Progress, + ) -> RusticResult { + match string { + "latest" => Self::latest_async(be, predicate, p).await, + _ => Self::from_id_async(be, string).await, + } + } + /// Get the latest [`SnapshotFile`] from the backend /// /// # Arguments @@ -506,6 +525,33 @@ impl SnapshotFile { latest.ok_or_else(|| SnapshotFileErrorKind::NoSnapshotsFound.into()) } + pub(crate) async fn latest_async( + be: &B, + predicate: impl FnMut(&Self) -> bool + Send + Sync, + p: &impl Progress, + ) -> RusticResult { + p.set_title("getting latest snapshot..."); + let mut latest: Option = None; + let mut pred = predicate; + + for snap in be.stream_all::(p).await? { + let (id, mut snap) = snap?; + if !pred(&snap) { + continue; + } + + snap.id = id; + match &latest { + Some(l) if l.time > snap.time => {} + _ => { + latest = Some(snap); + } + } + } + p.finish(); + latest.ok_or_else(|| SnapshotFileErrorKind::NoSnapshotsFound.into()) + } + /// Get a [`SnapshotFile`] from the backend by (part of the) id /// /// # Arguments @@ -527,6 +573,15 @@ impl SnapshotFile { Self::from_backend(be, &id) } + pub(crate) async fn from_id_async( + be: &B, + id: &str, + ) -> RusticResult { + info!("getting snapshot..."); + let id = be.find_id(FileType::Snapshot, id).await?; + Self::from_backend_async(be, &id).await + } + /// Get a list of [`SnapshotFile`]s from the backend by supplying a list of/parts of their Ids /// /// # Arguments @@ -560,6 +615,25 @@ impl SnapshotFile { .collect()) } + pub(crate) async fn from_ids_async + Send + Sync>( + be: &B, + ids: &[T], + p: &impl Progress, + ) -> RusticResult> { + let ids = be.find_ids(FileType::Snapshot, ids).await?; + let mut list: BTreeMap<_, _> = be + .stream_list::(&ids, p) + .await? + .into_iter() + .try_collect()?; + // sort back to original order + Ok(ids + .into_iter() + .filter_map(|id| list.remove_entry(&id)) + .map(Self::set_id) + .collect()) + } + /// Compare two [`SnapshotFile`]s by criteria from [`SnapshotGroupCriterion`]. /// /// # Arguments @@ -649,6 +723,30 @@ impl SnapshotFile { Ok(result) } + pub(crate) async fn group_from_backend_async( + be: &B, + filter: F, + crit: SnapshotGroupCriterion, + p: &impl Progress, + ) -> RusticResult)>> + where + B: AsyncDecryptReadBackend, + F: FnMut(&Self) -> bool, + { + let mut snaps = Self::all_from_backend_async(be, filter, p).await?; + snaps.sort_unstable_by(|sn1, sn2| sn1.cmp_group(crit, sn2)); + + let mut result = Vec::new(); + for (group, snaps) in &snaps + .into_iter() + .chunk_by(|sn| SnapshotGroup::from_snapshot(sn, crit)) + { + result.push((group, snaps.collect())); + } + + Ok(result) + } + // TODO: add documentation! pub(crate) fn all_from_backend( be: &B, @@ -666,6 +764,23 @@ impl SnapshotFile { .try_collect() } + pub(crate) async fn all_from_backend_async( + be: &B, + filter: F, + p: &impl Progress, + ) -> RusticResult> + where + B: AsyncDecryptReadBackend, + F: FnMut(&Self) -> bool, + { + be.stream_all::(p) + .await? + .into_iter() + .map_ok(Self::set_id) + .filter_ok(filter) + .try_collect() + } + /// Add tag lists to snapshot. /// /// # Arguments diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index ecea91be..c0a0cccc 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -19,12 +19,15 @@ use serde_with::{serde_as, DisplayFromStr, OneOrMany}; use crate::{ backend::{ - cache::{Cache, CachedBackend}, - decrypt::{DecryptBackend, DecryptReadBackend, DecryptWriteBackend}, - hotcold::HotColdBackend, + cache::{AsyncCachedBackend, Cache, CachedBackend}, + decrypt::{ + AsyncDecryptBackend, AsyncDecryptReadBackend, AsyncDecryptWriteBackend, DecryptBackend, + DecryptReadBackend, DecryptWriteBackend, + }, + hotcold::{AsyncHotColdBackend, HotColdBackend}, local_destination::LocalDestination, node::Node, - warm_up::WarmUpAccessBackend, + warm_up::{AsyncWarmUpAccessBackend, WarmUpAccessBackend}, FileType, ReadBackend, WriteBackend, }, blob::{ @@ -56,13 +59,13 @@ use crate::{ }, progress::{NoProgressBars, Progress, ProgressBars}, repofile::{ - keyfile::find_key_in_backend, + keyfile::{find_key_in_async_backend, find_key_in_backend}, snapshotfile::{SnapshotGroup, SnapshotGroupCriterion}, ConfigFile, PathList, RepoFile, SnapshotFile, SnapshotSummary, Tree, }, - repository::{warm_up::warm_up, warm_up::warm_up_wait}, + repository::warm_up::{warm_up, warm_up_wait}, vfs::OpenFile, - RepositoryBackends, RusticResult, + AsyncRepositoryBackends, AsyncWriteBackend, RepositoryBackends, RusticResult, }; mod constants { @@ -284,6 +287,27 @@ pub struct Repository { status: S, } +#[derive(Debug, Clone)] +pub struct AsyncRepository { + /// The name of the repository + pub name: String, + + /// The `HotColdBackend` to use for this repository + pub(crate) be: Arc, + + /// The Backend to use for hot files + pub(crate) be_hot: Option>, + + /// The options used for this repository + opts: RepositoryOptions, + + /// The progress bar to use + pub(crate) pb: P, + + /// The status + status: S, +} + impl Repository { /// Create a new repository from the given [`RepositoryOptions`] (without progress bars) /// @@ -304,6 +328,12 @@ impl Repository { } } +impl AsyncRepository { + pub fn new(opts: &RepositoryOptions, backends: &AsyncRepositoryBackends) -> RusticResult { + Self::new_with_progress(opts, backends, NoProgressBars {}) + } +} + impl

Repository { /// Create a new repository from the given [`RepositoryOptions`] with given progress bars /// @@ -363,6 +393,44 @@ impl

Repository { } } +impl

AsyncRepository { + pub fn new_with_progress( + opts: &RepositoryOptions, + backends: &AsyncRepositoryBackends, + pb: P, + ) -> RusticResult { + let mut be = backends.repository(); + let be_hot = backends.repo_hot(); + + if !opts.warm_up_command.is_empty() { + if opts.warm_up_command.iter().all(|c| !c.contains("%id")) { + return Err(RepositoryErrorKind::NoIDSpecified.into()); + } + info!("using warm-up command {:?}", opts.warm_up_command); + } + + if opts.warm_up { + be = AsyncWarmUpAccessBackend::new_warm_up(be); + } + + let mut name = be.location(); + if let Some(be_hot) = &be_hot { + be = Arc::new(AsyncHotColdBackend::new(be, be_hot.clone())); + name.push('#'); + name.push_str(&be_hot.location()); + } + + Ok(Self { + name, + be, + be_hot, + opts: opts.clone(), + pb, + status: (), + }) + } +} + impl Repository { /// Evaluates the password given by the repository options /// @@ -687,194 +755,870 @@ impl Repository { } } -impl Repository { - /// Collect information about repository files +impl AsyncRepository { + pub fn password(&self) -> RusticResult> { + self.opts.evaluate_password() + } + + /// Returns the Id of the config file /// /// # Errors /// - /// If files could not be listed. - pub fn infos_files(&self) -> RusticResult { - commands::repoinfo::collect_file_infos(self) + /// * [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`] - If listing the repository config file failed + /// * [`RepositoryErrorKind::MoreThanOneRepositoryConfig`] - If there is more than one repository config file + /// + /// # Returns + /// + /// The id of the config file or `None` if no config file is found + /// + /// [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`]: crate::error::RepositoryErrorKind::ListingRepositoryConfigFileFailed + /// [`RepositoryErrorKind::MoreThanOneRepositoryConfig`]: crate::error::RepositoryErrorKind::MoreThanOneRepositoryConfig + pub async fn config_id(&self) -> RusticResult> { + let config_ids = self + .be + .list(FileType::Config) + .await + .map_err(|_| RepositoryErrorKind::ListingRepositoryConfigFileFailed)?; + + match config_ids.len() { + 1 => Ok(Some(config_ids[0])), + 0 => Ok(None), + _ => Err(RepositoryErrorKind::MoreThanOneRepositoryConfig(self.name.clone()).into()), + } } - /// Warm up the given pack files without waiting. - /// - /// # Arguments + /// Open the repository. /// - /// * `packs` - The pack files to warm up + /// This gets the decryption key and reads the config file /// /// # Errors /// - /// * [`RepositoryErrorKind::FromSplitError`] - If the command could not be parsed. - /// * [`RepositoryErrorKind::FromThreadPoolbilderError`] - If the thread pool could not be created. + /// * [`RepositoryErrorKind::NoPasswordGiven`] - If no password is given + /// * [`RepositoryErrorKind::ReadingPasswordFromReaderFailed`] - If reading the password failed + /// * [`RepositoryErrorKind::OpeningPasswordFileFailed`] - If opening the password file failed + /// * [`RepositoryErrorKind::PasswordCommandParsingFailed`] - If parsing the password command failed + /// * [`RepositoryErrorKind::ReadingPasswordFromCommandFailed`] - If reading the password from the command failed + /// * [`RepositoryErrorKind::FromSplitError`] - If splitting the password command failed + /// * [`RepositoryErrorKind::NoRepositoryConfigFound`] - If no repository config file is found + /// * [`RepositoryErrorKind::KeysDontMatchForRepositories`] - If the keys of the hot and cold backend don't match + /// * [`RepositoryErrorKind::IncorrectPassword`] - If the password is incorrect + /// * [`KeyFileErrorKind::NoSuitableKeyFound`] - If no suitable key is found + /// * [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`] - If listing the repository config file failed + /// * [`RepositoryErrorKind::MoreThanOneRepositoryConfig`] - If there is more than one repository config file /// /// # Returns /// - /// The result of the warm up - pub fn warm_up(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { - warm_up(self, packs) + /// The open repository + /// + /// [`RepositoryErrorKind::NoPasswordGiven`]: crate::error::RepositoryErrorKind::NoPasswordGiven + /// [`RepositoryErrorKind::ReadingPasswordFromReaderFailed`]: crate::error::RepositoryErrorKind::ReadingPasswordFromReaderFailed + /// [`RepositoryErrorKind::OpeningPasswordFileFailed`]: crate::error::RepositoryErrorKind::OpeningPasswordFileFailed + /// [`RepositoryErrorKind::PasswordCommandParsingFailed`]: crate::error::RepositoryErrorKind::PasswordCommandParsingFailed + /// [`RepositoryErrorKind::ReadingPasswordFromCommandFailed`]: crate::error::RepositoryErrorKind::ReadingPasswordFromCommandFailed + /// [`RepositoryErrorKind::FromSplitError`]: crate::error::RepositoryErrorKind::FromSplitError + /// [`RepositoryErrorKind::NoRepositoryConfigFound`]: crate::error::RepositoryErrorKind::NoRepositoryConfigFound + /// [`RepositoryErrorKind::KeysDontMatchForRepositories`]: crate::error::RepositoryErrorKind::KeysDontMatchForRepositories + /// [`RepositoryErrorKind::IncorrectPassword`]: crate::error::RepositoryErrorKind::IncorrectPassword + /// [`KeyFileErrorKind::NoSuitableKeyFound`]: crate::error::KeyFileErrorKind::NoSuitableKeyFound + /// [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`]: crate::error::RepositoryErrorKind::ListingRepositoryConfigFileFailed + /// [`RepositoryErrorKind::MoreThanOneRepositoryConfig`]: crate::error::RepositoryErrorKind::MoreThanOneRepositoryConfig + pub async fn open(self) -> RusticResult> { + let password = self + .password()? + .ok_or(RepositoryErrorKind::NoPasswordGiven)?; + self.open_with_password(&password).await } - /// Warm up the given pack files and wait the configured waiting time. + /// Open the repository with a given password. + /// + /// This gets the decryption key and reads the config file /// /// # Arguments /// - /// * `packs` - The pack files to warm up + /// * `password` - The password to use /// /// # Errors /// - /// * [`RepositoryErrorKind::FromSplitError`] - If the command could not be parsed. - /// * [`RepositoryErrorKind::FromThreadPoolbilderError`] - If the thread pool could not be created. + /// * [`RepositoryErrorKind::NoRepositoryConfigFound`] - If no repository config file is found + /// * [`RepositoryErrorKind::KeysDontMatchForRepositories`] - If the keys of the hot and cold backend don't match + /// * [`RepositoryErrorKind::IncorrectPassword`] - If the password is incorrect + /// * [`KeyFileErrorKind::NoSuitableKeyFound`] - If no suitable key is found + /// * [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`] - If listing the repository config file failed + /// * [`RepositoryErrorKind::MoreThanOneRepositoryConfig`] - If there is more than one repository config file /// - /// [`RepositoryErrorKind::FromSplitError`]: crate::error::RepositoryErrorKind::FromSplitError - /// [`RepositoryErrorKind::FromThreadPoolbilderError`]: crate::error::RepositoryErrorKind::FromThreadPoolbilderError - pub fn warm_up_wait(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { - warm_up_wait(self, packs) - } -} - -/// A repository which is open, i.e. the password has been checked and the decryption key is available. -pub trait Open { - /// Get the cache - fn cache(&self) -> Option<&Cache>; - - /// Get the [`DecryptBackend`] - fn dbe(&self) -> &DecryptBackend; - - /// Get the [`ConfigFile`] - fn config(&self) -> &ConfigFile; -} - -impl Open for Repository { - /// Get the cache - fn cache(&self) -> Option<&Cache> { - self.status.cache() - } - - /// Get the [`DecryptBackend`] - fn dbe(&self) -> &DecryptBackend { - self.status.dbe() - } - - /// Get the [`ConfigFile`] - fn config(&self) -> &ConfigFile { - self.status.config() - } -} - -/// Open Status: This repository is open, i.e. the password has been checked and the decryption key is available. -#[derive(Debug)] -pub struct OpenStatus { - /// The cache - cache: Option, - /// The [`DecryptBackend`] - dbe: DecryptBackend, - /// The [`ConfigFile`] - config: ConfigFile, -} - -impl Open for OpenStatus { - /// Get the cache - fn cache(&self) -> Option<&Cache> { - self.cache.as_ref() - } + /// [`RepositoryErrorKind::NoRepositoryConfigFound`]: crate::error::RepositoryErrorKind::NoRepositoryConfigFound + /// [`RepositoryErrorKind::KeysDontMatchForRepositories`]: crate::error::RepositoryErrorKind::KeysDontMatchForRepositories + /// [`RepositoryErrorKind::IncorrectPassword`]: crate::error::RepositoryErrorKind::IncorrectPassword + /// [`KeyFileErrorKind::NoSuitableKeyFound`]: crate::error::KeyFileErrorKind::NoSuitableKeyFound + /// [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`]: crate::error::RepositoryErrorKind::ListingRepositoryConfigFileFailed + /// [`RepositoryErrorKind::MoreThanOneRepositoryConfig`]: crate::error::RepositoryErrorKind::MoreThanOneRepositoryConfig + pub async fn open_with_password( + self, + password: &str, + ) -> RusticResult> { + let config_id = + self.config_id() + .await? + .ok_or(RepositoryErrorKind::NoRepositoryConfigFound( + self.name.clone(), + ))?; - /// Get the [`DecryptBackend`] - fn dbe(&self) -> &DecryptBackend { - &self.dbe - } + if let Some(be_hot) = &self.be_hot { + let mut keys = self + .be + .list_with_size(FileType::Key) + .await + .map_err(RusticErrorKind::Backend)?; + keys.sort_unstable_by_key(|key| key.0); + let mut hot_keys = be_hot + .list_with_size(FileType::Key) + .await + .map_err(RusticErrorKind::Backend)?; + hot_keys.sort_unstable_by_key(|key| key.0); + if keys != hot_keys { + return Err(RepositoryErrorKind::KeysDontMatchForRepositories(self.name).into()); + } + } - /// Get the [`ConfigFile`] - fn config(&self) -> &ConfigFile { - &self.config + let key = find_key_in_async_backend(&self.be, &password, None) + .await + .map_err(|err| match err.into_inner() { + RusticErrorKind::KeyFile(KeyFileErrorKind::NoSuitableKeyFound) => { + RepositoryErrorKind::IncorrectPassword.into() + } + err => err, + })?; + info!("repository {}: password is correct.", self.name); + let dbe = AsyncDecryptBackend::new(self.be.clone(), key); + let config: ConfigFile = dbe.get_file(&config_id).await?; + self.open_raw(key, config) } -} -impl Repository { - /// Get the content of the decrypted repository file given by id and [`FileType`] - /// - /// # Arguments - /// - /// * `tpe` - The type of the file to get - /// * `id` - The id of the file to get + /// Initialize a new repository with given options using the password defined in `RepositoryOptions` /// - /// # Errors + /// This returns an open repository which can be directly used. /// - /// * [`IdErrorKind::HexError`] - If the string is not a valid hexadecimal string - /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. - /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// # Type Parameters /// - /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError - /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound - /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique - pub fn cat_file(&self, tpe: FileType, id: &str) -> RusticResult { - commands::cat::cat_file(self, tpe, id) - } - - /// Add a new key to the repository + /// * `P` - The type of the progress bar /// /// # Arguments /// - /// * `pass` - The password to use for the new key - /// * `opts` - The options to use for the new key + /// * `key_opts` - The options to use for the key + /// * `config_opts` - The options to use for the config /// /// # Errors /// - /// * [`CommandErrorKind::FromJsonError`] - If the key could not be serialized. + /// * [`RepositoryErrorKind::NoPasswordGiven`] - If no password is given + /// * [`RepositoryErrorKind::ReadingPasswordFromReaderFailed`] - If reading the password failed + /// * [`RepositoryErrorKind::OpeningPasswordFileFailed`] - If opening the password file failed + /// * [`RepositoryErrorKind::PasswordCommandParsingFailed`] - If parsing the password command failed + /// * [`RepositoryErrorKind::ReadingPasswordFromCommandFailed`] - If reading the password from the command failed + /// * [`RepositoryErrorKind::FromSplitError`] - If splitting the password command failed + /// + /// [`RepositoryErrorKind::NoPasswordGiven`]: crate::error::RepositoryErrorKind::NoPasswordGiven + /// [`RepositoryErrorKind::ReadingPasswordFromReaderFailed`]: crate::error::RepositoryErrorKind::ReadingPasswordFromReaderFailed + /// [`RepositoryErrorKind::OpeningPasswordFileFailed`]: crate::error::RepositoryErrorKind::OpeningPasswordFileFailed + /// [`RepositoryErrorKind::PasswordCommandParsingFailed`]: crate::error::RepositoryErrorKind::PasswordCommandParsingFailed + /// [`RepositoryErrorKind::ReadingPasswordFromCommandFailed`]: crate::error::RepositoryErrorKind::ReadingPasswordFromCommandFailed + /// [`RepositoryErrorKind::FromSplitError`]: crate::error::RepositoryErrorKind::FromSplitError + pub async fn init( + self, + key_opts: &KeyOptions, + config_opts: &ConfigOptions, + ) -> RusticResult> { + let password = self + .password()? + .ok_or(RepositoryErrorKind::NoPasswordGiven)?; + self.init_with_password(&password, key_opts, config_opts) + .await + } + + /// Initialize a new repository with given password and options. + /// + /// This returns an open repository which can be directly used. + /// + /// # Type Parameters + /// + /// * `P` - The type of the progress bar + /// + /// # Arguments + /// + /// * `pass` - The password to use + /// * `key_opts` - The options to use for the key + /// * `config_opts` - The options to use for the config + /// + /// # Errors + /// + /// * [`RepositoryErrorKind::ConfigFileExists`] - If a config file already exists + /// * [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`] - If listing the repository config file failed + /// * [`RepositoryErrorKind::MoreThanOneRepositoryConfig`] - If there is more than one repository config file + /// + /// [`RepositoryErrorKind::ConfigFileExists`]: crate::error::RepositoryErrorKind::ConfigFileExists + /// [`RepositoryErrorKind::ListingRepositoryConfigFileFailed`]: crate::error::RepositoryErrorKind::ListingRepositoryConfigFileFailed + /// [`RepositoryErrorKind::MoreThanOneRepositoryConfig`]: crate::error::RepositoryErrorKind::MoreThanOneRepositoryConfig + pub async fn init_with_password( + self, + pass: &str, + key_opts: &KeyOptions, + config_opts: &ConfigOptions, + ) -> RusticResult> { + if self.config_id().await?.is_some() { + return Err(RepositoryErrorKind::ConfigFileExists.into()); + } + let (key, config) = commands::init::init_async(&self, pass, key_opts, config_opts).await?; + self.open_raw(key, config) + } + + /// Initialize a new repository with given password and a ready [`ConfigFile`]. + /// + /// This returns an open repository which can be directly used. + /// + /// # Type Parameters + /// + /// * `P` - The type of the progress bar + /// + /// # Arguments + /// + /// * `password` - The password to use + /// * `key_opts` - The options to use for the key + /// * `config` - The config file to use + /// + /// # Errors + /// + // TODO: Document errors + pub async fn init_with_config( + self, + password: &str, + key_opts: &KeyOptions, + config: ConfigFile, + ) -> RusticResult> { + let key = + commands::init::init_with_config_async(&self, password, key_opts, &config).await?; + info!("repository {} successfully created.", config.id); + self.open_raw(key, config) + } + + /// Open the repository with given [`Key`] and [`ConfigFile`]. + /// + /// # Type Parameters + /// + /// * `P` - The type of the progress bar + /// + /// # Arguments + /// + /// * `key` - The key to use + /// * `config` - The config file to use + /// + /// # Errors + /// + /// * [`RepositoryErrorKind::HotRepositoryFlagMissing`] - If the config file has `is_hot` set to `true` but the repository is not hot + /// * [`RepositoryErrorKind::IsNotHotRepository`] - If the config file has `is_hot` set to `false` but the repository is hot + /// + /// [`RepositoryErrorKind::HotRepositoryFlagMissing`]: crate::error::RepositoryErrorKind::HotRepositoryFlagMissing + /// [`RepositoryErrorKind::IsNotHotRepository`]: crate::error::RepositoryErrorKind::IsNotHotRepository + fn open_raw( + mut self, + key: Key, + config: ConfigFile, + ) -> RusticResult> { + match (config.is_hot == Some(true), self.be_hot.is_some()) { + (true, false) => return Err(RepositoryErrorKind::HotRepositoryFlagMissing.into()), + (false, true) => return Err(RepositoryErrorKind::IsNotHotRepository.into()), + _ => {} + } + + let cache = (!self.opts.no_cache) + .then(|| Cache::new(config.id, self.opts.cache_dir.clone()).ok()) + .flatten(); + + if let Some(cache) = &cache { + self.be = AsyncCachedBackend::new_cache(self.be.clone(), cache.clone()); + info!("using cache at {}", cache.location()); + } else { + info!("using no cache"); + } + + let mut dbe = AsyncDecryptBackend::new(self.be.clone(), key); + dbe.set_zstd(config.zstd()?); + dbe.set_extra_verify(config.extra_verify()); + + let open = AsyncOpenStatus { cache, dbe, config }; + + Ok(AsyncRepository { + name: self.name, + be: self.be, + be_hot: self.be_hot, + opts: self.opts, + pb: self.pb, + status: open, + }) + } + + /// List all file [`Id`]s of the given [`FileType`] which are present in the repository + /// + /// # Arguments + /// + /// * `tpe` - The type of the files to list + /// + /// # Errors + /// + // TODO: Document errors + pub async fn list(&self, tpe: FileType) -> RusticResult> { + Ok(self + .be + .list(tpe) + .await + .map_err(RusticErrorKind::Backend)? + .into_iter()) + } +} + +impl Repository { + /// Collect information about repository files + /// + /// # Errors + /// + /// If files could not be listed. + pub fn infos_files(&self) -> RusticResult { + commands::repoinfo::collect_file_infos(self) + } + + /// Warm up the given pack files without waiting. + /// + /// # Arguments + /// + /// * `packs` - The pack files to warm up + /// + /// # Errors + /// + /// * [`RepositoryErrorKind::FromSplitError`] - If the command could not be parsed. + /// * [`RepositoryErrorKind::FromThreadPoolbilderError`] - If the thread pool could not be created. + /// + /// # Returns + /// + /// The result of the warm up + pub fn warm_up(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { + warm_up(self, packs) + } + + /// Warm up the given pack files and wait the configured waiting time. + /// + /// # Arguments + /// + /// * `packs` - The pack files to warm up + /// + /// # Errors + /// + /// * [`RepositoryErrorKind::FromSplitError`] - If the command could not be parsed. + /// * [`RepositoryErrorKind::FromThreadPoolbilderError`] - If the thread pool could not be created. + /// + /// [`RepositoryErrorKind::FromSplitError`]: crate::error::RepositoryErrorKind::FromSplitError + /// [`RepositoryErrorKind::FromThreadPoolbilderError`]: crate::error::RepositoryErrorKind::FromThreadPoolbilderError + pub fn warm_up_wait(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { + warm_up_wait(self, packs) + } +} + +/// A repository which is open, i.e. the password has been checked and the decryption key is available. +pub trait Open { + /// Get the cache + fn cache(&self) -> Option<&Cache>; + + /// Get the [`DecryptBackend`] + fn dbe(&self) -> &DecryptBackend; + + /// Get the [`ConfigFile`] + fn config(&self) -> &ConfigFile; +} + +impl Open for Repository { + /// Get the cache + fn cache(&self) -> Option<&Cache> { + self.status.cache() + } + + /// Get the [`DecryptBackend`] + fn dbe(&self) -> &DecryptBackend { + self.status.dbe() + } + + /// Get the [`ConfigFile`] + fn config(&self) -> &ConfigFile { + self.status.config() + } +} + +pub trait AsyncOpen { + /// Get the cache + fn cache(&self) -> Option<&Cache>; + + /// Get the [`DecryptBackend`] + fn dbe(&self) -> &AsyncDecryptBackend; + + /// Get the [`ConfigFile`] + fn config(&self) -> &ConfigFile; +} + +impl AsyncOpen for AsyncRepository { + fn cache(&self) -> Option<&Cache> { + self.status.cache() + } + + fn dbe(&self) -> &AsyncDecryptBackend { + self.status.dbe() + } + + fn config(&self) -> &ConfigFile { + self.status.config() + } +} + +/// Open Status: This repository is open, i.e. the password has been checked and the decryption key is available. +#[derive(Debug)] +pub struct OpenStatus { + /// The cache + cache: Option, + /// The [`DecryptBackend`] + dbe: DecryptBackend, + /// The [`ConfigFile`] + config: ConfigFile, +} + +impl Open for OpenStatus { + /// Get the cache + fn cache(&self) -> Option<&Cache> { + self.cache.as_ref() + } + + /// Get the [`DecryptBackend`] + fn dbe(&self) -> &DecryptBackend { + &self.dbe + } + + /// Get the [`ConfigFile`] + fn config(&self) -> &ConfigFile { + &self.config + } +} + +#[derive(Debug)] +pub struct AsyncOpenStatus { + /// The cache + cache: Option, + /// The [`DecryptBackend`] + dbe: AsyncDecryptBackend, + /// The [`ConfigFile`] + config: ConfigFile, +} + +impl AsyncOpen for AsyncOpenStatus { + fn cache(&self) -> Option<&Cache> { + self.cache.as_ref() + } + + fn dbe(&self) -> &AsyncDecryptBackend { + &self.dbe + } + + fn config(&self) -> &ConfigFile { + &self.config + } +} + +impl Repository { + /// Get the content of the decrypted repository file given by id and [`FileType`] + /// + /// # Arguments + /// + /// * `tpe` - The type of the file to get + /// * `id` - The id of the file to get + /// + /// # Errors + /// + /// * [`IdErrorKind::HexError`] - If the string is not a valid hexadecimal string + /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. + /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// + /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError + /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound + /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique + pub fn cat_file(&self, tpe: FileType, id: &str) -> RusticResult { + commands::cat::cat_file(self, tpe, id) + } + + /// Add a new key to the repository + /// + /// # Arguments + /// + /// * `pass` - The password to use for the new key + /// * `opts` - The options to use for the new key + /// + /// # Errors + /// + /// * [`CommandErrorKind::FromJsonError`] - If the key could not be serialized. + /// + /// [`CommandErrorKind::FromJsonError`]: crate::error::CommandErrorKind::FromJsonError + pub fn add_key(&self, pass: &str, opts: &KeyOptions) -> RusticResult { + opts.add_key(self, pass) + } + + /// Update the repository config by applying the given [`ConfigOptions`] + /// + /// # Arguments + /// + /// * `opts` - The options to apply + /// + /// # Errors + /// + /// * [`CommandErrorKind::VersionNotSupported`] - If the version is not supported + /// * [`CommandErrorKind::CannotDowngrade`] - If the version is lower than the current version + /// * [`CommandErrorKind::NoCompressionV1Repo`] - If compression is set for a v1 repo + /// * [`CommandErrorKind::CompressionLevelNotSupported`] - If the compression level is not supported + /// * [`CommandErrorKind::SizeTooLarge`] - If the size is too large + /// * [`CommandErrorKind::MinPackSizeTolerateWrong`] - If the min packsize tolerance percent is wrong + /// * [`CommandErrorKind::MaxPackSizeTolerateWrong`] - If the max packsize tolerance percent is wrong + /// * [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`] - If the file could not be serialized to json. + /// + /// [`CommandErrorKind::VersionNotSupported`]: crate::error::CommandErrorKind::VersionNotSupported + /// [`CommandErrorKind::CannotDowngrade`]: crate::error::CommandErrorKind::CannotDowngrade + /// [`CommandErrorKind::NoCompressionV1Repo`]: crate::error::CommandErrorKind::NoCompressionV1Repo + /// [`CommandErrorKind::CompressionLevelNotSupported`]: crate::error::CommandErrorKind::CompressionLevelNotSupported + /// [`CommandErrorKind::SizeTooLarge`]: crate::error::CommandErrorKind::SizeTooLarge + /// [`CommandErrorKind::MinPackSizeTolerateWrong`]: crate::error::CommandErrorKind::MinPackSizeTolerateWrong + /// [`CommandErrorKind::MaxPackSizeTolerateWrong`]: crate::error::CommandErrorKind::MaxPackSizeTolerateWrong + /// [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`]: crate::error::CryptBackendErrorKind::SerializingToJsonByteVectorFailed + pub fn apply_config(&self, opts: &ConfigOptions) -> RusticResult { + commands::config::apply_config(self, opts) + } + + /// Get the repository configuration + pub fn config(&self) -> &ConfigFile { + self.status.config() + } + + // TODO: add documentation! + pub(crate) fn dbe(&self) -> &DecryptBackend { + self.status.dbe() + } +} + +impl Repository { + /// Get grouped snapshots. + /// + /// # Arguments + /// + /// * `ids` - The ids of the snapshots to group. If empty, all snapshots are grouped. + /// * `group_by` - The criterion to group by + /// * `filter` - The filter to use + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Returns + /// + /// If `ids` are given, this will try to resolve the ids (or `latest` with respect to the given filter) and return a single group + /// If `ids` is empty, return and group all snapshots respecting the filter. + pub fn get_snapshot_group( + &self, + ids: &[String], + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, + ) -> RusticResult)>> { + commands::snapshots::get_snapshot_group(self, ids, group_by, filter) + } + + /// Get a single snapshot + /// + /// # Arguments + /// + /// * `id` - The id of the snapshot to get + /// * `filter` - The filter to use + /// + /// # Errors + /// + /// * [`IdErrorKind::HexError`] - If the string is not a valid hexadecimal string + /// * [`BackendAccessErrorKind::NoSuitableIdFound`] - If no id could be found. + /// * [`BackendAccessErrorKind::IdNotUnique`] - If the id is not unique. + /// + /// # Returns + /// + /// If `id` is (part of) an `Id`, return this snapshot. + /// If `id` is "latest", return the latest snapshot respecting the giving filter. + /// + /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError + /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound + /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique + pub fn get_snapshot_from_str( + &self, + id: &str, + filter: impl FnMut(&SnapshotFile) -> bool + Send + Sync, + ) -> RusticResult { + let p = self.pb.progress_counter("getting snapshot..."); + let snap = SnapshotFile::from_str(self.dbe(), id, filter, &p)?; + p.finish(); + Ok(snap) + } + + /// Get the given snapshots. + /// + /// # Arguments + /// + /// * `ids` - The ids of the snapshots to get + /// + /// # Notes + /// + /// `ids` may contain part of snapshots id which will be resolved. + /// However, "latest" is not supported in this function. + /// + /// # Errors + /// + // TODO: Document errors + pub fn get_snapshots>(&self, ids: &[T]) -> RusticResult> { + let p = self.pb.progress_counter("getting snapshots..."); + let result = SnapshotFile::from_ids(self.dbe(), ids, &p); + p.finish(); + result + } + + /// Get all snapshots from the repository + /// + /// # Errors + /// + // TODO: Document errors + pub fn get_all_snapshots(&self) -> RusticResult> { + self.get_matching_snapshots(|_| true) + } + + /// Get all snapshots from the repository respecting the given `filter` + /// + /// # Arguments + /// + /// * `filter` - The filter to use + /// + /// # Errors + /// + /// # Note + /// The result is not sorted and may come in random order! + /// + // TODO: Document errors + pub fn get_matching_snapshots( + &self, + filter: impl FnMut(&SnapshotFile) -> bool, + ) -> RusticResult> { + let p = self.pb.progress_counter("getting snapshots..."); + let result = SnapshotFile::all_from_backend(self.dbe(), filter, &p); + p.finish(); + result + } + + /// Get snapshots to forget depending on the given [`KeepOptions`] + /// + /// # Arguments + /// + /// * `keep` - The keep options to use + /// * `group_by` - The criterion to group by + /// * `filter` - The filter to use + /// + /// # Errors + /// + /// If keep options are not valid + /// + /// # Returns + /// + /// The groups of snapshots to forget + pub fn get_forget_snapshots( + &self, + keep: &KeepOptions, + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, + ) -> RusticResult { + commands::forget::get_forget_snapshots(self, keep, group_by, filter) + } + + /// Get snapshots which are not already present and should be present. + /// + /// # Arguments + /// + /// * `filter` - The filter to use + /// * `snaps` - The snapshots to check + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Note + /// + /// This method should be called on the *destination repository* + pub fn relevant_copy_snapshots( + &self, + filter: impl FnMut(&SnapshotFile) -> bool, + snaps: &[SnapshotFile], + ) -> RusticResult> { + commands::copy::relevant_snapshots(snaps, self, filter) + } + + // TODO: Maybe only offer a method to remove &[Snapshotfile] and check if they must be kept. + // See e.g. the merge command of the CLI + /// Remove the given snapshots from the repository + /// + /// # Arguments + /// + /// * `ids` - The ids of the snapshots to remove + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Panics + /// + /// If the files could not be deleted. + pub fn delete_snapshots(&self, ids: &[Id]) -> RusticResult<()> { + if self.config().append_only == Some(true) { + return Err(CommandErrorKind::NotAllowedWithAppendOnly( + "snapshots removal".to_string(), + ) + .into()); + } + let p = self.pb.progress_counter("removing snapshots..."); + self.dbe() + .delete_list(FileType::Snapshot, true, ids.iter(), p)?; + Ok(()) + } + + /// Save the given snapshots to the repository. + /// + /// # Arguments + /// + /// * `snaps` - The snapshots to save + /// + /// # Errors + /// + /// * [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`] - If the file could not be serialized to json. + /// + /// [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`]: crate::error::CryptBackendErrorKind::SerializingToJsonByteVectorFailed + pub fn save_snapshots(&self, mut snaps: Vec) -> RusticResult<()> { + for snap in &mut snaps { + snap.id = Id::default(); + } + let p = self.pb.progress_counter("saving snapshots..."); + self.dbe().save_list(snaps.iter(), p)?; + Ok(()) + } + + /// Check the repository for errors or inconsistencies /// - /// [`CommandErrorKind::FromJsonError`]: crate::error::CommandErrorKind::FromJsonError - pub fn add_key(&self, pass: &str, opts: &KeyOptions) -> RusticResult { - opts.add_key(self, pass) + /// # Arguments + /// + /// * `opts` - The options to use + /// + /// # Errors + /// + // TODO: Document errors + pub fn check(&self, opts: CheckOptions) -> RusticResult<()> { + opts.run(self) } - /// Update the repository config by applying the given [`ConfigOptions`] + /// Get the plan about what should be pruned and/or repacked. /// /// # Arguments /// - /// * `opts` - The options to apply + /// * `opts` - The options to use /// /// # Errors /// - /// * [`CommandErrorKind::VersionNotSupported`] - If the version is not supported - /// * [`CommandErrorKind::CannotDowngrade`] - If the version is lower than the current version - /// * [`CommandErrorKind::NoCompressionV1Repo`] - If compression is set for a v1 repo - /// * [`CommandErrorKind::CompressionLevelNotSupported`] - If the compression level is not supported - /// * [`CommandErrorKind::SizeTooLarge`] - If the size is too large - /// * [`CommandErrorKind::MinPackSizeTolerateWrong`] - If the min packsize tolerance percent is wrong - /// * [`CommandErrorKind::MaxPackSizeTolerateWrong`] - If the max packsize tolerance percent is wrong - /// * [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`] - If the file could not be serialized to json. + // TODO: Document errors /// - /// [`CommandErrorKind::VersionNotSupported`]: crate::error::CommandErrorKind::VersionNotSupported - /// [`CommandErrorKind::CannotDowngrade`]: crate::error::CommandErrorKind::CannotDowngrade - /// [`CommandErrorKind::NoCompressionV1Repo`]: crate::error::CommandErrorKind::NoCompressionV1Repo - /// [`CommandErrorKind::CompressionLevelNotSupported`]: crate::error::CommandErrorKind::CompressionLevelNotSupported - /// [`CommandErrorKind::SizeTooLarge`]: crate::error::CommandErrorKind::SizeTooLarge - /// [`CommandErrorKind::MinPackSizeTolerateWrong`]: crate::error::CommandErrorKind::MinPackSizeTolerateWrong - /// [`CommandErrorKind::MaxPackSizeTolerateWrong`]: crate::error::CommandErrorKind::MaxPackSizeTolerateWrong - /// [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`]: crate::error::CryptBackendErrorKind::SerializingToJsonByteVectorFailed - pub fn apply_config(&self, opts: &ConfigOptions) -> RusticResult { - commands::config::apply_config(self, opts) + /// # Returns + /// + /// The plan about what should be pruned and/or repacked. + pub fn prune_plan(&self, opts: &PruneOptions) -> RusticResult { + opts.get_plan(self) } - /// Get the repository configuration - pub fn config(&self) -> &ConfigFile { - self.status.config() + /// Turn the repository into the `IndexedFull` state by reading and storing the index + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Note + /// + /// This saves the full index in memory which can be quite memory-consuming! + pub fn to_indexed(self) -> RusticResult>> { + let index = GlobalIndex::new(self.dbe(), &self.pb.progress_counter(""))?; + Ok(self.into_indexed_with_index(index)) } - // TODO: add documentation! - pub(crate) fn dbe(&self) -> &DecryptBackend { - self.status.dbe() + /// Turn the repository into the `IndexedFull` state by reading and storing the index + /// + /// This is similar to `to_indexed()`, but also lists the pack files and reads pack headers + /// for packs is missing in the index. + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Note + /// + /// This saves the full index in memory which can be quite memory-consuming! + pub fn to_indexed_checked(self) -> RusticResult>> { + let collector = IndexCollector::new(IndexType::Full); + let index = index_checked_from_collector(&self, collector)?; + Ok(self.into_indexed_with_index(index)) } -} -impl Repository { - /// Get grouped snapshots. + // helper function to deduplicate code + fn into_indexed_with_index( + self, + index: GlobalIndex, + ) -> Repository> { + let status = IndexedStatus { + open: self.status, + index, + index_data: FullIndex { + // TODO: Make cache size (32MB currently) customizable! + cache: quick_cache::sync::Cache::with_weighter( + constants::ESTIMATED_ITEM_CAPACITY, + constants::WEIGHT_CAPACITY, + BytesWeighter {}, + ), + }, + }; + Repository { + name: self.name, + be: self.be, + be_hot: self.be_hot, + opts: self.opts, + pb: self.pb, + status, + } + } + + /// Turn the repository into the `IndexedIds` state by reading and storing a size-optimized index /// - /// # Arguments + /// # Errors /// - /// * `ids` - The ids of the snapshots to group. If empty, all snapshots are grouped. - /// * `group_by` - The criterion to group by - /// * `filter` - The filter to use + // TODO: Document errors + /// + /// # Returns + /// + /// The repository in the `IndexedIds` state + /// + /// # Note + /// + /// This saves only the `Id`s for data blobs. Therefore, not all operations are possible on the repository. + /// However, operations which add data are fully functional. + pub fn to_indexed_ids(self) -> RusticResult>> { + let index = GlobalIndex::only_full_trees(self.dbe(), &self.pb.progress_counter(""))?; + Ok(self.into_indexed_ids_with_index(index)) + } + + /// Turn the repository into the `IndexedIds` state by reading and storing a size-optimized index + /// + /// This is similar to `to_indexed_ids()`, but also lists the pack files and reads pack headers + /// for packs is missing in the index. /// /// # Errors /// @@ -882,15 +1626,99 @@ impl Repository { /// /// # Returns /// - /// If `ids` are given, this will try to resolve the ids (or `latest` with respect to the given filter) and return a single group - /// If `ids` is empty, return and group all snapshots respecting the filter. - pub fn get_snapshot_group( + /// The repository in the `IndexedIds` state + /// + /// # Note + /// + /// This saves only the `Id`s for data blobs. Therefore, not all operations are possible on the repository. + /// However, operations which add data are fully functional. + pub fn to_indexed_ids_checked(self) -> RusticResult>> { + let collector = IndexCollector::new(IndexType::DataIds); + let index = index_checked_from_collector(&self, collector)?; + Ok(self.into_indexed_ids_with_index(index)) + } + + // helper function to deduplicate code + fn into_indexed_ids_with_index( + self, + index: GlobalIndex, + ) -> Repository> { + let status = IndexedStatus { + open: self.status, + index, + index_data: IdIndex {}, + }; + Repository { + name: self.name, + be: self.be, + be_hot: self.be_hot, + opts: self.opts, + pb: self.pb, + status, + } + } + + /// Get statistical information from the index. This method reads all index files, + /// even if an index is already available in memory. + /// + /// # Errors + /// + /// If the index could not be read. + /// + /// # Returns + /// + /// The statistical information from the index. + pub fn infos_index(&self) -> RusticResult { + commands::repoinfo::collect_index_infos(self) + } + + /// Read all files of a given [`RepoFile`] + /// + /// # Errors + /// + // TODO: Document errors + /// + /// # Returns + /// + /// # Note + /// The result is not sorted and may come in random order! + /// + /// An iterator over all files of the given type + pub fn stream_files( + &self, + ) -> RusticResult>> { + Ok(self + .dbe() + .stream_all::(&self.pb.progress_hidden())? + .into_iter()) + } + + /// Repair the index + /// + /// This compares the index with existing pack files and reads packfile headers to ensure the index + /// correctly represents the pack files. + /// + /// # Arguments + /// + /// * `opts` - The options to use + /// * `dry_run` - If true, only print what would be done + /// + /// # Errors + /// + // TODO: Document errors + pub fn repair_index(&self, opts: &RepairIndexOptions, dry_run: bool) -> RusticResult<()> { + opts.repair(self, dry_run) + } +} + +impl AsyncRepository { + pub async fn get_snapshot_group( &self, ids: &[String], group_by: SnapshotGroupCriterion, filter: impl FnMut(&SnapshotFile) -> bool, ) -> RusticResult)>> { - commands::snapshots::get_snapshot_group(self, ids, group_by, filter) + commands::snapshots::get_snapshot_group_async(self, ids, group_by, filter).await } /// Get a single snapshot @@ -914,13 +1742,13 @@ impl Repository { /// [`IdErrorKind::HexError`]: crate::error::IdErrorKind::HexError /// [`BackendAccessErrorKind::NoSuitableIdFound`]: crate::error::BackendAccessErrorKind::NoSuitableIdFound /// [`BackendAccessErrorKind::IdNotUnique`]: crate::error::BackendAccessErrorKind::IdNotUnique - pub fn get_snapshot_from_str( + pub async fn get_snapshot_from_str( &self, id: &str, filter: impl FnMut(&SnapshotFile) -> bool + Send + Sync, ) -> RusticResult { let p = self.pb.progress_counter("getting snapshot..."); - let snap = SnapshotFile::from_str(self.dbe(), id, filter, &p)?; + let snap = SnapshotFile::from_str_async(self.dbe(), id, filter, &p).await?; p.finish(); Ok(snap) } @@ -939,9 +1767,12 @@ impl Repository { /// # Errors /// // TODO: Document errors - pub fn get_snapshots>(&self, ids: &[T]) -> RusticResult> { + pub async fn get_snapshots + Send + Sync>( + &self, + ids: &[T], + ) -> RusticResult> { let p = self.pb.progress_counter("getting snapshots..."); - let result = SnapshotFile::from_ids(self.dbe(), ids, &p); + let result = SnapshotFile::from_ids_async(self.dbe(), ids, &p).await; p.finish(); result } @@ -951,8 +1782,8 @@ impl Repository { /// # Errors /// // TODO: Document errors - pub fn get_all_snapshots(&self) -> RusticResult> { - self.get_matching_snapshots(|_| true) + pub async fn get_all_snapshots(&self) -> RusticResult> { + self.get_matching_snapshots(|_| true).await } /// Get all snapshots from the repository respecting the given `filter` @@ -967,12 +1798,12 @@ impl Repository { /// The result is not sorted and may come in random order! /// // TODO: Document errors - pub fn get_matching_snapshots( + pub async fn get_matching_snapshots( &self, filter: impl FnMut(&SnapshotFile) -> bool, ) -> RusticResult> { let p = self.pb.progress_counter("getting snapshots..."); - let result = SnapshotFile::all_from_backend(self.dbe(), filter, &p); + let result = SnapshotFile::all_from_backend_async(self.dbe(), filter, &p).await; p.finish(); result } @@ -992,13 +1823,13 @@ impl Repository { /// # Returns /// /// The groups of snapshots to forget - pub fn get_forget_snapshots( + pub async fn get_forget_snapshots( &self, keep: &KeepOptions, group_by: SnapshotGroupCriterion, filter: impl FnMut(&SnapshotFile) -> bool, ) -> RusticResult { - commands::forget::get_forget_snapshots(self, keep, group_by, filter) + commands::forget::get_forget_snapshots_async(self, keep, group_by, filter).await } /// Get snapshots which are not already present and should be present. @@ -1015,30 +1846,15 @@ impl Repository { /// # Note /// /// This method should be called on the *destination repository* - pub fn relevant_copy_snapshots( + pub async fn relevant_copy_snapshots( &self, filter: impl FnMut(&SnapshotFile) -> bool, snaps: &[SnapshotFile], ) -> RusticResult> { - commands::copy::relevant_snapshots(snaps, self, filter) + commands::copy::relevant_snapshots_async(snaps, self, filter).await } - // TODO: Maybe only offer a method to remove &[Snapshotfile] and check if they must be kept. - // See e.g. the merge command of the CLI - /// Remove the given snapshots from the repository - /// - /// # Arguments - /// - /// * `ids` - The ids of the snapshots to remove - /// - /// # Errors - /// - // TODO: Document errors - /// - /// # Panics - /// - /// If the files could not be deleted. - pub fn delete_snapshots(&self, ids: &[Id]) -> RusticResult<()> { + pub async fn delete_snapshots(&self, ids: &[Id]) -> RusticResult<()> { if self.config().append_only == Some(true) { return Err(CommandErrorKind::NotAllowedWithAppendOnly( "snapshots removal".to_string(), @@ -1047,7 +1863,8 @@ impl Repository { } let p = self.pb.progress_counter("removing snapshots..."); self.dbe() - .delete_list(FileType::Snapshot, true, ids.iter(), p)?; + .delete_list(FileType::Snapshot, true, ids.iter(), p) + .await?; Ok(()) } @@ -1062,12 +1879,12 @@ impl Repository { /// * [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`] - If the file could not be serialized to json. /// /// [`CryptBackendErrorKind::SerializingToJsonByteVectorFailed`]: crate::error::CryptBackendErrorKind::SerializingToJsonByteVectorFailed - pub fn save_snapshots(&self, mut snaps: Vec) -> RusticResult<()> { + pub async fn save_snapshots(&self, mut snaps: Vec) -> RusticResult<()> { for snap in &mut snaps { snap.id = Id::default(); } let p = self.pb.progress_counter("saving snapshots..."); - self.dbe().save_list(snaps.iter(), p)?; + self.dbe().save_list(snaps.iter(), p).await?; Ok(()) } @@ -1081,7 +1898,9 @@ impl Repository { /// // TODO: Document errors pub fn check(&self, opts: CheckOptions) -> RusticResult<()> { - opts.run(self) + // TODO + todo!("impl check for AsyncRepository"); + // opts.run(self) } /// Get the plan about what should be pruned and/or repacked. @@ -1098,7 +1917,9 @@ impl Repository { /// /// The plan about what should be pruned and/or repacked. pub fn prune_plan(&self, opts: &PruneOptions) -> RusticResult { - opts.get_plan(self) + // TODO + todo!("impl prune for AsyncRepository"); + // opts.get_plan(self) } /// Turn the repository into the `IndexedFull` state by reading and storing the index @@ -1110,7 +1931,7 @@ impl Repository { /// # Note /// /// This saves the full index in memory which can be quite memory-consuming! - pub fn to_indexed(self) -> RusticResult>> { + pub fn to_indexed(self) -> RusticResult>> { let index = GlobalIndex::new(self.dbe(), &self.pb.progress_counter(""))?; Ok(self.into_indexed_with_index(index)) }