diff --git a/.cargo/mutants.toml b/.cargo/mutants.toml index 29f3c4f0..b3af66c2 100644 --- a/.cargo/mutants.toml +++ b/.cargo/mutants.toml @@ -39,10 +39,12 @@ exclude_globs = [ "backup.rs", # almost well tested but some gaps "metric_recorder.rs", "progress.rs", + "src/hunk_index.rs", # not well tested yet? + "src/mount/projfs.rs", # mostly for Windows + "src/owner/windows.rs", "src/progress/term.rs", "src/transport.rs", # almost well tested but some gaps "src/transport/s3.rs", "src/ui/termui.rs", - "src/owner/windows.rs", "stats.rs", ] diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 92dd5958..1e84467d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -66,11 +66,19 @@ jobs: cargo --version rustc --version - name: Build - run: > - cargo build --all-targets --features fail/failpoints - - name: Test - run: > - cargo test --features fail/failpoints -- --include-ignored + run: cargo build --all-targets --features fail/failpoints + - name: Test (without mount) + run: + cargo test --features fail/failpoints -- --skip mount + --include-ignored + - name: Test (mount) + run: + cargo test --features fail/failpoints --test mount -- + --include-ignored + env: + # Running multiple instances in parallel might cause a crash on low-end environments + # when executing the mounting tests on Windows due to projfs. + RUST_TEST_THREADS: 1 # Run rustfmt separately so that it does not block test results rustfmt: diff --git a/Cargo.lock b/Cargo.lock index 13e0e17a..10ca2746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -749,7 +749,7 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "conserve" -version = "24.8.0" +version = "23.11.0" dependencies = [ "assert_cmd", "assert_fs", @@ -809,6 +809,7 @@ dependencies = [ "url", "uzers", "whoami", + "windows-projfs", ] [[package]] @@ -1706,6 +1707,16 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if", + "windows-targets 0.52.5", +] + [[package]] name = "libm" version = "0.2.8" @@ -3315,6 +3326,38 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-projfs" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2bba2a89b25248a2551bbbd994fb2fcefd2f61956115ab993950dbcd1e80492" +dependencies = [ + "libloading", + "log", + "parking_lot 0.12.1", + "thiserror", + "windows", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 00867057..7dfbd755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = 
"GPL-2.0" name = "conserve" readme = "README.md" repository = "https://github.com/sourcefrog/conserve/" -version = "24.8.0" +version = "23.11.0" rust-version = "1.79" [features] @@ -81,6 +81,9 @@ whoami = "1.5.2" uzers = "0.11" nix = { version = "0.28", features = ["fs", "user"] } +[target.'cfg(windows)'.dependencies] +windows-projfs = { version = "0.1.6", features = ["dynamic-import"] } + [dependencies.clap] version = "4.3" features = ["derive", "deprecated", "wrap_help"] diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 6726b52b..91400930 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -15,7 +15,7 @@ use std::cell::RefCell; use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; +use std::io::{self, BufWriter, Write}; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -180,6 +180,36 @@ enum Command { long_listing: bool, }, + /// Mount the archive as a filesystem. + /// + /// Files and directories from all previous backups are visible. + /// + /// This is currently only supported on Windows. + /// + /// On Windows you must first enable the Projected Filesystem feature by running this command + /// in an elevated PowerShell: + /// + /// Enable-WindowsOptionalFeature -Online -FeatureName Client-ProjFS -NoRestart + /// + /// ProjFS by default retains extracted files in the destination directory. This can make + /// access to the archive faster on subsequent mounts, but will use more disk space. + /// + /// If `--cleanup-projfs` is set, then the directory will be deleted when the projection is stopped. + /// Also, if this option is set, the destination directory must not exist. + #[cfg(windows)] + Mount { + /// The archive to mount + archive: String, + + /// Target folder where the archive should be mounted to + destination: PathBuf, + + /// Create the target folder and remove all temporarily created + /// files on exit + #[arg(long)] + cleanup_projfs: bool, + }, + /// Copy a stored tree to a restore directory. 
     Restore {
         archive: String,
@@ -306,7 +336,7 @@ impl std::process::Termination for ExitCode {
 impl Command {
     fn run(&self, monitor: Arc<TermUiMonitor>) -> Result<ExitCode> {
-        let mut stdout = std::io::stdout();
+        let mut stdout = io::stdout();
         match self {
             Command::Backup {
                 archive,
@@ -468,6 +498,42 @@ impl Command {
                     show::show_entry_names(entry_iter, &mut stdout, *long_listing)?;
                 }
             }
+            #[cfg(windows)]
+            Command::Mount {
+                archive,
+                destination,
+                cleanup_projfs: cleanup,
+            } => {
+                use std::io::Read;
+
+                let archive = Archive::open(Transport::new(archive)?)?;
+                let options = MountOptions { clean: *cleanup };
+                let projection = match mount(archive, destination, options) {
+                    Ok(handle) => handle,
+                    Err(Error::MountDestinationExists) => {
+                        error!("Mount point {} already exists", destination.display());
+                        return Ok(ExitCode::Failure);
+                    }
+                    Err(Error::MountDestinationDoesNotExists) => {
+                        error!("Mount destination {} does not exist", destination.display());
+                        return Ok(ExitCode::Failure);
+                    }
+                    Err(error) => return Err(error),
+                };
+
+                info!(
+                    "Projection started at {}.",
+                    projection.mount_root().display()
+                );
+                {
+                    info!("Press any key to stop the projection...");
+                    let mut stdin = io::stdin();
+                    let _ = stdin.read(&mut [0u8]).unwrap();
+                }
+
+                info!("Stopping projection.");
+                drop(projection);
+            }
             Command::Restore {
                 archive,
                 destination,
diff --git a/src/errors.rs b/src/errors.rs
index 478b7d62..dbc524b7 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -167,6 +167,15 @@ pub enum Error {
     #[error("Unexpected file {path:?} in archive directory")]
     UnexpectedFile { path: String },
 
+    #[error("This feature is not implemented")]
+    NotImplemented,
+
+    #[error("The destination already exists")]
+    MountDestinationExists,
+
+    #[error("The destination does not exist")]
+    MountDestinationDoesNotExists,
+
     /// Generic IO error.
     #[error(transparent)]
     IOError {
@@ -189,6 +198,13 @@ pub enum Error {
         #[from]
         source: transport::Error,
     },
+
+    #[cfg(windows)]
+    #[error(transparent)]
+    Projection {
+        #[from]
+        source: windows_projfs::Error,
+    },
 }
 
 impl From<jsonio::Error> for Error {
diff --git a/src/hunk_index.rs b/src/hunk_index.rs
new file mode 100644
index 00000000..6d932819
--- /dev/null
+++ b/src/hunk_index.rs
@@ -0,0 +1,142 @@
+// TODO: This uses single indexes, but for consistency with the rest of Conserve
+// it should probably use stitched indexes, so that it sees the continuation
+// of interrupted backups.
+
+// TODO: Unit tests.
+
+// This is currently only used by projfs, but is not inherently Windows-specific.
+#![cfg_attr(not(windows), allow(unused))]
+
+use std::cmp::Ordering;
+
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
+use crate::{Apath, IndexRead, Result};
+
+#[derive(Debug)]
+struct HunkIndexMeta {
+    index: u32,
+
+    start_path: Apath,
+    end_path: Apath,
+}
+
+/// An index over all hunks available in an index,
+/// for speeding up sub-dir iterations and locating
+/// path metadata.
+pub struct IndexHunkIndex {
+    hunks: Vec<HunkIndexMeta>,
+}
+
+impl IndexHunkIndex {
+    /// Index all available hunks from the read index.
+    ///
+    /// Note:
+    /// Depending on the index size this might not be a cheap operation,
+    /// as we loop through every hunk and read its contents.
+    pub fn from_index(index: &IndexRead) -> Result<Self> {
+        let mut hunk_info = index
+            .hunks_available()?
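+            // Read each hunk in parallel, recording just its first and last apath.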
+            .into_par_iter()
+            .map(move |hunk_index| {
+                let mut index = index.duplicate();
+                let entries = index.read_hunk(hunk_index)?;
+                let meta_info = if let Some(entries) = entries {
+                    if let (Some(first), Some(last)) = (entries.first(), entries.last()) {
+                        Some(HunkIndexMeta {
+                            index: hunk_index,
+
+                            start_path: first.apath.clone(),
+                            end_path: last.apath.clone(),
+                        })
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                };
+
+                Ok(meta_info)
+            })
+            .filter_map(|meta: Result<_>| meta.ok().flatten())
+            .collect::<Vec<_>>();
+
+        /* After parallel execution bring all hunks back into order */
+        hunk_info.sort_by_key(|info| info.index);
+        Ok(Self { hunks: hunk_info })
+    }
+
+    fn find_hunk_index_for_file(&self, path: &Apath) -> Option<usize> {
+        let hunk_index = self.hunks.binary_search_by(|entry| {
+            match (entry.start_path.cmp(path), entry.end_path.cmp(path)) {
+                (Ordering::Less, Ordering::Less) => Ordering::Less,
+                (Ordering::Greater, Ordering::Greater) => Ordering::Greater,
+                _ => Ordering::Equal,
+            }
+        });
+
+        /*
+         * If we do not have an exact match, no hunk contains the path we
+         * are looking for.
+         */
+        hunk_index.ok()
+    }
+
+    /// Locate the hunk index of the hunk which should contain the file metadata
+    /// for a given path.
+    ///
+    /// Note:
+    /// To validate file existence it's required to parse the hunk contents and actually
+    /// check whether the file path exists. This function only returns the hunk index where
+    /// the file should be located, if it exists.
+    pub fn find_hunk_for_file(&self, path: &Apath) -> Option<u32> {
+        self.find_hunk_index_for_file(path)
+            .map(|index| self.hunks[index].index)
+    }
+
+    /// Locate the hunks which store the file metadata for the contents
+    /// of a particular directory.
+    ///
+    /// Note:
+    /// To validate directory existence it's required to parse the hunk contents and actually
+    /// check whether the directory path exists.
+    pub fn find_hunks_for_subdir(&self, path: &Apath, recursive: bool) -> Vec<u32> {
+        /*
+         * Appending an empty string to the path allows us to search for the first file
+         * in the target directory. This is needed as a file and a directory with the same name are not
+         * stored in succession.
+         *
+         * Example (from the apath test):
+         * - /b/a
+         * - /b/b
+         * - /b/c
+         * - /b/a/c
+         * - /b/b/c
+         */
+        let search_path = path.append("");
+        let directory_start_hunk = match self.find_hunk_index_for_file(&search_path) {
+            Some(index) => index,
+            None => return vec![],
+        };
+
+        let mut result = Vec::new();
+        result.push(self.hunks[directory_start_hunk].index);
+        for hunk in &self.hunks[directory_start_hunk + 1..] {
+            if !path.is_prefix_of(&hunk.start_path) {
+                break;
+            }
+
+            if !recursive && hunk.start_path[path.len() + 1..].contains('/') {
+                /* hunk already starts within a sub-directory, no more direct children follow */
+                break;
+            }
+
+            /* hunk still contains subtree elements of that path */
+            result.push(hunk.index);
+        }
+
+        result
+    }
+}
diff --git a/src/index.rs b/src/index.rs
index c388dcae..126a2da4 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -277,11 +277,16 @@ fn hunk_relpath(hunk_number: u32) -> String {
     format!("{:05}/{:09}", hunk_number / HUNKS_PER_SUBDIR, hunk_number)
 }
 
-// TODO: Maybe this isn't adding much on top of the hunk iter?
-#[derive(Debug, Clone)]
+/// Utility to read the stored index.
 pub struct IndexRead {
     /// Transport pointing to this index directory.
     transport: Transport,
+
+    /// Decompressor for the index to read
+    decompressor: Decompressor,
+
+    /// Current read statistics of this index
+    pub stats: IndexReadStats,
 }
 
 impl IndexRead {
@@ -292,41 +297,92 @@ impl IndexRead {
     }
 
     pub(crate) fn open(transport: Transport) -> IndexRead {
-        IndexRead { transport }
+        IndexRead {
+            transport,
+            decompressor: Decompressor::new(),
+            stats: IndexReadStats::default(),
+        }
     }
 
-    /// Make an iterator that will return all entries in this band.
-    pub fn iter_entries(self) -> IndexEntryIter {
-        // TODO: An option to pass in a subtree?
-        IndexEntryIter::new(self.iter_hunks(), Apath::root(), Exclude::nothing())
+    /// Clone the read index.
+    ///
+    /// Note:
+    /// This has several side effects:
+    /// - Depending on the implementation of the decompressor, duplicate might not be a cheap operation.
+    /// - Every read index has its own unique read stats, therefore the clone does not inherit the read stats.
+    pub(crate) fn duplicate(&self) -> Self {
+        Self::open(self.transport.clone())
     }
 
-    /// Make an iterator that returns hunks of entries from this index.
-    pub fn iter_hunks(&self) -> IndexHunkIter {
-        let _span = debug_span!("iter_hunks", ?self.transport).entered();
-        // All hunk numbers present in all directories.
-        let subdirs = self
-            .transport
-            .list_dir("")
-            .expect("list index dir") // TODO: Don't panic
-            .dirs
-            .into_iter()
-            .sorted()
-            .collect_vec();
-        debug!(?subdirs);
+    /// Read and parse a specific hunk.
+    pub fn read_hunk(&mut self, hunk_number: u32) -> Result<Option<Vec<IndexEntry>>> {
+        let path = hunk_relpath(hunk_number);
+        let compressed_bytes = match self.transport.read_file(&path) {
+            Ok(b) => b,
+            Err(err) if err.is_not_found() => {
+                // TODO: Cope with one hunk being missing, while there are still
+                // later-numbered hunks. This would require reading the whole
+                // list of hunks first.
+                return Ok(None);
+            }
+            Err(source) => return Err(Error::Transport { source }),
+        };
+        self.stats.index_hunks += 1;
+        self.stats.compressed_index_bytes += compressed_bytes.len() as u64;
+        let index_bytes = self.decompressor.decompress(&compressed_bytes)?;
+        self.stats.uncompressed_index_bytes += index_bytes.len() as u64;
+        let entries: Vec<IndexEntry> =
+            serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson {
+                path: path.clone(),
+                source,
+            })?;
+        if entries.is_empty() {
+            // It's legal, it's just weird - and it can be produced by some old Conserve versions.
+        }
+        Ok(Some(entries))
+    }
+
+    /// All hunk numbers present in all directories.
+    pub fn hunks_available(&self) -> Result<Vec<u32>> {
+        let subdirs = self.transport.list_dir("")?.dirs.into_iter().sorted();
+
         let hunks = subdirs
-            .into_iter()
             .filter_map(|dir| self.transport.list_dir(&dir).ok())
             .flat_map(|list| list.files)
            .filter_map(|f| f.parse::<u32>().ok())
             .sorted()
             .collect_vec();
+
+        Ok(hunks)
+    }
+
+    /// Make an iterator that will return all entries in this band.
+    pub fn iter_entries(self) -> IndexEntryIter {
+        // TODO: An option to pass in a subtree?
+        IndexEntryIter::new(
+            self.iter_available_hunks(),
+            Apath::root(),
+            Exclude::nothing(),
+        )
+    }
+
+    /// Make an iterator that returns hunks of entries from this index.
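+    ///
+    /// For example (as the tests below do), every entry can be visited in
+    /// apath order via `IndexRead::open_path(path).iter_available_hunks().flatten()`.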
+    pub fn iter_available_hunks(self) -> IndexHunkIter {
+        let _span = debug_span!("iter_hunks", ?self.transport).entered();
+        let hunks = self.hunks_available().expect("hunks available"); // TODO: Don't panic
         debug!(?hunks);
         IndexHunkIter {
             hunks: hunks.into_iter(),
-            transport: self.transport.clone(),
-            decompressor: Decompressor::new(),
-            stats: IndexReadStats::default(),
+            index: self,
+            after: None,
+        }
+    }
+
+    /// Make an iterator that returns hunks of entries for the specified hunk numbers.
+    pub fn iter_hunks(self, hunks: std::vec::IntoIter<u32>) -> IndexHunkIter {
+        let _span = debug_span!("iter_hunks", ?self.transport).entered();
+        IndexHunkIter {
+            hunks,
+            index: self,
             after: None,
         }
     }
@@ -337,10 +393,7 @@ impl IndexRead {
 /// Each returned item is a vec of (typically up to a thousand) index entries.
 pub struct IndexHunkIter {
     hunks: std::vec::IntoIter<u32>,
-    /// The `i` directory within the band where all files for this index are written.
-    transport: Transport,
-    decompressor: Decompressor,
-    pub stats: IndexReadStats,
+    pub index: IndexRead,
     /// If set, yield only entries ordered after this apath.
     after: Option<Apath>,
 }
@@ -351,11 +404,11 @@ impl Iterator for IndexHunkIter {
     fn next(&mut self) -> Option<Self::Item> {
         loop {
             let hunk_number = self.hunks.next()?;
-            let entries = match self.read_next_hunk(hunk_number) {
+            let entries = match self.index.read_hunk(hunk_number) {
                 Ok(None) => return None,
                 Ok(Some(entries)) => entries,
                 Err(err) => {
-                    self.stats.errors += 1;
+                    self.index.stats.errors += 1;
                     error!("Error reading index hunk {hunk_number:?}: {err}");
                     continue;
                 }
@@ -394,33 +447,6 @@ impl IndexHunkIter {
             ..self
         }
     }
-
-    fn read_next_hunk(&mut self, hunk_number: u32) -> Result<Option<Vec<IndexEntry>>> {
-        let path = hunk_relpath(hunk_number);
-        let compressed_bytes = match self.transport.read_file(&path) {
-            Ok(b) => b,
-            Err(err) if err.is_not_found() => {
-                // TODO: Cope with one hunk being missing, while there are still
-                // later-numbered hunks. This would require reading the whole
-                // list of hunks first.
-                return Ok(None);
-            }
-            Err(source) => return Err(Error::Transport { source }),
-        };
-        self.stats.index_hunks += 1;
-        self.stats.compressed_index_bytes += compressed_bytes.len() as u64;
-        let index_bytes = self.decompressor.decompress(&compressed_bytes)?;
-        self.stats.uncompressed_index_bytes += index_bytes.len() as u64;
-        let entries: Vec<IndexEntry> =
-            serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson {
-                path: path.clone(),
-                source,
-            })?;
-        if entries.is_empty() {
-            // It's legal, it's just weird - and it can be produced by some old Conserve versions.
-        }
-        Ok(Some(entries))
-    }
 }
 
 /// Read out all the entries from a stored index, in apath order.
@@ -654,8 +680,9 @@ mod tests {
         assert_eq!(names, &["/1.1", "/1.2", "/2.1", "/2.2"]);
 
         // Read it out as hunks.
-        let hunks: Vec<Vec<IndexEntry>> =
-            IndexRead::open_path(testdir.path()).iter_hunks().collect();
+        let hunks: Vec<Vec<IndexEntry>> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
+            .collect();
         assert_eq!(hunks.len(), 2);
         assert_eq!(
             hunks[0]
@@ -681,73 +708,72 @@ mod tests {
         ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]);
         ib.finish_hunk(TestMonitor::arc()).unwrap();
 
-        let index_read = IndexRead::open_path(testdir.path());
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/1.1", "/1.2", "/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/nonexistent".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, [""; 0]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
            .advance_to_after(&"/1.1".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/1.1.1".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/1.2", "/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/1.2".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/1.3".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/2.0".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/2.1", "/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/2.1".into())
             .flatten()
             .map(|entry| entry.apath.into())
             .collect();
         assert_eq!(names, ["/2.2"]);
 
-        let names: Vec<String> = index_read
-            .iter_hunks()
+        let names: Vec<String> = IndexRead::open_path(testdir.path())
+            .iter_available_hunks()
             .advance_to_after(&"/2.2".into())
             .flatten()
             .map(|entry| entry.apath.into())
@@ -806,7 +832,7 @@ mod tests {
         // Think about, but don't actually add some files
         ib.finish_hunk(TestMonitor::arc())?;
         let read_index = IndexRead::open_path(testdir.path());
-        assert_eq!(read_index.iter_hunks().count(), 1);
+        assert_eq!(read_index.iter_available_hunks().count(), 1);
         Ok(())
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 387f7f23..18b284b9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -27,6 +27,7 @@ pub mod entry;
 pub mod errors;
 pub mod excludes;
 mod gc_lock;
+mod hunk_index;
 pub mod index;
 mod io;
 mod jsonio;
@@ -35,6 +36,7 @@ pub mod live_tree;
 mod merge;
 pub mod misc;
 pub mod monitor;
+mod mount;
 pub mod owner;
 pub mod restore;
 pub mod show;
@@ -68,6 +70,7 @@ pub use crate::kind::Kind;
 pub use crate::live_tree::LiveTree;
 pub use crate::merge::MergeTrees;
 pub use crate::misc::bytes_to_human_mb;
+pub use crate::mount::{mount, MountOptions};
 pub use crate::owner::Owner;
 pub use crate::restore::{restore, RestoreOptions};
 pub use crate::show::{show_versions, ShowVersionsOptions};
diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs
index b14e0647..622c1568 100644
--- a/src/monitor/mod.rs
+++ b/src/monitor/mod.rs
@@ -4,6 +4,7 @@
 
 pub mod task;
 pub mod test;
+pub mod void;
 
 use self::task::Task;
 use crate::counters::Counter;
diff --git a/src/monitor/void.rs b/src/monitor/void.rs
new file mode 100644
index 00000000..39d8c432
--- /dev/null
+++ b/src/monitor/void.rs
@@ -0,0 +1,26 @@
+use crate::counters::Counter;
+
+use super::{
+    task::{Task, TaskList},
+    Monitor,
+};
+
+/// A monitor that does not capture any information.
+#[derive(Debug, Clone)]
+pub struct VoidMonitor;
+impl Monitor for VoidMonitor {
+    fn count(&self, _counter: Counter, _increment: usize) {}
+
+    fn set_counter(&self, _counter: Counter, _value: usize) {}
+
+    fn error(&self, _error: crate::Error) {}
+
+    fn start_task(&self, name: String) -> Task {
+        /*
+         * All data related to the target task will be dropped
+         * as soon as the callee drops the task.
+         */
+        let mut list = TaskList::default();
+        list.start_task(name)
+    }
+}
diff --git a/src/mount/mod.rs b/src/mount/mod.rs
new file mode 100644
index 00000000..b2eebf3e
--- /dev/null
+++ b/src/mount/mod.rs
@@ -0,0 +1,28 @@
+use std::path::Path;
+
+#[cfg(windows)]
+mod projfs;
+
+#[cfg(unix)]
+mod unix;
+
+/// Options for mounting an archive
+/// into an existing file system.
+pub struct MountOptions {
+    /// Create the mount point and delete it
+    /// when unmounting, leaving a clean environment.
+    pub clean: bool,
+}
+
+/// Handle for the mount controller.
+/// Once dropped, the projection is stopped and, if requested via MountOptions, cleaned up.
+pub trait MountHandle {
+    /// Returns the root path where the archive has been mounted.
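+    ///
+    /// For example, after a successful `mount(archive, destination, options)`,
+    /// this is expected to equal the `destination` the projection was started on.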
+    fn mount_root(&self) -> &Path;
+}
+
+#[cfg(windows)]
+pub use projfs::mount;
+
+#[cfg(unix)]
+pub use unix::mount;
diff --git a/src/mount/projfs.rs b/src/mount/projfs.rs
new file mode 100644
index 00000000..637a451e
--- /dev/null
+++ b/src/mount/projfs.rs
@@ -0,0 +1,529 @@
+use std::{
+    borrow::Cow,
+    ffi::OsStr,
+    fs,
+    io::{self, ErrorKind, Read},
+    iter::Peekable,
+    num::NonZeroUsize,
+    ops::ControlFlow,
+    path::{Component, Path, PathBuf},
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+
+use bytes::Bytes;
+use itertools::Itertools;
+use lru::LruCache;
+use tracing::{debug, info, warn};
+use windows_projfs::{
+    DirectoryEntry, DirectoryInfo, FileInfo, Notification, ProjectedFileSystem,
+    ProjectedFileSystemSource,
+};
+
+use crate::{
+    hunk_index::IndexHunkIndex,
+    monitor::{void::VoidMonitor, Monitor},
+    Apath, Archive, BandId, BandSelectionPolicy, Error, IndexEntry, Kind, Result, StoredTree,
+};
+
+use super::{MountHandle, MountOptions};
+
+struct StoredFileReader {
+    iter: Peekable<Box<dyn Iterator<Item = Result<Bytes>>>>,
+}
+
+impl StoredFileReader {
+    pub fn new(
+        stored_tree: Arc<StoredTree>,
+        entry: IndexEntry,
+        byte_offset: u64,
+        monitor: Arc<dyn Monitor>,
+    ) -> Result<Self> {
+        let file_content = entry
+            .addrs
+            .into_iter()
+            .scan(byte_offset, |skip_bytes, mut entry| {
+                // Always return Some(...) here: returning None from scan would end
+                // the iteration early, silently dropping all blocks after a fully
+                // skipped leading block.
+                if *skip_bytes == 0 {
+                    Some(Some(entry))
+                } else if *skip_bytes < entry.len {
+                    entry.len -= *skip_bytes;
+                    entry.start += *skip_bytes;
+                    *skip_bytes = 0;
+                    Some(Some(entry))
+                } else {
+                    *skip_bytes -= entry.len;
+                    Some(None)
+                }
+            })
+            .flatten()
+            .map::<Result<Bytes>, _>(move |entry| {
+                let content = stored_tree
+                    .block_dir()
+                    .get_block_content(&entry.hash, monitor.clone())?;
+
+                Ok(content.slice((entry.start as usize)..(entry.start + entry.len) as usize))
+            });
+
+        Ok(Self {
+            iter: (Box::new(file_content) as Box<dyn Iterator<Item = Result<Bytes>>>).peekable(),
+        })
+    }
+}
+
+impl Read for StoredFileReader {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let mut bytes_written = 0;
+
+        while bytes_written < buf.len() {
+            let current_chunk = match self.iter.peek_mut() {
+                Some(Ok(value)) => value,
+                Some(Err(_)) => {
+                    return Err(io::Error::other(self.iter.next().unwrap().unwrap_err()))
+                }
+                None => break,
+            };
+
+            let bytes_pending = (buf.len() - bytes_written).min(current_chunk.len());
+
+            buf[bytes_written..(bytes_written + bytes_pending)]
+                .copy_from_slice(&current_chunk[0..bytes_pending]);
+            bytes_written += bytes_pending;
+
+            if bytes_pending == current_chunk.len() {
+                let _ = self.iter.next();
+            } else {
+                *current_chunk = current_chunk.slice(bytes_pending..);
+            }
+        }
+
+        Ok(bytes_written)
+    }
+}
+
+const UNIX_WIN_DIFF_SECS: i64 = 11644473600;
+fn unix_time_to_windows(unix_seconds: i64, unix_nanos: u32) -> u64 {
+    if unix_seconds < -UNIX_WIN_DIFF_SECS {
+        return 0;
+    }
+
+    let win_seconds = (unix_seconds + UNIX_WIN_DIFF_SECS) as u64;
+    win_seconds * 1_000_000_000 / 100 + (unix_nanos / 100) as u64
+}
+
+/* https://learn.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants */
+const FILE_ATTRIBUTE_READONLY: u32 = 0x00000001;
+const FILE_ATTRIBUTE_DIRECTORY: u32 = 0x00000010;
+const FILE_ATTRIBUTE_NOT_CONTENT_INDEXED: u32 = 0x00002000;
+const FILE_ATTRIBUTE_RECALL_ON_OPEN: u32 = 0x00040000;
+
+/* Note: Using FILE_ATTRIBUTE_READONLY on directories will cause the explorer to *always* list all second level subdirectory entries */
+const DIRECTORY_ATTRIBUTES: u32 =
+    FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_NOT_CONTENT_INDEXED | FILE_ATTRIBUTE_RECALL_ON_OPEN;
+
+fn index_entry_to_directory_entry(entry: &IndexEntry) -> Option<DirectoryEntry> {
+    let file_name = entry.apath.split('/').last()?;
+    if entry.kind == Kind::Dir {
+        Some(
+            DirectoryInfo {
+                directory_name: file_name.to_string(),
+                directory_attributes: DIRECTORY_ATTRIBUTES,
+
+                /* currently conserve does not differentiate between the different time stamps */
+                creation_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+                last_access_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+                last_write_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+            }
+            .into(),
+        )
+    } else if entry.kind == Kind::File {
+        Some(
+            FileInfo {
+                file_name: file_name.to_string(),
+                file_size: entry.addrs.iter().map(|block| block.len).sum(),
+                file_attributes: FILE_ATTRIBUTE_READONLY,
+
+                /* currently conserve does not differentiate between the different time stamps */
+                creation_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+                last_access_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+                last_write_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos),
+            }
+            .into(),
+        )
+    } else {
+        None
+    }
+}
+
+struct ArchiveProjectionSource {
+    archive: Archive,
+
+    stored_tree_cache: Mutex<LruCache<BandId, Arc<StoredTree>>>,
+
+    hunk_index_cache: Mutex<LruCache<BandId, Arc<IndexHunkIndex>>>,
+
+    /*
+     * Cache the last accessed hunks to improve directory traversal speed.
+     */
+    #[allow(clippy::type_complexity)]
+    hunk_content_cache: Mutex<LruCache<(BandId, u32), Arc<Vec<IndexEntry>>>>,
+
+    /*
+     * The Windows file explorer tends to query some directories multiple times in a row.
+     * This cache also helps when the user navigates up and down the tree.
+     */
+    serve_dir_cache: Mutex<LruCache<PathBuf, Vec<DirectoryEntry>>>,
+}
+
+impl ArchiveProjectionSource {
+    pub fn load_hunk_contents(
+        &self,
+        stored_tree: &StoredTree,
+        hunk_id: u32,
+    ) -> Result<Arc<Vec<IndexEntry>>> {
+        let band_id = stored_tree.band().id();
+        self.hunk_content_cache
+            .lock()
+            .unwrap()
+            .try_get_or_insert((band_id, hunk_id), || {
+                let mut index = stored_tree.band().index();
+                Ok(Arc::new(index.read_hunk(hunk_id)?.unwrap_or_default()))
+            })
+            .cloned()
+    }
+
+    pub fn get_or_open_tree(&self, policy: BandSelectionPolicy) -> Result<Arc<StoredTree>> {
+        let band_id = self.archive.resolve_band_id(policy)?;
+        self.stored_tree_cache
+            .lock()
+            .unwrap()
+            .try_get_or_insert(band_id, || {
+                debug!("Opening band {}", band_id);
+
+                let stored_tree = self
+                    .archive
+                    .open_stored_tree(BandSelectionPolicy::Specified(band_id))?;
+
+                Ok(Arc::new(stored_tree))
+            })
+            .cloned()
+    }
+
+    pub fn get_or_create_hunk_index(
+        &self,
+        stored_tree: &StoredTree,
+    ) -> Result<Arc<IndexHunkIndex>> {
+        let band_id = stored_tree.band().id();
+        self.hunk_index_cache
+            .lock()
+            .unwrap()
+            .try_get_or_insert(band_id, || {
+                /* Inform the user that this band is being cached, as this is most likely a heavy operation (CPU- and memory-wise) */
+                info!("Caching files for band {}", stored_tree.band().id());
+
+                let helper = IndexHunkIndex::from_index(&stored_tree.band().index())?;
+                Ok(Arc::new(helper))
+            })
+            .cloned()
+    }
+
+    fn parse_path_band_policy(
+        components: &mut dyn Iterator<Item = Cow<str>>,
+    ) -> Option<BandSelectionPolicy> {
+        match components.next().as_deref() {
+            Some("latest") => Some(BandSelectionPolicy::Latest),
+            Some("all") => components
+                .next()
+                .and_then(|band_id| band_id.parse::<BandId>().ok())
+                .map(BandSelectionPolicy::Specified),
+            _ => None,
+        }
+    }
+
+    fn band_id_to_directory_info(&self, policy: BandSelectionPolicy) -> Option<DirectoryInfo> {
+        let stored_tree = self.get_or_open_tree(policy).ok()?;
+        let band_info = stored_tree.band().get_info().ok()?;
+
+        let timestamp = unix_time_to_windows(
+            band_info.start_time.unix_timestamp(),
+            (band_info.start_time.unix_timestamp_nanos() % 1_000_000_000) as u32,
+        );
+
+        Some(DirectoryInfo {
+            directory_name: format!("{}", band_info.id),
+            directory_attributes: DIRECTORY_ATTRIBUTES,
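+            /* Windows FILETIME: 100 ns ticks since 1601-01-01, as produced by unix_time_to_windows */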
+            creation_time: timestamp,
+            last_access_time: timestamp,
+            last_write_time: timestamp,
+        })
+    }
+
+    fn serve_dir(&self, path: &Path) -> Result<Vec<DirectoryEntry>> {
+        debug!("Serving directory {}", path.display());
+
+        let mut components = path
+            .components()
+            .map(Component::as_os_str)
+            .map(OsStr::to_string_lossy);
+
+        let target_band = match components.next().as_deref() {
+            None => {
+                /* Virtual root, display band selection */
+                let mut entries = Vec::with_capacity(2);
+                entries.push(DirectoryEntry::Directory(DirectoryInfo {
+                    directory_name: "all".to_string(),
+                    directory_attributes: DIRECTORY_ATTRIBUTES,
+
+                    ..Default::default()
+                }));
+                if let Some(mut info) = self.band_id_to_directory_info(BandSelectionPolicy::Latest)
+                {
+                    info.directory_name = "latest".to_string();
+                    entries.push(DirectoryEntry::Directory(info))
+                }
+
+                return Ok(entries);
+            }
+            Some("latest") => BandSelectionPolicy::Latest,
+            Some("all") => {
+                if let Some(band_id) = components.next() {
+                    BandSelectionPolicy::Specified(band_id.parse::<BandId>()?)
+                } else {
+                    /* list bands */
+                    let entries = self
+                        .archive
+                        .list_band_ids()?
+                        .into_iter()
+                        .filter_map(|band_id| {
+                            self.band_id_to_directory_info(BandSelectionPolicy::Specified(band_id))
+                                .map(DirectoryEntry::Directory)
+                        })
+                        .collect();
+
+                    return Ok(entries);
+                }
+            }
+            _ => return Ok(vec![]),
+        };
+
+        let target_path = components.fold(Apath::root(), |path, component| path.append(&component));
+        let stored_tree = self.get_or_open_tree(target_band)?;
+        let hunk_index = self.get_or_create_hunk_index(&stored_tree)?;
+        let dir_hunks = hunk_index.find_hunks_for_subdir(&target_path, false);
+
+        let hunks = dir_hunks
+            .into_iter()
+            .flat_map(|hunk_id| self.load_hunk_contents(&stored_tree, hunk_id).ok())
+            .collect_vec();
+
+        let iterator = hunks.iter().flat_map(|e| &**e);
+
+        let path_prefix = target_path.to_string();
+        let entries = iterator
+            .filter(|entry| {
+                if !entry.apath.starts_with(&path_prefix) {
+                    /* Not the directory we're interested in */
+                    return false;
+                }
+
+                if entry.apath.len() <= path_prefix.len() {
+                    /*
+                     * Skip the containing directory entry itself, which is equal to path_prefix.
+                     */
+                    return false;
+                }
+
+                let file_name = &entry.apath[path_prefix.len()..].trim_start_matches('/');
+                if file_name.contains('/') {
+                    /* entry is a file within a sub-directory */
+                    return false;
+                }
+
+                true
+            })
+            .filter_map(index_entry_to_directory_entry)
+            .collect_vec();
+
+        Ok(entries)
+    }
+
+    fn serve_file(
+        &self,
+        path: &Path,
+        byte_offset: usize,
+        length: usize,
+    ) -> io::Result<Box<dyn Read>> {
+        let mut components = path
+            .components()
+            .map(Component::as_os_str)
+            .map(OsStr::to_string_lossy);
+
+        let target_band = Self::parse_path_band_policy(&mut components)
+            .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?;
+
+        let target_path = components.fold(Apath::root(), |path, component| path.append(&component));
+        let stored_tree = self
+            .get_or_open_tree(target_band)
+            .map_err(io::Error::other)?;
+
+        let hunk_index = self
+            .get_or_create_hunk_index(&stored_tree)
+            .map_err(io::Error::other)?;
+        let file_hunk = hunk_index
+            .find_hunk_for_file(&target_path)
+            .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?;
+
+        let index_entry = self
+            .load_hunk_contents(&stored_tree, file_hunk)
+            .map_err(io::Error::other)?
+            .iter()
+            .find(|entry| entry.apath == target_path)
+            .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?
+            .clone();
+
+        let file_size: u64 = index_entry.addrs.iter().map(|addr| addr.len).sum();
+
+        debug!(
+            "Serving {}{} ({}/{} bytes)",
+            stored_tree.band().id(),
+            target_path,
+            length,
+            file_size
+        );
+        let reader = StoredFileReader::new(
+            stored_tree,
+            index_entry,
+            byte_offset as u64,
+            Arc::new(VoidMonitor),
+        )
+        .map_err(io::Error::other)?;
+        Ok(Box::new(reader.take(length as u64)))
+    }
+}
+
+impl ProjectedFileSystemSource for ArchiveProjectionSource {
+    fn list_directory(&self, path: &Path) -> Vec<DirectoryEntry> {
+        let cached_result = self
+            .serve_dir_cache
+            .lock()
+            .ok()
+            .and_then(|mut cache| cache.get(path).cloned());
+
+        if let Some(cached_result) = cached_result {
+            return cached_result;
+        }
+
+        match self.serve_dir(path) {
+            Ok(entries) => {
+                self.serve_dir_cache
+                    .lock()
+                    .unwrap()
+                    .push(path.to_owned(), entries.clone());
+
+                entries
+            }
+            Err(error) => {
+                warn!("Failed to serve {}: {}", path.display(), error);
+                vec![]
+            }
+        }
+    }
+
+    fn stream_file_content(
+        &self,
+        path: &Path,
+        byte_offset: usize,
+        length: usize,
+    ) -> std::io::Result<Box<dyn Read>> {
+        match self.serve_file(path, byte_offset, length) {
+            Ok(reader) => Ok(reader),
+            Err(error) => {
+                if error.kind() != ErrorKind::NotFound {
+                    warn!("Failed to serve file {}: {}", path.display(), error);
+                }
+
+                Err(error)
+            }
+        }
+    }
+
+    fn handle_notification(&self, notification: &Notification) -> ControlFlow<()> {
+        if notification.is_cancelable()
+            && !matches!(notification, Notification::FilePreConvertToFull(_))
+        {
+            /* try to cancel everything, except retrieving data */
+            ControlFlow::Break(())
+        } else {
+            ControlFlow::Continue(())
+        }
+    }
+}
+
+const ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE: i32 = 369;
+
+struct WindowsMountHandle {
+    _projection: ProjectedFileSystem,
+    path: PathBuf,
+    cleanup: bool,
+}
+
+impl Drop for WindowsMountHandle {
+    fn drop(&mut self) {
+        if self.cleanup {
+            debug!("Removing destination {}", self.path.display());
+            let mut attempt_count = 0;
+            while let Err(err) = fs::remove_dir_all(&self.path) {
+                attempt_count += 1;
+                if err.raw_os_error().unwrap_or_default()
+                    != ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE
+                    || attempt_count > 5
+                {
+                    warn!("Failed to clean up projection destination: {}", err);
+                    break;
+                }
+                std::thread::sleep(Duration::from_secs(1));
+            }
+        }
+    }
+}
+
+impl MountHandle for WindowsMountHandle {
+    fn mount_root(&self) -> &Path {
+        &self.path
+    }
+}
+
+pub fn mount(
+    archive: Archive,
+    destination: &Path,
+    options: MountOptions,
+) -> Result<Box<dyn MountHandle>> {
+    if options.clean {
+        if destination.exists() {
+            return Err(Error::MountDestinationExists);
+        }
+
+        fs::create_dir_all(destination)?;
+    } else if !destination.exists() {
+        return Err(Error::MountDestinationDoesNotExists);
+    }
+
+    let source = ArchiveProjectionSource {
+        archive: archive.clone(),
+
+        /* cache at most 16 different bands in parallel */
+        stored_tree_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
+        hunk_index_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
+
+        hunk_content_cache: Mutex::new(LruCache::new(NonZeroUsize::new(64).unwrap())),
+        serve_dir_cache: Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap())),
+    };
+
+    let projection = ProjectedFileSystem::new(destination, source)?;
+    let handle: Box<dyn MountHandle> = Box::new(WindowsMountHandle {
+        _projection: projection,
+
+        path: destination.to_owned(),
+        cleanup: options.clean,
+    });
+
+    Ok(handle)
+}
diff --git a/src/mount/unix.rs b/src/mount/unix.rs
new file mode 100644
index 00000000..9d1fac04
--- /dev/null
+++ b/src/mount/unix.rs
@@ -0,0 +1,12 @@
+use std::path::Path;
+
+use super::{MountHandle, MountOptions};
+use crate::{Archive, Error, Result};
+
+pub fn mount(
+    _archive: Archive,
+    _destination: &Path,
+    _options: MountOptions,
+) -> Result<Box<dyn MountHandle>> {
+    Err(Error::NotImplemented)
+}
diff --git a/src/stitch.rs b/src/stitch.rs
index dc936df1..df03872e 100644
--- a/src/stitch.rs
+++ b/src/stitch.rs
@@ -133,7 +133,7 @@ impl Iterator for IterStitchedIndexHunks {
             // Start reading this new index and skip forward until after last_apath
             match Band::open(&self.archive, *band_id) {
                 Ok(band) => {
-                    let mut index_hunks = band.index().iter_hunks();
+                    let mut index_hunks = band.index().iter_available_hunks();
                     if let Some(last) = &self.last_apath {
                         index_hunks = index_hunks.advance_to_after(last)
                     }
diff --git a/tests/mount.rs b/tests/mount.rs
new file mode 100644
index 00000000..00b6a60f
--- /dev/null
+++ b/tests/mount.rs
@@ -0,0 +1,266 @@
+// Mostly inactive on Unix, as the mount function is not implemented for Unix.
+#![cfg_attr(not(windows), allow(unused))]
+
+use std::{
+    fs,
+    path::Path,
+};
+
+use conserve::{
+    backup,
+    monitor::test::TestMonitor,
+    test_fixtures::{ScratchArchive, TreeFixture},
+    BackupOptions, MountOptions,
+};
+use tempfile::TempDir;
+
+#[cfg(windows)]
+fn read_dir(path: &Path) -> Vec<(bool, String)> {
+    fs::read_dir(path)
+        .unwrap()
+        .filter_map(|entry| entry.ok())
+        .map(|entry| {
+            (
+                entry.file_type().unwrap().is_dir(),
+                entry.file_name().to_string_lossy().to_string(),
+            )
+        })
+        .collect::<Vec<_>>()
+}
+
+#[test]
+#[cfg(unix)]
+fn mount_unix_not_implemented() {
+    use assert_matches::assert_matches;
+    use conserve::Error;
+
+    let archive = ScratchArchive::new();
+    let mountdir = TempDir::new().unwrap();
+
+    let result = conserve::mount(
+        archive.clone(),
+        mountdir.path(),
+        MountOptions { clean: false },
+    );
+    assert_matches!(result.err(), Some(Error::NotImplemented));
+}
+
+#[test]
+#[cfg(not(unix))]
+fn mount_empty() {
+    let archive = ScratchArchive::new();
+    let mountdir = TempDir::new().unwrap();
+    let _projection = conserve::mount(
+        archive.clone(),
+        mountdir.path(),
+        MountOptions { clean: false },
+    )
+    .unwrap();
+
+    assert!(mountdir.path().is_dir());
+
+    /* An empty projection should not contain the "latest" folder as there is no latest band */
+    assert_eq!(read_dir(mountdir.path()), [(true, "all".into())]);
+}
+
+#[test]
+#[cfg(not(unix))]
+fn mount_sub_dirs() {
+    let archive = ScratchArchive::new();
+    {
+        let srcdir = TreeFixture::new();
+
+        srcdir.create_dir("sub1");
+        srcdir.create_dir("sub1/sub1");
+        srcdir.create_file("sub1/sub1/file.txt");
+
+        srcdir.create_dir("sub2");
+        backup(
+            &archive,
+            srcdir.path(),
+            &BackupOptions::default(),
+            TestMonitor::arc(),
+        )
+        .unwrap();
+    }
+
+    let mountdir = TempDir::new().unwrap();
+    let _projection = conserve::mount(
+        archive.clone(),
+        mountdir.path(),
+        MountOptions { clean: false },
+    )
+    .unwrap();
+
+    assert!(mountdir.path().is_dir());
+    assert_eq!(
+        read_dir(&mountdir.path().join("all")),
+        [(true, "b0000".into())]
+    );
+    assert_eq!(
+        read_dir(&mountdir.path().join("all").join("b0000")),
+        [(true, "sub1".into()), (true, "sub2".into())]
+    );
+    assert_eq!(
+        read_dir(&mountdir.path().join("all").join("b0000").join("sub1")),
+        [(true, "sub1".into())]
+    );
+    assert_eq!(
+        read_dir(
+            &mountdir
+                .path()
+                .join("all")
+                .join("b0000")
+                .join("sub1")
+                .join("sub1")
+        ),
+        [(false, "file.txt".into())]
+    );
+    assert_eq!(
+        read_dir(&mountdir.path().join("all").join("b0000").join("sub2")),
+        []
+    );
+}
+
+#[test]
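+// Two backups of the same tree: both versions should be visible under "all",
+// and "latest" should serve the contents of the most recent backup.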
+#[cfg(not(unix))] +fn mount_file_versions() { + let archive = ScratchArchive::new(); + { + let srcdir = TreeFixture::new(); + + srcdir.create_file_with_contents("file_v1.txt", b"Hello World"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + + srcdir.create_file_with_contents("file_v1.txt", b"Good bye World"); + srcdir.create_file_with_contents("file_v2.txt", b"Only in V2"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + } + + let mountdir = TempDir::new().unwrap(); + let _projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: false }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + assert_eq!( + read_dir(mountdir.path()), + [(true, "all".into()), (true, "latest".into())] + ); + + /* check that "latest" is actually the latest version (version 2) */ + assert_eq!( + read_dir(&mountdir.path().join("latest")), + [(false, "file_v1.txt".into()), (false, "file_v2.txt".into())] + ); + assert_eq!( + fs::read(mountdir.path().join("latest").join("file_v1.txt")).unwrap(), + b"Good bye World" + ); + + /* check if the versions can be properly listed and accessed by "all" */ + assert_eq!( + read_dir(&mountdir.path().join("all")), + [(true, "b0000".into()), (true, "b0001".into())] + ); + + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0000")), + [(false, "file_v1.txt".into())] + ); + assert_eq!( + fs::read( + mountdir + .path() + .join("all") + .join("b0000") + .join("file_v1.txt") + ) + .unwrap(), + b"Hello World" + ); + + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0001")), + [(false, "file_v1.txt".into()), (false, "file_v2.txt".into())] + ); + assert_eq!( + fs::read( + mountdir + .path() + .join("all") + .join("b0001") + .join("file_v1.txt") + ) + .unwrap(), + b"Good bye World" + ); +} + +#[test] +#[cfg(not(unix))] +fn mount_cleanup() { + let archive = ScratchArchive::new(); + { + let srcdir = TreeFixture::new(); + srcdir.create_file("file.txt"); + + srcdir.create_dir("sub1"); + srcdir.create_file("sub1/file.txt"); + + srcdir.create_dir("sub2"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + } + + let mountdir = TempDir::new().unwrap(); + fs::remove_dir(mountdir.path()).unwrap(); + + let projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: true }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + + /* actually read some data which may create files in the mount dir */ + fs::read(mountdir.path().join("all").join("b0000").join("file.txt")).unwrap(); + fs::read( + mountdir + .path() + .join("all") + .join("b0000") + .join("sub1") + .join("file.txt"), + ) + .unwrap(); + assert!(!read_dir(mountdir.path()).is_empty()); + + /* Mount dir should be cleaned now */ + drop(projection); + + /* the target dir should have been deleted */ + assert!(!mountdir.path().is_dir()); +}
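
Example usage of the new subcommand (illustrative; the archive path and drive
letter are hypothetical, and the ProjFS feature must be enabled first as
described in the command help):

    conserve mount C:\backups\myarchive X:\restored --cleanup-projfs

The projection stays active until a key is pressed; with --cleanup-projfs the
destination must not yet exist, is created on mount, and is removed again when
the projection stops.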