From 8dac50d392d64c8d5acf8d49c58ffb398d1f5cbc Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Fri, 29 Dec 2023 22:41:03 +0100 Subject: [PATCH 01/25] Initial implementation of archive mounting on Windows --- Cargo.lock | 115 ++++++++++++++-- Cargo.toml | 6 + src/bin/conserve.rs | 24 ++++ src/errors.rs | 7 + src/index.rs | 2 +- src/lib.rs | 4 + src/mount.rs | 322 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 466 insertions(+), 14 deletions(-) create mode 100644 src/mount.rs diff --git a/Cargo.lock b/Cargo.lock index c395f510..c3c47db5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -793,6 +793,7 @@ dependencies = [ "unix_mode", "url", "uzers", + "windows-projfs", ] [[package]] @@ -1649,7 +1650,7 @@ dependencies = [ "libc", "redox_syscall 0.4.1", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -2383,18 +2384,18 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" dependencies = [ "proc-macro2", "quote", @@ -2909,13 +2910,44 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" 
+dependencies = [ + "windows-core", + "windows-targets 0.52.0", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + +[[package]] +name = "windows-projfs" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcfbe27be9d35b2a49eec282e0ab98fa7d538d22f964da23c81f122e89baeda0" +dependencies = [ + "log", + "parking_lot", + "thiserror", + "windows", +] + [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -2924,13 +2956,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -2939,42 +2986,84 @@ version = "0.48.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + 
[[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/Cargo.toml b/Cargo.toml index e750bc82..f58adf0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,9 @@ indoc = "2.0" uzers = "0.11" nix = { version = "0.27", features = ["fs", "user"] } +[target.'cfg(windows)'.dependencies] +windows-projfs = { version = "0.1.3", optional = true } + [dependencies.clap] version = "4.3" features = ["derive", "deprecated", "wrap_help"] @@ -104,6 +107,9 @@ s3 = [ "dep:tokio", ] s3-integration-test = ["s3"] +mount-archive = [ + "dep:windows-projfs" +] [lib] doctest = false diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index e71a6301..3a1cd5cf 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -170,6 +170,21 @@ enum Command { long_listing: bool, }, + /// Mount the archive as projection. 
+ #[cfg(feature = "mount-archive")] + Mount { + /// The archive to mount + archive: String, + + /// Target folder where the archive should be mounted to + destination: PathBuf, + + /// Create the target folder and remove all temporarly created + /// files on exit + #[arg(long, default_value_t = true)] + cleanup: bool, + }, + /// Copy a stored tree to a restore directory. Restore { archive: String, @@ -457,6 +472,15 @@ impl Command { show::show_entry_names(entry_iter, &mut stdout, *long_listing)?; } } + #[cfg(feature = "mount-archive")] + Command::Mount { + archive, + destination, + cleanup, + } => { + let archive = Archive::open(open_transport(archive)?)?; + mount(archive, destination, *cleanup)?; + } Command::Restore { archive, destination, diff --git a/src/errors.rs b/src/errors.rs index 6bb98ac5..a0178070 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -158,6 +158,13 @@ pub enum Error { #[from] source: transport::Error, }, + + #[cfg(feature = "mount-archive")] + #[error(transparent)] + Projection { + #[from] + source: windows_projfs::Error, + }, } impl From for Error { diff --git a/src/index.rs b/src/index.rs index 3f7b4f1e..a39c51c3 100644 --- a/src/index.rs +++ b/src/index.rs @@ -295,7 +295,7 @@ impl IndexRead { } /// Make an iterator that will return all entries in this band. - pub fn iter_entries(self) -> IndexEntryIter { + pub fn iter_entries(&self) -> IndexEntryIter { // TODO: An option to pass in a subtree? 
IndexEntryIter::new(self.iter_hunks(), Apath::root(), Exclude::nothing()) } diff --git a/src/lib.rs b/src/lib.rs index 629f387e..3e725d6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ pub mod live_tree; mod merge; pub mod misc; pub mod monitor; +#[cfg(feature = "mount-archive")] +mod mount; pub mod owner; pub mod restore; pub mod show; @@ -69,6 +71,8 @@ pub use crate::kind::Kind; pub use crate::live_tree::LiveTree; pub use crate::merge::MergeTrees; pub use crate::misc::bytes_to_human_mb; +#[cfg(feature = "mount-archive")] +pub use crate::mount::mount; pub use crate::owner::Owner; pub use crate::restore::{restore, RestoreOptions}; pub use crate::show::{show_versions, ShowVersionsOptions}; diff --git a/src/mount.rs b/src/mount.rs new file mode 100644 index 00000000..82088e68 --- /dev/null +++ b/src/mount.rs @@ -0,0 +1,322 @@ +use std::{ + borrow::Cow, + ffi::OsStr, + fs, + io::{self, ErrorKind, Read}, + iter::Peekable, + path::{Component, Path}, + sync::Arc, +}; + +use bytes::Bytes; +use itertools::Itertools; +use tracing::{debug, warn}; +use windows_projfs::{ + DirectoryEntry, DirectoryInfo, FileInfo, ProjectedFileSystem, ProjectedFileSystemSource, +}; + +use crate::{ + counters::Counter, + index::IndexEntryIter, + monitor::{ + task::{Task, TaskList}, + Monitor, Problem, + }, + Apath, Archive, BandId, BandSelectionPolicy, Exclude, IndexEntry, Kind, Result, +}; + +macro_rules! static_dir { + ($name:literal) => { + DirectoryInfo { + name: $name.to_string(), + } + .into() + }; +} + +struct VoidMonitor; +impl Monitor for VoidMonitor { + fn count(&self, _counter: Counter, _increment: usize) {} + + fn set_counter(&self, _counter: Counter, _value: usize) {} + + fn problem(&self, _problem: Problem) {} + + fn start_task(&self, name: String) -> Task { + /* + * All data related to the target task will be dropped + * as soon the callee drops the task. 
+ */ + let mut list = TaskList::default(); + list.start_task(name) + } +} + +impl Into> for IndexEntry { + fn into(self) -> Option { + let file_name = self.apath.split("/").last()?; + if self.kind == Kind::Dir { + Some( + DirectoryInfo { + name: file_name.to_string(), + } + .into(), + ) + } else if self.kind == Kind::File { + Some( + FileInfo { + file_name: file_name.to_string(), + file_size: self.addrs.iter().map(|block| block.len).sum(), + ..Default::default() + } + .into(), + ) + } else if self.kind == Kind::Symlink { + /* + * Awaiting https://github.com/WolverinDEV/windows-projfs/issues/3 to be resolved + * before we can implement symlinks. + */ + None + } else { + None + } + } +} + +struct ArchiveProjectionSource { + archive: Archive, +} + +impl ArchiveProjectionSource { + fn parse_path_band_policy( + components: &mut dyn Iterator>, + ) -> Option { + match components.next().as_deref() { + Some("latest") => Some(BandSelectionPolicy::Latest), + Some("all") => components + .next() + .map(|band_id| band_id.parse::().ok()) + .flatten() + .map(BandSelectionPolicy::Specified), + _ => None, + } + } + + fn serve_dir(&self, path: &Path) -> Result> { + let mut components = path + .components() + .map(Component::as_os_str) + .map(OsStr::to_string_lossy); + + let target_band = match components.next().as_deref() { + None => { + /* Virtual root, display channel selection */ + return Ok(vec![static_dir!("latest"), static_dir!("all")]); + } + Some("latest") => BandSelectionPolicy::Latest, + Some("all") => { + if let Some(band_id) = components.next() { + BandSelectionPolicy::Specified(band_id.parse::()?) + } else { + /* list bands */ + let entries = self + .archive + .list_band_ids()? 
+ .into_iter() + .map(|band_id| { + DirectoryEntry::Directory(DirectoryInfo { + name: format!("{}", band_id), + }) + }) + .collect(); + + return Ok(entries); + } + } + _ => return Ok(vec![]), + }; + + let stored_tree = self.archive.open_stored_tree(target_band)?; + let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); + let tree_index = stored_tree.band().index(); + + let iterator = IndexEntryIter::new( + tree_index.iter_hunks(), + target_path.clone(), + Exclude::nothing(), + ); + + let path_prefix = target_path.to_string(); + let entries = iterator + .filter(|entry| { + if entry.apath.len() <= path_prefix.len() { + /* + * Skipping the containing directory entry which is eqal to path_prefix. + * + * Note: + * We're not filtering for entries which are not contained within target_path as the + * IndexEntryIter already does this. + */ + return false; + } + + let file_name = &entry.apath[path_prefix.len()..].trim_start_matches("/"); + if file_name.contains("/") { + /* entry is a file which is within a sub-directory */ + return false; + } + + true + }) + .filter_map(IndexEntry::into) + .collect_vec(); + + Ok(entries) + } +} + +struct BytesIteratorReader { + iter: Peekable>>>, +} + +impl BytesIteratorReader { + pub fn new(iter: Box>>) -> Self { + Self { + iter: iter.peekable(), + } + } +} + +impl Read for BytesIteratorReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut bytes_written = 0; + + while bytes_written < buf.len() { + let current_chunk = match self.iter.peek_mut() { + Some(Ok(value)) => value, + Some(Err(_)) => { + return Err(io::Error::other(self.iter.next().unwrap().unwrap_err())) + } + None => break, + }; + + let bytes_pending = (buf.len() - bytes_written).min(current_chunk.len()); + + buf[bytes_written..(bytes_written + bytes_pending)] + .copy_from_slice(¤t_chunk[0..bytes_pending]); + bytes_written += bytes_pending; + + if bytes_pending == current_chunk.len() { + let _ = self.iter.next(); + } else { 
+ *current_chunk = current_chunk.slice(bytes_pending..); + } + } + + Ok(bytes_written) + } +} + +impl ProjectedFileSystemSource for ArchiveProjectionSource { + fn list_directory(&self, path: &Path) -> Vec { + match self.serve_dir(path) { + Ok(entries) => entries, + Err(error) => { + warn!("Failed to serve {}: {}", path.display(), error); + vec![] + } + } + } + + fn stream_file_content( + &self, + path: &Path, + byte_offset: usize, + length: usize, + ) -> std::io::Result> { + let mut components = path + .components() + .map(Component::as_os_str) + .map(OsStr::to_string_lossy); + + let target_band = Self::parse_path_band_policy(&mut components) + .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; + + let stored_tree = self + .archive + .open_stored_tree(target_band) + .map_err(io::Error::other)?; + + let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); + let index_entry = stored_tree + .band() + .index() + .iter_entries() + .find(|entry| entry.apath == target_path) + .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; + + let void_monitor = Arc::new(VoidMonitor); + + let file_content = index_entry + .addrs + .into_iter() + .scan(byte_offset as u64, |skip_bytes, mut entry| { + if *skip_bytes == 0 { + Some(entry) + } else if *skip_bytes < entry.len { + entry.len -= *skip_bytes; + entry.start += *skip_bytes; + *skip_bytes = 0; + Some(entry) + } else { + *skip_bytes -= entry.len; + None + } + }) + .map(move |entry| { + let content = stored_tree + .block_dir() + .get_block_content(&entry.hash, void_monitor.clone())?; + Ok(content.slice((entry.start as usize)..(entry.start + entry.len) as usize)) + }); + + let reader = BytesIteratorReader::new(Box::new(file_content)); + Ok(Box::new(reader.take(length as u64))) + } +} + +pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { + if clean { + if destination.exists() { + eprintln!("The destination already exists."); + eprintln!("Please ensure, 
that the destination does not exists."); + return Ok(()); + } + + fs::create_dir_all(destination)?; + } else { + if !destination.exists() { + eprintln!("The destination does not exists."); + eprintln!("Please ensure, that the destination does exist prior mounting."); + return Ok(()); + } + } + + let source = ArchiveProjectionSource { archive }; + let _projection = ProjectedFileSystem::new(destination, source)?; + + { + println!("Press any key to stop the projection..."); + let mut stdin = io::stdin(); + let _ = stdin.read(&mut [0u8]).unwrap(); + } + + if clean { + debug!("Removing destination {}", destination.display()); + if let Err(err) = fs::remove_dir_all(destination) { + warn!("Failed to clean up projection destination: {}", err); + } + } + + Ok(()) +} From 58f84a54074de2a8ebe2c692c43a9ca09fe34ca7 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 13:34:46 +0100 Subject: [PATCH 02/25] Adding a hunk metadata cache to speed up index entry querying --- src/index.rs | 183 ++++++++++++++++------------ src/mount.rs | 332 +++++++++++++++++++++++++++++++++++++++----------- src/stitch.rs | 2 +- 3 files changed, 363 insertions(+), 154 deletions(-) diff --git a/src/index.rs b/src/index.rs index a39c51c3..49de5d46 100644 --- a/src/index.rs +++ b/src/index.rs @@ -277,11 +277,16 @@ fn hunk_relpath(hunk_number: u32) -> String { format!("{:05}/{:09}", hunk_number / HUNKS_PER_SUBDIR, hunk_number) } -// TODO: Maybe this isn't adding much on top of the hunk iter? -#[derive(Debug, Clone)] +/// Utility to read the stored index pub struct IndexRead { /// Transport pointing to this index directory. 
transport: Arc, + + /// Decompressor for the index to read + decompressor: Decompressor, + + /// Current read statistics of this index + pub stats: IndexReadStats, } impl IndexRead { @@ -291,41 +296,90 @@ impl IndexRead { } pub(crate) fn open(transport: Arc) -> IndexRead { - IndexRead { transport } + IndexRead { + transport, + decompressor: Decompressor::new(), + stats: IndexReadStats::default(), + } } - /// Make an iterator that will return all entries in this band. - pub fn iter_entries(&self) -> IndexEntryIter { - // TODO: An option to pass in a subtree? - IndexEntryIter::new(self.iter_hunks(), Apath::root(), Exclude::nothing()) + pub(crate) fn duplicate(&self) -> Self { + Self::open(self.transport.clone()) } - /// Make an iterator that returns hunks of entries from this index. - pub fn iter_hunks(&self) -> IndexHunkIter { - let _span = debug_span!("iter_hunks", ?self.transport).entered(); - // All hunk numbers present in all directories. - let subdirs = self - .transport - .list_dir("") - .expect("list index dir") // TODO: Don't panic - .dirs - .into_iter() - .sorted() - .collect_vec(); - debug!(?subdirs); + /// Read and parse a specific hunk + pub fn read_hunk(&mut self, hunk_number: u32) -> Result>> { + let path = hunk_relpath(hunk_number); + let compressed_bytes = match self.transport.read_file(&path) { + Ok(b) => b, + Err(err) if err.is_not_found() => { + // TODO: Cope with one hunk being missing, while there are still + // later-numbered hunks. This would require reading the whole + // list of hunks first. 
+ return Ok(None); + } + Err(source) => return Err(Error::Transport { source }), + }; + self.stats.index_hunks += 1; + self.stats.compressed_index_bytes += compressed_bytes.len() as u64; + let index_bytes = self.decompressor.decompress(&compressed_bytes)?; + self.stats.uncompressed_index_bytes += index_bytes.len() as u64; + let entries: Vec = + serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson { + path: path.clone(), + source, + })?; + if entries.is_empty() { + // It's legal, it's just weird - and it can be produced by some old Conserve versions. + } + Ok(Some(entries)) + } + + // All hunk numbers present in all directories. + pub fn hunks_available(&self) -> Result> { + let subdirs = self.transport.list_dir("")?.dirs.into_iter().sorted(); + let hunks = subdirs - .into_iter() .filter_map(|dir| self.transport.list_dir(&dir).ok()) .flat_map(|list| list.files) .filter_map(|f| f.parse::().ok()) .sorted() .collect_vec(); + + Ok(hunks) + } + + /// Make an iterator that will return all entries in this band. + pub fn iter_entries(self) -> IndexEntryIter { + // TODO: An option to pass in a subtree? + IndexEntryIter::new( + self.iter_available_hunks(), + Apath::root(), + Exclude::nothing(), + ) + } + + /// Make an iterator that returns hunks of entries from this index. 
+ pub fn iter_available_hunks(self) -> IndexHunkIter { + let _span = debug_span!("iter_hunks", ?self.transport).entered(); + let hunks = self.hunks_available().expect("hunks available"); // TODO: Don't panic debug!(?hunks); + IndexHunkIter { hunks: hunks.into_iter(), - transport: Arc::clone(&self.transport), - decompressor: Decompressor::new(), - stats: IndexReadStats::default(), + index: self, + + after: None, + } + } + + /// Make an iterator that returns hunks of entries for the specified hunks + pub fn iter_hunks(self, hunks: std::vec::IntoIter) -> IndexHunkIter { + let _span = debug_span!("iter_hunks", ?self.transport).entered(); + IndexHunkIter { + hunks, + index: self, + after: None, } } @@ -336,10 +390,8 @@ impl IndexRead { /// Each returned item is a vec of (typically up to a thousand) index entries. pub struct IndexHunkIter { hunks: std::vec::IntoIter, - /// The `i` directory within the band where all files for this index are written. - transport: Arc, - decompressor: Decompressor, - pub stats: IndexReadStats, + pub index: IndexRead, + /// If set, yield only entries ordered after this apath. after: Option, } @@ -350,11 +402,11 @@ impl Iterator for IndexHunkIter { fn next(&mut self) -> Option { loop { let hunk_number = self.hunks.next()?; - let entries = match self.read_next_hunk(hunk_number) { + let entries = match self.index.read_hunk(hunk_number) { Ok(None) => return None, Ok(Some(entries)) => entries, Err(err) => { - self.stats.errors += 1; + self.index.stats.errors += 1; error!("Error reading index hunk {hunk_number:?}: {err}"); continue; } @@ -393,33 +445,6 @@ impl IndexHunkIter { ..self } } - - fn read_next_hunk(&mut self, hunk_number: u32) -> Result>> { - let path = hunk_relpath(hunk_number); - let compressed_bytes = match self.transport.read_file(&path) { - Ok(b) => b, - Err(err) if err.is_not_found() => { - // TODO: Cope with one hunk being missing, while there are still - // later-numbered hunks. 
This would require reading the whole - // list of hunks first. - return Ok(None); - } - Err(source) => return Err(Error::Transport { source }), - }; - self.stats.index_hunks += 1; - self.stats.compressed_index_bytes += compressed_bytes.len() as u64; - let index_bytes = self.decompressor.decompress(&compressed_bytes)?; - self.stats.uncompressed_index_bytes += index_bytes.len() as u64; - let entries: Vec = - serde_json::from_slice(&index_bytes).map_err(|source| Error::DeserializeJson { - path: path.clone(), - source, - })?; - if entries.is_empty() { - // It's legal, it's just weird - and it can be produced by some old Conserve versions. - } - Ok(Some(entries)) - } } /// Read out all the entries from a stored index, in apath order. @@ -654,8 +679,9 @@ mod tests { assert_eq!(names, &["/1.1", "/1.2", "/2.1", "/2.2"]); // Read it out as hunks. - let hunks: Vec> = - IndexRead::open_path(testdir.path()).iter_hunks().collect(); + let hunks: Vec> = IndexRead::open_path(testdir.path()) + .iter_available_hunks() + .collect(); assert_eq!(hunks.len(), 2); assert_eq!( hunks[0] @@ -681,73 +707,72 @@ mod tests { ib.append_entries(&mut vec![sample_entry("/2.1"), sample_entry("/2.2")]); ib.finish_hunk(CollectMonitor::arc()).unwrap(); - let index_read = IndexRead::open_path(testdir.path()); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/1.1", "/1.2", "/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/nonexistent".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, [""; 0]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/1.1".into()) .flatten() .map(|entry| 
entry.apath.into()) .collect(); assert_eq!(names, ["/1.2", "/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/1.1.1".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/1.2", "/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/1.2".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/1.3".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/2.0".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/2.1", "/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/2.1".into()) .flatten() .map(|entry| entry.apath.into()) .collect(); assert_eq!(names, ["/2.2"]); - let names: Vec = index_read - .iter_hunks() + let names: Vec = IndexRead::open_path(testdir.path()) + .iter_available_hunks() .advance_to_after(&"/2.2".into()) .flatten() .map(|entry| entry.apath.into()) @@ -806,7 +831,7 @@ mod tests { // Think about, but don't actually add some files ib.finish_hunk(CollectMonitor::arc())?; let read_index = IndexRead::open_path(testdir.path()); - assert_eq!(read_index.iter_hunks().count(), 1); + assert_eq!(read_index.iter_available_hunks().count(), 1); Ok(()) } } diff --git a/src/mount.rs b/src/mount.rs index 82088e68..0bbcc31b 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -1,12 +1,16 @@ +use 
rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::{ borrow::Cow, + cmp::Ordering, + collections::{btree_map::Entry, BTreeMap}, ffi::OsStr, fs, io::{self, ErrorKind, Read}, iter::Peekable, path::{Component, Path}, - sync::Arc, + sync::{Arc, Mutex}, }; +use tracing::{info, trace}; use bytes::Bytes; use itertools::Itertools; @@ -22,7 +26,8 @@ use crate::{ task::{Task, TaskList}, Monitor, Problem, }, - Apath, Archive, BandId, BandSelectionPolicy, Exclude, IndexEntry, Kind, Result, + Apath, Archive, BandId, BandSelectionPolicy, Exclude, IndexEntry, IndexRead, Kind, Result, + StoredTree, }; macro_rules! static_dir { @@ -52,6 +57,199 @@ impl Monitor for VoidMonitor { } } +#[derive(Debug)] +struct HunkMetaInfo { + index: u32, + + start_path: Apath, + end_path: Apath, +} + +struct HunkHelper { + hunks: Vec, +} + +impl HunkHelper { + pub fn from_index(index: &IndexRead) -> Result { + let mut hunk_info = index + .hunks_available()? + .into_par_iter() + .map(move |hunk_index| { + let mut index = index.duplicate(); + let entries = index.read_hunk(hunk_index)?; + let meta_info = if let Some(entries) = entries { + if let (Some(first), Some(last)) = (entries.first(), entries.last()) { + Some(HunkMetaInfo { + index: hunk_index, + + start_path: first.apath.clone(), + end_path: last.apath.clone(), + }) + } else { + None + } + } else { + None + }; + + Ok(meta_info) + }) + .map(Result::ok) + .flatten() + .filter_map(|entry| entry) + .collect::>(); + + /* After parallel execution bring all hunks back into order */ + hunk_info.sort_by_key(|info| info.index); + Ok(Self { hunks: hunk_info }) + } + + pub fn find_hunk_for_file(&self, path: &Apath) -> Option { + let hunk_index = self.hunks.binary_search_by(|entry| { + match (entry.start_path.cmp(&path), entry.end_path.cmp(&path)) { + (Ordering::Less, Ordering::Less) => Ordering::Less, + (Ordering::Greater, Ordering::Greater) => Ordering::Greater, + _ => Ordering::Equal, + } + }); + + let hunk_index = match hunk_index { + 
Ok(index) => index, + Err(index) => index, + }; + + if hunk_index >= self.hunks.len() { + None + } else { + Some(self.hunks[hunk_index].index) + } + } + + pub fn find_hunks_for_subdir(&self, path: &Apath, recursive: bool) -> Vec { + /* + * Appending an empty string to the path allows us to search for the first file + * in the target directory. This is needed as a file and a directory with the same name are not + * stored in succession. + * + * Example (from the apath test): + * - /b/a + * - /b/b + * - /b/c + * - /b/a/c + * - /b/b/c + */ + let search_path = path.append(""); + let directory_start_hunk = match self.hunks.binary_search_by(|entry| { + match ( + entry.start_path.cmp(&search_path), + entry.end_path.cmp(&search_path), + ) { + (Ordering::Less, Ordering::Less) => Ordering::Less, + (Ordering::Greater, Ordering::Greater) => Ordering::Greater, + _ => Ordering::Equal, + } + }) { + Ok(hunk) => hunk, + Err(hunk) => hunk, + }; + + if directory_start_hunk >= self.hunks.len() { + return vec![]; + } + + let mut result = Vec::new(); + result.push(self.hunks[directory_start_hunk].index); + for hunk in &self.hunks[directory_start_hunk + 1..] 
{ + if !path.is_prefix_of(&hunk.start_path) { + break; + } + + if !recursive { + if hunk.start_path[path.len() + 1..].contains("/") { + /* hunk does already contain directory content */ + break; + } + } + + /* hunk still contains subtree elements of that path */ + result.push(hunk.index); + } + + result + } +} + +struct StoredFileReader { + iter: Peekable>>>, +} + +impl StoredFileReader { + pub fn new( + stored_tree: StoredTree, + entry: IndexEntry, + byte_offset: u64, + monitor: Arc, + ) -> Result { + let file_content = entry + .addrs + .into_iter() + .scan(byte_offset, |skip_bytes, mut entry| { + if *skip_bytes == 0 { + Some(entry) + } else if *skip_bytes < entry.len { + entry.len -= *skip_bytes; + entry.start += *skip_bytes; + *skip_bytes = 0; + Some(entry) + } else { + *skip_bytes -= entry.len; + None + } + }) + .map::, _>(move |entry| { + let content = stored_tree + .block_dir() + .get_block_content(&entry.hash, monitor.clone())?; + + Ok(content.slice((entry.start as usize)..(entry.start + entry.len) as usize)) + }); + + Ok(Self { + iter: (Box::new(file_content) as Box>>).peekable(), + }) + } +} + +impl Read for StoredFileReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut bytes_written = 0; + + while bytes_written < buf.len() { + let current_chunk = match self.iter.peek_mut() { + Some(Ok(value)) => value, + Some(Err(_)) => { + return Err(io::Error::other(self.iter.next().unwrap().unwrap_err())) + } + None => break, + }; + + let bytes_pending = (buf.len() - bytes_written).min(current_chunk.len()); + + buf[bytes_written..(bytes_written + bytes_pending)] + .copy_from_slice(¤t_chunk[0..bytes_pending]); + bytes_written += bytes_pending; + + if bytes_pending == current_chunk.len() { + let _ = self.iter.next(); + } else { + *current_chunk = current_chunk.slice(bytes_pending..); + } + } + + Ok(bytes_written) + } +} + impl Into> for IndexEntry { fn into(self) -> Option { let file_name = self.apath.split("/").last()?; @@ -67,6 +265,7 @@ impl Into> 
for IndexEntry { FileInfo { file_name: file_name.to_string(), file_size: self.addrs.iter().map(|block| block.len).sum(), + ..Default::default() } .into(), @@ -83,8 +282,27 @@ impl Into> for IndexEntry { } } +struct ProjectionHunkHelper { + hunks: BTreeMap, +} + +impl ProjectionHunkHelper { + pub fn get_or_create_helper(&mut self, stored_tree: &StoredTree) -> Result<&HunkHelper> { + match self.hunks.entry(stored_tree.band().id()) { + Entry::Occupied(entry) => Ok(entry.into_mut()), + Entry::Vacant(entry) => { + info!("Caching files for band {}", stored_tree.band().id()); + + let helper = HunkHelper::from_index(&stored_tree.band().index())?; + Ok(entry.insert(helper)) + } + } + } +} + struct ArchiveProjectionSource { archive: Archive, + hunk_helper: Mutex, } impl ArchiveProjectionSource { @@ -138,10 +356,16 @@ impl ArchiveProjectionSource { let stored_tree = self.archive.open_stored_tree(target_band)?; let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); - let tree_index = stored_tree.band().index(); + let dir_hunks = { + let mut hunk_helper = self.hunk_helper.lock().unwrap(); + let hunk_helper = hunk_helper.get_or_create_helper(&stored_tree)?; + hunk_helper.find_hunks_for_subdir(&target_path, false) + }; + + let tree_index = stored_tree.band().index(); let iterator = IndexEntryIter::new( - tree_index.iter_hunks(), + tree_index.iter_hunks(dir_hunks.into_iter()), target_path.clone(), Exclude::nothing(), ); @@ -175,48 +399,6 @@ impl ArchiveProjectionSource { } } -struct BytesIteratorReader { - iter: Peekable>>>, -} - -impl BytesIteratorReader { - pub fn new(iter: Box>>) -> Self { - Self { - iter: iter.peekable(), - } - } -} - -impl Read for BytesIteratorReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut bytes_written = 0; - - while bytes_written < buf.len() { - let current_chunk = match self.iter.peek_mut() { - Some(Ok(value)) => value, - Some(Err(_)) => { - return 
Err(io::Error::other(self.iter.next().unwrap().unwrap_err())) - } - None => break, - }; - - let bytes_pending = (buf.len() - bytes_written).min(current_chunk.len()); - - buf[bytes_written..(bytes_written + bytes_pending)] - .copy_from_slice(¤t_chunk[0..bytes_pending]); - bytes_written += bytes_pending; - - if bytes_pending == current_chunk.len() { - let _ = self.iter.next(); - } else { - *current_chunk = current_chunk.slice(bytes_pending..); - } - } - - Ok(bytes_written) - } -} - impl ProjectedFileSystemSource for ArchiveProjectionSource { fn list_directory(&self, path: &Path) -> Vec { match self.serve_dir(path) { @@ -248,39 +430,36 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { .map_err(io::Error::other)?; let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); + + let file_hunk = { + let mut hunk_helper = self.hunk_helper.lock().unwrap(); + let hunk_helper = hunk_helper + .get_or_create_helper(&stored_tree) + .map_err(io::Error::other)?; + + hunk_helper + .find_hunk_for_file(&target_path) + .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))? + }; + let index_entry = stored_tree .band() .index() - .iter_entries() + .read_hunk(file_hunk) + .map_err(io::Error::other)? 
+ .unwrap_or_default() + .into_iter() .find(|entry| entry.apath == target_path) .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; - let void_monitor = Arc::new(VoidMonitor); - - let file_content = index_entry - .addrs - .into_iter() - .scan(byte_offset as u64, |skip_bytes, mut entry| { - if *skip_bytes == 0 { - Some(entry) - } else if *skip_bytes < entry.len { - entry.len -= *skip_bytes; - entry.start += *skip_bytes; - *skip_bytes = 0; - Some(entry) - } else { - *skip_bytes -= entry.len; - None - } - }) - .map(move |entry| { - let content = stored_tree - .block_dir() - .get_block_content(&entry.hash, void_monitor.clone())?; - Ok(content.slice((entry.start as usize)..(entry.start + entry.len) as usize)) - }); - - let reader = BytesIteratorReader::new(Box::new(file_content)); + trace!("Serving {}/{}", stored_tree.band().id(), target_path); + let reader = StoredFileReader::new( + stored_tree, + index_entry, + byte_offset as u64, + Arc::new(VoidMonitor), + ) + .map_err(io::Error::other)?; Ok(Box::new(reader.take(length as u64))) } } @@ -302,7 +481,12 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { } } - let source = ArchiveProjectionSource { archive }; + let source = ArchiveProjectionSource { + archive, + hunk_helper: Mutex::new(ProjectionHunkHelper { + hunks: Default::default(), + }), + }; let _projection = ProjectedFileSystem::new(destination, source)?; { diff --git a/src/stitch.rs b/src/stitch.rs index 0e945e0b..a99bea09 100644 --- a/src/stitch.rs +++ b/src/stitch.rs @@ -122,7 +122,7 @@ impl Iterator for IterStitchedIndexHunks { // Start reading this new index and skip forward until after last_apath match Band::open(&self.archive, *band_id) { Ok(band) => { - let mut index_hunks = band.index().iter_hunks(); + let mut index_hunks = band.index().iter_available_hunks(); if let Some(last) = &self.last_apath { index_hunks = index_hunks.advance_to_after(last) } From 0b7e53a4d051bbe9bcec698ca87403ff897c0cff Mon Sep 17 00:00:00 
2001 From: WolverinDEV Date: Sat, 30 Dec 2023 13:52:26 +0100 Subject: [PATCH 03/25] Caching the opened bands as well --- src/mount.rs | 91 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/src/mount.rs b/src/mount.rs index 0bbcc31b..ef56b191 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -10,7 +10,7 @@ use std::{ path::{Component, Path}, sync::{Arc, Mutex}, }; -use tracing::{info, trace}; +use tracing::info; use bytes::Bytes; use itertools::Itertools; @@ -31,9 +31,9 @@ use crate::{ }; macro_rules! static_dir { - ($name:literal) => { + ($name:expr) => { DirectoryInfo { - name: $name.to_string(), + name: ($name).to_string(), } .into() }; @@ -185,7 +185,7 @@ struct StoredFileReader { impl StoredFileReader { pub fn new( - stored_tree: StoredTree, + stored_tree: Arc, entry: IndexEntry, byte_offset: u64, monitor: Arc, @@ -282,11 +282,29 @@ impl Into> for IndexEntry { } } -struct ProjectionHunkHelper { +struct ProjectionCache { + archive: Archive, hunks: BTreeMap, + trees: BTreeMap>, } -impl ProjectionHunkHelper { +impl ProjectionCache { + pub fn get_or_open_tree(&mut self, policy: BandSelectionPolicy) -> Result<&Arc> { + let band_id = self.archive.resolve_band_id(policy)?; + match self.trees.entry(band_id) { + Entry::Occupied(entry) => Ok(entry.into_mut()), + Entry::Vacant(entry) => { + info!("Opening band {}", band_id); + + let stored_tree = self + .archive + .open_stored_tree(BandSelectionPolicy::Specified(band_id))?; + + Ok(entry.insert(Arc::new(stored_tree))) + } + } + } + pub fn get_or_create_helper(&mut self, stored_tree: &StoredTree) -> Result<&HunkHelper> { match self.hunks.entry(stored_tree.band().id()) { Entry::Occupied(entry) => Ok(entry.into_mut()), @@ -302,7 +320,7 @@ impl ProjectionHunkHelper { struct ArchiveProjectionSource { archive: Archive, - hunk_helper: Mutex, + cache: Mutex, } impl ArchiveProjectionSource { @@ -341,11 +359,7 @@ impl ArchiveProjectionSource { .archive .list_band_ids()? 
.into_iter() - .map(|band_id| { - DirectoryEntry::Directory(DirectoryInfo { - name: format!("{}", band_id), - }) - }) + .map(|band_id| static_dir!(format!("{}", band_id))) .collect(); return Ok(entries); @@ -354,13 +368,14 @@ impl ArchiveProjectionSource { _ => return Ok(vec![]), }; - let stored_tree = self.archive.open_stored_tree(target_band)?; let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); + let (stored_tree, dir_hunks) = { + let mut cache = self.cache.lock().unwrap(); + let stored_tree = cache.get_or_open_tree(target_band)?.clone(); + let hunk_helper = cache.get_or_create_helper(&stored_tree)?; + let dir_hunks = hunk_helper.find_hunks_for_subdir(&target_path, false); - let dir_hunks = { - let mut hunk_helper = self.hunk_helper.lock().unwrap(); - let hunk_helper = hunk_helper.get_or_create_helper(&stored_tree)?; - hunk_helper.find_hunks_for_subdir(&target_path, false) + (stored_tree, dir_hunks) }; let tree_index = stored_tree.band().index(); @@ -424,22 +439,23 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { let target_band = Self::parse_path_band_policy(&mut components) .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; - let stored_tree = self - .archive - .open_stored_tree(target_band) - .map_err(io::Error::other)?; - let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); - - let file_hunk = { - let mut hunk_helper = self.hunk_helper.lock().unwrap(); - let hunk_helper = hunk_helper + let (stored_tree, file_hunk) = { + let mut cache = self.cache.lock().unwrap(); + let stored_tree = cache + .get_or_open_tree(target_band) + .map_err(io::Error::other)? + .clone(); + + let hunk_helper = cache .get_or_create_helper(&stored_tree) .map_err(io::Error::other)?; - hunk_helper + let file_hunk = hunk_helper .find_hunk_for_file(&target_path) - .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))? 
+ .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; + + (stored_tree, file_hunk) }; let index_entry = stored_tree @@ -452,7 +468,15 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { .find(|entry| entry.apath == target_path) .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; - trace!("Serving {}/{}", stored_tree.band().id(), target_path); + let file_size: u64 = index_entry.addrs.iter().map(|addr| addr.len).sum(); + + info!( + "Serving {}/{} ({}/{} bytes)", + stored_tree.band().id(), + target_path, + length, + file_size + ); let reader = StoredFileReader::new( stored_tree, index_entry, @@ -482,19 +506,24 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { } let source = ArchiveProjectionSource { - archive, - hunk_helper: Mutex::new(ProjectionHunkHelper { + archive: archive.clone(), + cache: Mutex::new(ProjectionCache { + archive, + hunks: Default::default(), + trees: Default::default(), }), }; let _projection = ProjectedFileSystem::new(destination, source)?; + info!("Projection started at {}.", destination.display()); { println!("Press any key to stop the projection..."); let mut stdin = io::stdin(); let _ = stdin.read(&mut [0u8]).unwrap(); } + info!("Stopping projection."); if clean { debug!("Removing destination {}", destination.display()); if let Err(err) = fs::remove_dir_all(destination) { From e88514bb7edae0a418b108a1dbb32dd759c31ba0 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 14:30:56 +0100 Subject: [PATCH 04/25] Supporting file timestamps, mark files read only and improved projection root cleanup --- src/index.rs | 5 +++ src/mount.rs | 105 +++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/src/index.rs b/src/index.rs index 49de5d46..79394563 100644 --- a/src/index.rs +++ b/src/index.rs @@ -303,6 +303,11 @@ impl IndexRead { } } + /// Clone the read index. 
+ /// Note: + /// This has several side effects: + /// - Depending on the implementation of the decompressor, duplicate might not be a cheap option. + /// - Every read index has its own unique read stats, therefore the clone does not inherit the read stats. pub(crate) fn duplicate(&self) -> Self { Self::open(self.transport.clone()) } diff --git a/src/mount.rs b/src/mount.rs index ef56b191..e528222d 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -7,16 +7,19 @@ use std::{ fs, io::{self, ErrorKind, Read}, iter::Peekable, + ops::ControlFlow, path::{Component, Path}, sync::{Arc, Mutex}, + time::Duration, }; -use tracing::info; +use tracing::{info, trace}; use bytes::Bytes; use itertools::Itertools; use tracing::{debug, warn}; use windows_projfs::{ - DirectoryEntry, DirectoryInfo, FileInfo, ProjectedFileSystem, ProjectedFileSystemSource, + DirectoryEntry, DirectoryInfo, FileInfo, Notification, ProjectedFileSystem, + ProjectedFileSystemSource, }; use crate::{ @@ -250,6 +253,19 @@ impl Read for StoredFileReader { } } +const UNIX_WIN_DIFF_SECS: i64 = 11644473600; +fn unix_time_to_windows(unix_seconds: i64, unix_nanos: u32) -> u64 { + if unix_seconds < -UNIX_WIN_DIFF_SECS { + return 0; + } + + let win_seconds = (unix_seconds + UNIX_WIN_DIFF_SECS) as u64; + return win_seconds * 1_000_000_000 / 100 + (unix_nanos / 100) as u64; +} + +/* https://learn.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants */ +const FILE_ATTRIBUTE_READONLY: u32 = 0x00000001; + impl Into> for IndexEntry { fn into(self) -> Option { let file_name = self.apath.split("/").last()?; @@ -265,6 +281,12 @@ impl Into> for IndexEntry { FileInfo { file_name: file_name.to_string(), file_size: self.addrs.iter().map(|block| block.len).sum(), + file_attributes: FILE_ATTRIBUTE_READONLY, + + /* currently conserve does not differentiate between the different time stamps */ + creation_time: unix_time_to_windows(self.mtime, self.mtime_nanos), + last_access_time: unix_time_to_windows(self.mtime, 
self.mtime_nanos), + last_write_time: unix_time_to_windows(self.mtime, self.mtime_nanos), ..Default::default() } @@ -339,6 +361,7 @@ impl ArchiveProjectionSource { } fn serve_dir(&self, path: &Path) -> Result> { + trace!("serve_dir {}", path.display()); let mut components = path .components() .map(Component::as_os_str) @@ -412,25 +435,13 @@ impl ArchiveProjectionSource { Ok(entries) } -} - -impl ProjectedFileSystemSource for ArchiveProjectionSource { - fn list_directory(&self, path: &Path) -> Vec { - match self.serve_dir(path) { - Ok(entries) => entries, - Err(error) => { - warn!("Failed to serve {}: {}", path.display(), error); - vec![] - } - } - } - fn stream_file_content( + fn serve_file( &self, path: &Path, byte_offset: usize, length: usize, - ) -> std::io::Result> { + ) -> io::Result> { let mut components = path .components() .map(Component::as_os_str) @@ -471,7 +482,7 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { let file_size: u64 = index_entry.addrs.iter().map(|addr| addr.len).sum(); info!( - "Serving {}/{} ({}/{} bytes)", + "Serving {}{} ({}/{} bytes)", stored_tree.band().id(), target_path, length, @@ -488,6 +499,49 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { } } +impl ProjectedFileSystemSource for ArchiveProjectionSource { + fn list_directory(&self, path: &Path) -> Vec { + match self.serve_dir(path) { + Ok(entries) => entries, + Err(error) => { + warn!("Failed to serve {}: {}", path.display(), error); + vec![] + } + } + } + + fn stream_file_content( + &self, + path: &Path, + byte_offset: usize, + length: usize, + ) -> std::io::Result> { + trace!("stream_file_content {}", path.display()); + match self.serve_file(path, byte_offset, length) { + Ok(reader) => Ok(reader), + Err(error) => { + if error.kind() != ErrorKind::NotFound { + warn!("Failed to serve file {}: {}", path.display(), error); + } + + Err(error) + } + } + } + + fn handle_notification(&self, notification: &Notification) -> ControlFlow<()> { + if 
notification.is_cancelable() + && !matches!(notification, Notification::FilePreConvertToFull(_)) + { + /* try to cancel everything, except retriving data */ + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + } +} + +const ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE: i32 = 369; pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { if clean { if destination.exists() { @@ -514,8 +568,8 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { trees: Default::default(), }), }; - let _projection = ProjectedFileSystem::new(destination, source)?; + let projection = ProjectedFileSystem::new(destination, source)?; info!("Projection started at {}.", destination.display()); { println!("Press any key to stop the projection..."); @@ -524,10 +578,21 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { } info!("Stopping projection."); + drop(projection); + if clean { debug!("Removing destination {}", destination.display()); - if let Err(err) = fs::remove_dir_all(destination) { - warn!("Failed to clean up projection destination: {}", err); + let mut attempt_count = 0; + while let Err(err) = fs::remove_dir_all(destination) { + attempt_count += 1; + if err.raw_os_error().unwrap_or_default() + != ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE + || attempt_count > 5 + { + warn!("Failed to clean up projection destination: {}", err); + break; + } + std::thread::sleep(Duration::from_secs(1)); } } From 16a37aa1012a55a82a93a655bc5241309227c159 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 20:37:19 +0100 Subject: [PATCH 05/25] Enabling the mount command by default for all Windows platforms --- Cargo.lock | 15 +++++++++++++-- Cargo.toml | 5 +---- src/bin/conserve.rs | 4 ++-- src/errors.rs | 2 +- src/lib.rs | 4 ++-- src/mount.rs | 19 +++++++++++++++++-- 6 files changed, 36 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3c47db5..92dccbb9 
100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1410,6 +1410,16 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +[[package]] +name = "libloading" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" +dependencies = [ + "cfg-if", + "windows-sys", +] + [[package]] name = "libm" version = "0.2.8" @@ -2931,10 +2941,11 @@ dependencies = [ [[package]] name = "windows-projfs" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcfbe27be9d35b2a49eec282e0ab98fa7d538d22f964da23c81f122e89baeda0" +checksum = "90f405570663a441ceb10b307fc4a0230ca2bec02d0691f2e422d1d67607a44a" dependencies = [ + "libloading", "log", "parking_lot", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index f58adf0d..6ee1b1b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ uzers = "0.11" nix = { version = "0.27", features = ["fs", "user"] } [target.'cfg(windows)'.dependencies] -windows-projfs = { version = "0.1.3", optional = true } +windows-projfs = { version = "0.1.5", features = ["dynamic-import"] } [dependencies.clap] version = "4.3" @@ -107,9 +107,6 @@ s3 = [ "dep:tokio", ] s3-integration-test = ["s3"] -mount-archive = [ - "dep:windows-projfs" -] [lib] doctest = false diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 3a1cd5cf..3ee84f36 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -171,7 +171,7 @@ enum Command { }, /// Mount the archive as projection. 
- #[cfg(feature = "mount-archive")] + #[cfg(windows)] Mount { /// The archive to mount archive: String, @@ -472,7 +472,7 @@ impl Command { show::show_entry_names(entry_iter, &mut stdout, *long_listing)?; } } - #[cfg(feature = "mount-archive")] + #[cfg(windows)] Command::Mount { archive, destination, diff --git a/src/errors.rs b/src/errors.rs index a0178070..1195e8ad 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -159,7 +159,7 @@ pub enum Error { source: transport::Error, }, - #[cfg(feature = "mount-archive")] + #[cfg(windows)] #[error(transparent)] Projection { #[from] diff --git a/src/lib.rs b/src/lib.rs index 3e725d6f..f5024cce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,7 @@ pub mod live_tree; mod merge; pub mod misc; pub mod monitor; -#[cfg(feature = "mount-archive")] +#[cfg(windows)] mod mount; pub mod owner; pub mod restore; @@ -71,7 +71,7 @@ pub use crate::kind::Kind; pub use crate::live_tree::LiveTree; pub use crate::merge::MergeTrees; pub use crate::misc::bytes_to_human_mb; -#[cfg(feature = "mount-archive")] +#[cfg(windows)] pub use crate::mount::mount; pub use crate::owner::Owner; pub use crate::restore::{restore, RestoreOptions}; diff --git a/src/mount.rs b/src/mount.rs index e528222d..1475e4fc 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -36,7 +36,10 @@ use crate::{ macro_rules! 
static_dir { ($name:expr) => { DirectoryInfo { - name: ($name).to_string(), + directory_name: ($name).to_string(), + directory_attributes: DIRECTORY_ATTRIBUTES, + + ..Default::default() } .into() }; @@ -265,6 +268,12 @@ fn unix_time_to_windows(unix_seconds: i64, unix_nanos: u32) -> u64 { /* https://learn.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants */ const FILE_ATTRIBUTE_READONLY: u32 = 0x00000001; +const FILE_ATTRIBUTE_DIRECTORY: u32 = 0x00000010; +const FILE_ATTRIBUTE_NOT_CONTENT_INDEXED: u32 = 0x00002000; +const FILE_ATTRIBUTE_RECALL_ON_OPEN: u32 = 0x00040000; + +/* Note: Using FILE_ATTRIBUTE_READONLY on directories will cause the explorer to *always* list all second level subdirectory entries */ +const DIRECTORY_ATTRIBUTES: u32 = FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_NOT_CONTENT_INDEXED | FILE_ATTRIBUTE_RECALL_ON_OPEN; impl Into> for IndexEntry { fn into(self) -> Option { @@ -272,7 +281,13 @@ impl Into> for IndexEntry { if self.kind == Kind::Dir { Some( DirectoryInfo { - name: file_name.to_string(), + directory_name: file_name.to_string(), + directory_attributes: DIRECTORY_ATTRIBUTES, + + /* currently conserve does not differentiate between the different time stamps */ + creation_time: unix_time_to_windows(self.mtime, self.mtime_nanos), + last_access_time: unix_time_to_windows(self.mtime, self.mtime_nanos), + last_write_time: unix_time_to_windows(self.mtime, self.mtime_nanos), } .into(), ) From 6b7d7b6942d5dfb0821471da3977120f87584e00 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 21:23:36 +0100 Subject: [PATCH 06/25] Improving cache to speed up directory and file serving --- src/mount.rs | 283 ++++++++++++++++++++++++++++----------------------- 1 file changed, 158 insertions(+), 125 deletions(-) diff --git a/src/mount.rs b/src/mount.rs index 1475e4fc..a6ead81c 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -1,18 +1,19 @@ +use lru::LruCache; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::{ 
borrow::Cow, cmp::Ordering, - collections::{btree_map::Entry, BTreeMap}, ffi::OsStr, fs, io::{self, ErrorKind, Read}, iter::Peekable, + num::NonZeroUsize, ops::ControlFlow, - path::{Component, Path}, + path::{Component, Path, PathBuf}, sync::{Arc, Mutex}, time::Duration, }; -use tracing::{info, trace}; +use tracing::{error, info, trace}; use bytes::Bytes; use itertools::Itertools; @@ -24,13 +25,11 @@ use windows_projfs::{ use crate::{ counters::Counter, - index::IndexEntryIter, monitor::{ task::{Task, TaskList}, Monitor, Problem, }, - Apath, Archive, BandId, BandSelectionPolicy, Exclude, IndexEntry, IndexRead, Kind, Result, - StoredTree, + Apath, Archive, BandId, BandSelectionPolicy, IndexEntry, IndexRead, Kind, Result, StoredTree, }; macro_rules! static_dir { @@ -273,94 +272,118 @@ const FILE_ATTRIBUTE_NOT_CONTENT_INDEXED: u32 = 0x00002000; const FILE_ATTRIBUTE_RECALL_ON_OPEN: u32 = 0x00040000; /* Note: Using FILE_ATTRIBUTE_READONLY on directories will cause the explorer to *always* list all second level subdirectory entries */ -const DIRECTORY_ATTRIBUTES: u32 = FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_NOT_CONTENT_INDEXED | FILE_ATTRIBUTE_RECALL_ON_OPEN; - -impl Into> for IndexEntry { - fn into(self) -> Option { - let file_name = self.apath.split("/").last()?; - if self.kind == Kind::Dir { - Some( - DirectoryInfo { - directory_name: file_name.to_string(), - directory_attributes: DIRECTORY_ATTRIBUTES, - - /* currently conserve does not differentiate between the different time stamps */ - creation_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - last_access_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - last_write_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - } - .into(), - ) - } else if self.kind == Kind::File { - Some( - FileInfo { - file_name: file_name.to_string(), - file_size: self.addrs.iter().map(|block| block.len).sum(), - file_attributes: FILE_ATTRIBUTE_READONLY, - - /* currently conserve does not differentiate between 
the different time stamps */ - creation_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - last_access_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - last_write_time: unix_time_to_windows(self.mtime, self.mtime_nanos), - - ..Default::default() - } - .into(), - ) - } else if self.kind == Kind::Symlink { - /* - * Awaiting https://github.com/WolverinDEV/windows-projfs/issues/3 to be resolved - * before we can implement symlinks. - */ - None - } else { - None - } +const DIRECTORY_ATTRIBUTES: u32 = + FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_NOT_CONTENT_INDEXED | FILE_ATTRIBUTE_RECALL_ON_OPEN; + +fn index_entry_to_directory_entry(entry: &IndexEntry) -> Option { + let file_name = entry.apath.split("/").last()?; + if entry.kind == Kind::Dir { + Some( + DirectoryInfo { + directory_name: file_name.to_string(), + directory_attributes: DIRECTORY_ATTRIBUTES, + + /* currently conserve does not differentiate between the different time stamps */ + creation_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + last_access_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + last_write_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + } + .into(), + ) + } else if entry.kind == Kind::File { + Some( + FileInfo { + file_name: file_name.to_string(), + file_size: entry.addrs.iter().map(|block| block.len).sum(), + file_attributes: FILE_ATTRIBUTE_READONLY, + + /* currently conserve does not differentiate between the different time stamps */ + creation_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + last_access_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + last_write_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), + + ..Default::default() + } + .into(), + ) + } else if entry.kind == Kind::Symlink { + /* + * Awaiting https://github.com/WolverinDEV/windows-projfs/issues/3 to be resolved + * before we can implement symlinks. 
+ */ + None + } else { + None } } -struct ProjectionCache { +struct ArchiveProjectionSource { archive: Archive, - hunks: BTreeMap, - trees: BTreeMap>, + + stored_tree_cache: Mutex>>, + + hunk_helper_cache: Mutex>>, + + /* + * Cache the last accessed hunks to improve directory travesal speed. + */ + hunk_content_cache: Mutex>>>, + + /* + * The Windows file explorer has the tendency to query some directories multiple times in a row. + * Also if the user navigates up/down, allow this cache to help. + */ + serve_dir_cache: Mutex>>, } -impl ProjectionCache { - pub fn get_or_open_tree(&mut self, policy: BandSelectionPolicy) -> Result<&Arc> { +impl ArchiveProjectionSource { + pub fn load_hunk_contents( + &self, + stored_tree: &StoredTree, + hunk_id: u32, + ) -> Result>> { + let band_id = stored_tree.band().id(); + self.hunk_content_cache + .lock() + .unwrap() + .try_get_or_insert((band_id, hunk_id), || { + let mut index = stored_tree.band().index(); + Ok(Arc::new(index.read_hunk(hunk_id)?.unwrap_or_default())) + }) + .cloned() + } + + pub fn get_or_open_tree(&self, policy: BandSelectionPolicy) -> Result> { let band_id = self.archive.resolve_band_id(policy)?; - match self.trees.entry(band_id) { - Entry::Occupied(entry) => Ok(entry.into_mut()), - Entry::Vacant(entry) => { + self.stored_tree_cache + .lock() + .unwrap() + .try_get_or_insert(band_id, || { info!("Opening band {}", band_id); let stored_tree = self .archive .open_stored_tree(BandSelectionPolicy::Specified(band_id))?; - Ok(entry.insert(Arc::new(stored_tree))) - } - } + Ok(Arc::new(stored_tree)) + }) + .cloned() } - pub fn get_or_create_helper(&mut self, stored_tree: &StoredTree) -> Result<&HunkHelper> { - match self.hunks.entry(stored_tree.band().id()) { - Entry::Occupied(entry) => Ok(entry.into_mut()), - Entry::Vacant(entry) => { + pub fn get_or_create_hunk_helper(&self, stored_tree: &StoredTree) -> Result> { + let band_id = stored_tree.band().id(); + self.hunk_helper_cache + .lock() + .unwrap() + 
.try_get_or_insert(band_id, || { info!("Caching files for band {}", stored_tree.band().id()); let helper = HunkHelper::from_index(&stored_tree.band().index())?; - Ok(entry.insert(helper)) - } - } + Ok(Arc::new(helper)) + }) + .cloned() } -} -struct ArchiveProjectionSource { - archive: Archive, - cache: Mutex, -} - -impl ArchiveProjectionSource { fn parse_path_band_policy( components: &mut dyn Iterator>, ) -> Option { @@ -376,7 +399,8 @@ impl ArchiveProjectionSource { } fn serve_dir(&self, path: &Path) -> Result> { - trace!("serve_dir {}", path.display()); + debug!("Serving directory {}", path.display()); + let mut components = path .components() .map(Component::as_os_str) @@ -407,25 +431,25 @@ impl ArchiveProjectionSource { }; let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); - let (stored_tree, dir_hunks) = { - let mut cache = self.cache.lock().unwrap(); - let stored_tree = cache.get_or_open_tree(target_band)?.clone(); - let hunk_helper = cache.get_or_create_helper(&stored_tree)?; - let dir_hunks = hunk_helper.find_hunks_for_subdir(&target_path, false); + let stored_tree = self.get_or_open_tree(target_band)?; + let hunk_helper = self.get_or_create_hunk_helper(&stored_tree)?; + let dir_hunks = hunk_helper.find_hunks_for_subdir(&target_path, false); - (stored_tree, dir_hunks) - }; + let hunks = dir_hunks + .into_iter() + .flat_map(|hunk_id| self.load_hunk_contents(&stored_tree, hunk_id).ok()) + .collect_vec(); - let tree_index = stored_tree.band().index(); - let iterator = IndexEntryIter::new( - tree_index.iter_hunks(dir_hunks.into_iter()), - target_path.clone(), - Exclude::nothing(), - ); + let iterator = hunks.iter().map(|e| &**e).flatten(); let path_prefix = target_path.to_string(); let entries = iterator .filter(|entry| { + if !entry.apath.starts_with(&path_prefix) { + /* Not the directory we're interested in */ + return false; + } + if entry.apath.len() <= path_prefix.len() { /* * Skipping the containing directory 
entry which is eqal to path_prefix. @@ -445,7 +469,7 @@ impl ArchiveProjectionSource { true }) - .filter_map(IndexEntry::into) + .filter_map(index_entry_to_directory_entry) .collect_vec(); Ok(entries) @@ -466,37 +490,28 @@ impl ArchiveProjectionSource { .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); - let (stored_tree, file_hunk) = { - let mut cache = self.cache.lock().unwrap(); - let stored_tree = cache - .get_or_open_tree(target_band) - .map_err(io::Error::other)? - .clone(); - - let hunk_helper = cache - .get_or_create_helper(&stored_tree) - .map_err(io::Error::other)?; - - let file_hunk = hunk_helper - .find_hunk_for_file(&target_path) - .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; - - (stored_tree, file_hunk) - }; + let stored_tree = self + .get_or_open_tree(target_band) + .map_err(io::Error::other)?; + + let hunk_helper = self + .get_or_create_hunk_helper(&stored_tree) + .map_err(io::Error::other)?; + let file_hunk = hunk_helper + .find_hunk_for_file(&target_path) + .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; - let index_entry = stored_tree - .band() - .index() - .read_hunk(file_hunk) + let index_entry = self + .load_hunk_contents(&stored_tree, file_hunk) .map_err(io::Error::other)? - .unwrap_or_default() - .into_iter() + .iter() .find(|entry| entry.apath == target_path) - .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; + .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))? 
+ .clone(); let file_size: u64 = index_entry.addrs.iter().map(|addr| addr.len).sum(); - info!( + debug!( "Serving {}{} ({}/{} bytes)", stored_tree.band().id(), target_path, @@ -516,8 +531,25 @@ impl ArchiveProjectionSource { impl ProjectedFileSystemSource for ArchiveProjectionSource { fn list_directory(&self, path: &Path) -> Vec { + let cached_result = self + .serve_dir_cache + .lock() + .ok() + .and_then(|mut cache| cache.get(path).cloned()); + + if let Some(cached_result) = cached_result { + return cached_result; + } + match self.serve_dir(path) { - Ok(entries) => entries, + Ok(entries) => { + self.serve_dir_cache + .lock() + .unwrap() + .push(path.to_owned(), entries.clone()); + + entries + } Err(error) => { warn!("Failed to serve {}: {}", path.display(), error); vec![] @@ -560,34 +592,35 @@ const ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE: i32 = 369; pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { if clean { if destination.exists() { - eprintln!("The destination already exists."); - eprintln!("Please ensure, that the destination does not exists."); + error!("The destination already exists."); + error!("Please ensure, that the destination does not exists."); return Ok(()); } fs::create_dir_all(destination)?; } else { if !destination.exists() { - eprintln!("The destination does not exists."); - eprintln!("Please ensure, that the destination does exist prior mounting."); + error!("The destination does not exists."); + error!("Please ensure, that the destination does exist prior mounting."); return Ok(()); } } let source = ArchiveProjectionSource { archive: archive.clone(), - cache: Mutex::new(ProjectionCache { - archive, - hunks: Default::default(), - trees: Default::default(), - }), + /* cache at most 16 different bands in parallel */ + stored_tree_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())), + hunk_helper_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())), + + hunk_content_cache: 
Mutex::new(LruCache::new(NonZeroUsize::new(64).unwrap())), + serve_dir_cache: Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap())), }; let projection = ProjectedFileSystem::new(destination, source)?; info!("Projection started at {}.", destination.display()); { - println!("Press any key to stop the projection..."); + info!("Press any key to stop the projection..."); let mut stdin = io::stdin(); let _ = stdin.read(&mut [0u8]).unwrap(); } From eb1e814158ecf49be86f10f8066229ad902f7470 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 21:33:23 +0100 Subject: [PATCH 07/25] Indicating backup timestamp trough directory modify timestamps --- src/mount.rs | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/mount.rs b/src/mount.rs index a6ead81c..6197123d 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -398,6 +398,25 @@ impl ArchiveProjectionSource { } } + fn band_id_to_directory_info(&self, policy: BandSelectionPolicy) -> Option { + let stored_tree = self.get_or_open_tree(policy).ok()?; + let band_info = stored_tree.band().get_info().ok()?; + + let timestamp = unix_time_to_windows( + band_info.start_time.unix_timestamp(), + band_info.start_time.unix_timestamp_nanos() as u32, + ); + + Some(DirectoryInfo { + directory_name: format!("{}", band_info.id), + directory_attributes: DIRECTORY_ATTRIBUTES, + + creation_time: timestamp, + last_access_time: timestamp, + last_write_time: timestamp, + }) + } + fn serve_dir(&self, path: &Path) -> Result> { debug!("Serving directory {}", path.display()); @@ -409,7 +428,15 @@ impl ArchiveProjectionSource { let target_band = match components.next().as_deref() { None => { /* Virtual root, display channel selection */ - return Ok(vec![static_dir!("latest"), static_dir!("all")]); + let mut entries = Vec::with_capacity(2); + entries.push(static_dir!("all")); + if let Some(mut info) = self.band_id_to_directory_info(BandSelectionPolicy::Latest) + { + info.directory_name = 
"latest".to_string(); + entries.push(DirectoryEntry::Directory(info)) + } + + return Ok(entries); } Some("latest") => BandSelectionPolicy::Latest, Some("all") => { @@ -421,7 +448,10 @@ impl ArchiveProjectionSource { .archive .list_band_ids()? .into_iter() - .map(|band_id| static_dir!(format!("{}", band_id))) + .filter_map(|band_id| { + self.band_id_to_directory_info(BandSelectionPolicy::Specified(band_id)) + .map(DirectoryEntry::Directory) + }) .collect(); return Ok(entries); From 76535be24384403955be5beb8b6989e4742057f1 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 30 Dec 2023 21:34:12 +0100 Subject: [PATCH 08/25] Bumped windows-projfs version to v0.1.6 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92dccbb9..d6d342b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2941,9 +2941,9 @@ dependencies = [ [[package]] name = "windows-projfs" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90f405570663a441ceb10b307fc4a0230ca2bec02d0691f2e422d1d67607a44a" +checksum = "ac3c87ccd8945e09a06fbdc0104d06537c05f6e1da7463a5f4194189c744392f" dependencies = [ "libloading", "log", diff --git a/Cargo.toml b/Cargo.toml index 6ee1b1b9..3528238f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ uzers = "0.11" nix = { version = "0.27", features = ["fs", "user"] } [target.'cfg(windows)'.dependencies] -windows-projfs = { version = "0.1.5", features = ["dynamic-import"] } +windows-projfs = { version = "0.1.6", features = ["dynamic-import"] } [dependencies.clap] version = "4.3" From bc6cd1b503f40fc7e5f4879322fc01e327fc4add Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 31 Dec 2023 12:26:10 +0100 Subject: [PATCH 09/25] Mount implementation polishing --- src/mount.rs | 77 ++++++++++++++++++---------------------------------- 1 file changed, 27 insertions(+), 50 deletions(-) diff --git a/src/mount.rs b/src/mount.rs index 
6197123d..9b31394c 100644 --- a/src/mount.rs +++ b/src/mount.rs @@ -109,9 +109,9 @@ impl HunkHelper { Ok(Self { hunks: hunk_info }) } - pub fn find_hunk_for_file(&self, path: &Apath) -> Option { + pub fn find_hunk_index_for_file(&self, path: &Apath) -> Option { let hunk_index = self.hunks.binary_search_by(|entry| { - match (entry.start_path.cmp(&path), entry.end_path.cmp(&path)) { + match (entry.start_path.cmp(path), entry.end_path.cmp(path)) { (Ordering::Less, Ordering::Less) => Ordering::Less, (Ordering::Greater, Ordering::Greater) => Ordering::Greater, _ => Ordering::Equal, @@ -126,10 +126,15 @@ impl HunkHelper { if hunk_index >= self.hunks.len() { None } else { - Some(self.hunks[hunk_index].index) + Some(hunk_index) } } + pub fn find_hunk_for_file(&self, path: &Apath) -> Option { + self.find_hunk_index_for_file(path) + .map(|index| self.hunks[index].index) + } + pub fn find_hunks_for_subdir(&self, path: &Apath, recursive: bool) -> Vec { /* * Appending an empty string to the path allows us to search for the first file @@ -144,24 +149,11 @@ impl HunkHelper { * - /b/b/c */ let search_path = path.append(""); - let directory_start_hunk = match self.hunks.binary_search_by(|entry| { - match ( - entry.start_path.cmp(&search_path), - entry.end_path.cmp(&search_path), - ) { - (Ordering::Less, Ordering::Less) => Ordering::Less, - (Ordering::Greater, Ordering::Greater) => Ordering::Greater, - _ => Ordering::Equal, - } - }) { - Ok(hunk) => hunk, - Err(hunk) => hunk, + let directory_start_hunk = match self.find_hunk_index_for_file(&search_path) { + Some(index) => index, + None => return vec![], }; - if directory_start_hunk >= self.hunks.len() { - return vec![]; - } - let mut result = Vec::new(); result.push(self.hunks[directory_start_hunk].index); for hunk in &self.hunks[directory_start_hunk + 1..] 
{ @@ -169,11 +161,9 @@ impl HunkHelper { break; } - if !recursive { - if hunk.start_path[path.len() + 1..].contains("/") { - /* hunk does already contain directory content */ - break; - } + if !recursive && hunk.start_path[path.len() + 1..].contains('/') { + /* hunk does already contain directory content */ + break; } /* hunk still contains subtree elements of that path */ @@ -262,7 +252,7 @@ fn unix_time_to_windows(unix_seconds: i64, unix_nanos: u32) -> u64 { } let win_seconds = (unix_seconds + UNIX_WIN_DIFF_SECS) as u64; - return win_seconds * 1_000_000_000 / 100 + (unix_nanos / 100) as u64; + win_seconds * 1_000_000_000 / 100 + (unix_nanos / 100) as u64 } /* https://learn.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants */ @@ -276,7 +266,7 @@ const DIRECTORY_ATTRIBUTES: u32 = FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_NOT_CONTENT_INDEXED | FILE_ATTRIBUTE_RECALL_ON_OPEN; fn index_entry_to_directory_entry(entry: &IndexEntry) -> Option { - let file_name = entry.apath.split("/").last()?; + let file_name = entry.apath.split('/').last()?; if entry.kind == Kind::Dir { Some( DirectoryInfo { @@ -301,17 +291,9 @@ fn index_entry_to_directory_entry(entry: &IndexEntry) -> Option creation_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), last_access_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), last_write_time: unix_time_to_windows(entry.mtime, entry.mtime_nanos), - - ..Default::default() } .into(), ) - } else if entry.kind == Kind::Symlink { - /* - * Awaiting https://github.com/WolverinDEV/windows-projfs/issues/3 to be resolved - * before we can implement symlinks. - */ - None } else { None } @@ -327,6 +309,7 @@ struct ArchiveProjectionSource { /* * Cache the last accessed hunks to improve directory travesal speed. 
*/ + #[allow(clippy::type_complexity)] hunk_content_cache: Mutex>>>, /* @@ -359,7 +342,7 @@ impl ArchiveProjectionSource { .lock() .unwrap() .try_get_or_insert(band_id, || { - info!("Opening band {}", band_id); + debug!("Opening band {}", band_id); let stored_tree = self .archive @@ -376,6 +359,7 @@ impl ArchiveProjectionSource { .lock() .unwrap() .try_get_or_insert(band_id, || { + /* Inform the user that this band has been cached as this is most likely a heavy operaton (cpu and memory wise) */ info!("Caching files for band {}", stored_tree.band().id()); let helper = HunkHelper::from_index(&stored_tree.band().index())?; @@ -391,8 +375,7 @@ impl ArchiveProjectionSource { Some("latest") => Some(BandSelectionPolicy::Latest), Some("all") => components .next() - .map(|band_id| band_id.parse::().ok()) - .flatten() + .and_then(|band_id| band_id.parse::().ok()) .map(BandSelectionPolicy::Specified), _ => None, } @@ -470,7 +453,7 @@ impl ArchiveProjectionSource { .flat_map(|hunk_id| self.load_hunk_contents(&stored_tree, hunk_id).ok()) .collect_vec(); - let iterator = hunks.iter().map(|e| &**e).flatten(); + let iterator = hunks.iter().flat_map(|e| &**e); let path_prefix = target_path.to_string(); let entries = iterator @@ -483,16 +466,12 @@ impl ArchiveProjectionSource { if entry.apath.len() <= path_prefix.len() { /* * Skipping the containing directory entry which is eqal to path_prefix. - * - * Note: - * We're not filtering for entries which are not contained within target_path as the - * IndexEntryIter already does this. 
*/ return false; } - let file_name = &entry.apath[path_prefix.len()..].trim_start_matches("/"); - if file_name.contains("/") { + let file_name = &entry.apath[path_prefix.len()..].trim_start_matches('/'); + if file_name.contains('/') { /* entry is a file which is within a sub-directory */ return false; } @@ -628,12 +607,10 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { } fs::create_dir_all(destination)?; - } else { - if !destination.exists() { - error!("The destination does not exists."); - error!("Please ensure, that the destination does exist prior mounting."); - return Ok(()); - } + } else if !destination.exists() { + error!("The destination does not exists."); + error!("Please ensure, that the destination does exist prior mounting."); + return Ok(()); } let source = ArchiveProjectionSource { From e2d2f07c4d91bd525807489f7061e719df987972 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 31 Dec 2023 12:27:29 +0100 Subject: [PATCH 10/25] Moving unix specific test imports into the function itself to avoid unused imports compile warnings on Windows --- tests/api/restore.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/api/restore.rs b/tests/api/restore.rs index c31baf40..2546dfd1 100644 --- a/tests/api/restore.rs +++ b/tests/api/restore.rs @@ -13,13 +13,8 @@ //! Tests focused on restore. 
use std::cell::RefCell; -#[cfg(unix)] -use std::fs::{read_link, symlink_metadata}; -use std::path::PathBuf; use conserve::monitor::collect::CollectMonitor; -use filetime::{set_symlink_file_times, FileTime}; -use tempfile::TempDir; use conserve::test_fixtures::ScratchArchive; use conserve::test_fixtures::TreeFixture; @@ -166,6 +161,10 @@ fn exclude_files() { #[cfg(unix)] fn restore_symlink() { use conserve::monitor::collect::CollectMonitor; + use filetime::{set_symlink_file_times, FileTime}; + use std::fs::{read_link, symlink_metadata}; + use std::path::PathBuf; + use tempfile::TempDir; let af = ScratchArchive::new(); let srcdir = TreeFixture::new(); From eddd3696ff71a15d9e23dad5f484aaee7a8cc076 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 31 Dec 2023 12:29:46 +0100 Subject: [PATCH 11/25] Bumped the minimal required rust version to 1.75 --- .github/workflows/rust.yml | 8 +++----- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 52fb4607..3291ab46 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -26,7 +26,7 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macOS-latest] features: ["", "s3"] - version: [stable, nightly, "1.73"] + version: [stable, nightly, "1.75"] steps: - uses: actions/checkout@v3 @@ -43,8 +43,7 @@ jobs: cargo --version rustc --version - name: Build - run: - cargo build --all-targets --no-default-features --features=${{ + run: cargo build --all-targets --no-default-features --features=${{ matrix.features }} - name: Test run: @@ -77,8 +76,7 @@ jobs: # Can't use --all-features here because BLAKE2 SIMD needs unstable... # Don't use the S3 features because they require AWS credentials for realistic # testing. 
- run: - cargo mutants -j2 --no-shuffle -vV --cargo-arg --no-default-features + run: cargo mutants -j2 --no-shuffle -vV --cargo-arg --no-default-features - name: Archive results uses: actions/upload-artifact@v3 with: diff --git a/Cargo.toml b/Cargo.toml index 3528238f..3b84f882 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ name = "conserve" readme = "README.md" repository = "https://github.com/sourcefrog/conserve/" version = "23.11.0" -rust-version = "1.73" +rust-version = "1.75" [[bin]] doc = false From 1fd937bb3a36be083a599fbe57527e0069238379 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Mon, 1 Jan 2024 18:51:39 +0100 Subject: [PATCH 12/25] Incorporate PR feedback --- src/bin/conserve.rs | 3 +- src/errors.rs | 3 + src/hunk_index.rs | 133 ++++++++++++++++++++ src/lib.rs | 5 +- src/monitor/mod.rs | 1 + src/monitor/void.rs | 26 ++++ src/mount/mod.rs | 22 ++++ src/{mount.rs => mount/projfs.rs} | 197 +++++------------------------- 8 files changed, 218 insertions(+), 172 deletions(-) create mode 100644 src/hunk_index.rs create mode 100644 src/monitor/void.rs create mode 100644 src/mount/mod.rs rename src/{mount.rs => mount/projfs.rs} (74%) diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 3ee84f36..a16b9ca9 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -479,7 +479,8 @@ impl Command { cleanup, } => { let archive = Archive::open(open_transport(archive)?)?; - mount(archive, destination, *cleanup)?; + let options = MountOptions { clean: *cleanup }; + mount(archive, destination, options)?; } Command::Restore { archive, diff --git a/src/errors.rs b/src/errors.rs index 1195e8ad..f4f0640c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -136,6 +136,9 @@ pub enum Error { #[error("Unexpected file {path:?} in archive directory")] UnexpectedFile { path: String }, + #[error("This feature is not implemented")] + NotImplemented, + /// Generic IO error. 
#[error(transparent)] IOError { diff --git a/src/hunk_index.rs b/src/hunk_index.rs new file mode 100644 index 00000000..02972084 --- /dev/null +++ b/src/hunk_index.rs @@ -0,0 +1,133 @@ +use std::cmp::Ordering; + +use rayon::iter::{IntoParallelIterator, ParallelIterator}; + +use crate::{Apath, IndexRead, Result}; + +#[derive(Debug)] +struct HunkIndexMeta { + index: u32, + + start_path: Apath, + end_path: Apath, +} + +/// An index over all available hunks available in an index +/// for speeding up sub-dir iterations and locating +/// path metadata. +pub struct IndexHunkIndex { + hunks: Vec, +} + +impl IndexHunkIndex { + /// Index all available hunks from the read index. + /// + /// Note: + /// Depending on the index size this might not be a cheap operation + /// as we loop trough every hunk and read its contents. + pub fn from_index(index: &IndexRead) -> Result { + let mut hunk_info = index + .hunks_available()? + .into_par_iter() + .map(move |hunk_index| { + let mut index = index.duplicate(); + let entries = index.read_hunk(hunk_index)?; + let meta_info = if let Some(entries) = entries { + if let (Some(first), Some(last)) = (entries.first(), entries.last()) { + Some(HunkIndexMeta { + index: hunk_index, + + start_path: first.apath.clone(), + end_path: last.apath.clone(), + }) + } else { + None + } + } else { + None + }; + + Ok(meta_info) + }) + .map(Result::ok) + .flatten() + .filter_map(|entry| entry) + .collect::>(); + + /* After parallel execution bring all hunks back into order */ + hunk_info.sort_by_key(|info| info.index); + Ok(Self { hunks: hunk_info }) + } + + fn find_hunk_index_for_file(&self, path: &Apath) -> Option { + let hunk_index = self.hunks.binary_search_by(|entry| { + match (entry.start_path.cmp(path), entry.end_path.cmp(path)) { + (Ordering::Less, Ordering::Less) => Ordering::Less, + (Ordering::Greater, Ordering::Greater) => Ordering::Greater, + _ => Ordering::Equal, + } + }); + + /* + * If we do not have an exact match, no hunk contains the path we 
+ * are looking for. + */ + hunk_index.ok() + } + + /// Locate the hunk index of the hunk which should contain the file metadata + /// for a given path. + /// + /// Note: + /// To validate file existence it's required to parse the hunk contents and actually + /// check of the file path exist. This function only returns the hunk index where the file + /// should be located, if it exists. + pub fn find_hunk_for_file(&self, path: &Apath) -> Option { + self.find_hunk_index_for_file(path) + .map(|index| self.hunks[index].index) + } + + /// Locate the hunks where the file metadata for the directory contents of a particular directory + /// are stored. + /// + /// Note: + /// To validate directory existence it's required to parse the hunk contents and actually + /// check of the directory path exist. + pub fn find_hunks_for_subdir(&self, path: &Apath, recursive: bool) -> Vec { + /* + * Appending an empty string to the path allows us to search for the first file + * in the target directory. This is needed as a file and a directory with the same name are not + * stored in succession. + * + * Example (from the apath test): + * - /b/a + * - /b/b + * - /b/c + * - /b/a/c + * - /b/b/c + */ + let search_path = path.append(""); + let directory_start_hunk = match self.find_hunk_index_for_file(&search_path) { + Some(index) => index, + None => return vec![], + }; + + let mut result = Vec::new(); + result.push(self.hunks[directory_start_hunk].index); + for hunk in &self.hunks[directory_start_hunk + 1..] 
{ + if !path.is_prefix_of(&hunk.start_path) { + break; + } + + if !recursive && hunk.start_path[path.len() + 1..].contains('/') { + /* hunk does already contain directory content */ + break; + } + + /* hunk still contains subtree elements of that path */ + result.push(hunk.index); + } + + result + } +} diff --git a/src/lib.rs b/src/lib.rs index f5024cce..3dd07c0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod entry; pub mod errors; pub mod excludes; mod gc_lock; +mod hunk_index; pub mod index; mod io; mod jsonio; @@ -35,7 +36,6 @@ pub mod live_tree; mod merge; pub mod misc; pub mod monitor; -#[cfg(windows)] mod mount; pub mod owner; pub mod restore; @@ -71,8 +71,7 @@ pub use crate::kind::Kind; pub use crate::live_tree::LiveTree; pub use crate::merge::MergeTrees; pub use crate::misc::bytes_to_human_mb; -#[cfg(windows)] -pub use crate::mount::mount; +pub use crate::mount::{mount, MountOptions}; pub use crate::owner::Owner; pub use crate::restore::{restore, RestoreOptions}; pub use crate::show::{show_versions, ShowVersionsOptions}; diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 7e418ffe..44f7f5c5 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -4,6 +4,7 @@ pub mod collect; pub mod task; +pub mod void; use std::fmt::Debug; diff --git a/src/monitor/void.rs b/src/monitor/void.rs new file mode 100644 index 00000000..26f5f5e2 --- /dev/null +++ b/src/monitor/void.rs @@ -0,0 +1,26 @@ +use crate::counters::Counter; + +use super::{ + task::{Task, TaskList}, + Monitor, Problem, +}; + +/// A monitor that does not capture any information. +#[derive(Debug, Clone)] +pub struct VoidMonitor; +impl Monitor for VoidMonitor { + fn count(&self, _counter: Counter, _increment: usize) {} + + fn set_counter(&self, _counter: Counter, _value: usize) {} + + fn problem(&self, _problem: Problem) {} + + fn start_task(&self, name: String) -> Task { + /* + * All data related to the target task will be dropped + * as soon the callee drops the task. 
+ */ + let mut list = TaskList::default(); + list.start_task(name) + } +} diff --git a/src/mount/mod.rs b/src/mount/mod.rs new file mode 100644 index 00000000..dfe99352 --- /dev/null +++ b/src/mount/mod.rs @@ -0,0 +1,22 @@ +use std::path::Path; + +use crate::{Archive, Result}; + +#[cfg(windows)] +mod projfs; + +/// Options for mounting an archive +/// into an existing file systems. +pub struct MountOptions { + /// Create the mount point and delete it + /// when unmounting resulting in a clean environment. + pub clean: bool, +} + +pub fn mount(archive: Archive, destination: &Path, options: MountOptions) -> Result<()> { + #[cfg(windows)] + return projfs::mount(archive, destination, options); + + #[cfg(not(windows))] + return Err(crate::Error::NotImplemented); +} diff --git a/src/mount.rs b/src/mount/projfs.rs similarity index 74% rename from src/mount.rs rename to src/mount/projfs.rs index 9b31394c..b858ec49 100644 --- a/src/mount.rs +++ b/src/mount/projfs.rs @@ -1,8 +1,5 @@ -use lru::LruCache; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::{ borrow::Cow, - cmp::Ordering, ffi::OsStr, fs, io::{self, ErrorKind, Read}, @@ -13,166 +10,23 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use tracing::{error, info, trace}; use bytes::Bytes; use itertools::Itertools; -use tracing::{debug, warn}; +use lru::LruCache; +use tracing::{debug, error, info, warn}; use windows_projfs::{ DirectoryEntry, DirectoryInfo, FileInfo, Notification, ProjectedFileSystem, ProjectedFileSystemSource, }; use crate::{ - counters::Counter, - monitor::{ - task::{Task, TaskList}, - Monitor, Problem, - }, - Apath, Archive, BandId, BandSelectionPolicy, IndexEntry, IndexRead, Kind, Result, StoredTree, + hunk_index::IndexHunkIndex, + monitor::{void::VoidMonitor, Monitor}, + Apath, Archive, BandId, BandSelectionPolicy, IndexEntry, Kind, Result, StoredTree, }; -macro_rules! 
static_dir { - ($name:expr) => { - DirectoryInfo { - directory_name: ($name).to_string(), - directory_attributes: DIRECTORY_ATTRIBUTES, - - ..Default::default() - } - .into() - }; -} - -struct VoidMonitor; -impl Monitor for VoidMonitor { - fn count(&self, _counter: Counter, _increment: usize) {} - - fn set_counter(&self, _counter: Counter, _value: usize) {} - - fn problem(&self, _problem: Problem) {} - - fn start_task(&self, name: String) -> Task { - /* - * All data related to the target task will be dropped - * as soon the callee drops the task. - */ - let mut list = TaskList::default(); - list.start_task(name) - } -} - -#[derive(Debug)] -struct HunkMetaInfo { - index: u32, - - start_path: Apath, - end_path: Apath, -} - -struct HunkHelper { - hunks: Vec, -} - -impl HunkHelper { - pub fn from_index(index: &IndexRead) -> Result { - let mut hunk_info = index - .hunks_available()? - .into_par_iter() - .map(move |hunk_index| { - let mut index = index.duplicate(); - let entries = index.read_hunk(hunk_index)?; - let meta_info = if let Some(entries) = entries { - if let (Some(first), Some(last)) = (entries.first(), entries.last()) { - Some(HunkMetaInfo { - index: hunk_index, - - start_path: first.apath.clone(), - end_path: last.apath.clone(), - }) - } else { - None - } - } else { - None - }; - - Ok(meta_info) - }) - .map(Result::ok) - .flatten() - .filter_map(|entry| entry) - .collect::>(); - - /* After parallel execution bring all hunks back into order */ - hunk_info.sort_by_key(|info| info.index); - Ok(Self { hunks: hunk_info }) - } - - pub fn find_hunk_index_for_file(&self, path: &Apath) -> Option { - let hunk_index = self.hunks.binary_search_by(|entry| { - match (entry.start_path.cmp(path), entry.end_path.cmp(path)) { - (Ordering::Less, Ordering::Less) => Ordering::Less, - (Ordering::Greater, Ordering::Greater) => Ordering::Greater, - _ => Ordering::Equal, - } - }); - - let hunk_index = match hunk_index { - Ok(index) => index, - Err(index) => index, - }; - - if 
hunk_index >= self.hunks.len() { - None - } else { - Some(hunk_index) - } - } - - pub fn find_hunk_for_file(&self, path: &Apath) -> Option { - self.find_hunk_index_for_file(path) - .map(|index| self.hunks[index].index) - } - - pub fn find_hunks_for_subdir(&self, path: &Apath, recursive: bool) -> Vec { - /* - * Appending an empty string to the path allows us to search for the first file - * in the target directory. This is needed as a file and a directory with the same name are not - * stored in succession. - * - * Example (from the apath test): - * - /b/a - * - /b/b - * - /b/c - * - /b/a/c - * - /b/b/c - */ - let search_path = path.append(""); - let directory_start_hunk = match self.find_hunk_index_for_file(&search_path) { - Some(index) => index, - None => return vec![], - }; - - let mut result = Vec::new(); - result.push(self.hunks[directory_start_hunk].index); - for hunk in &self.hunks[directory_start_hunk + 1..] { - if !path.is_prefix_of(&hunk.start_path) { - break; - } - - if !recursive && hunk.start_path[path.len() + 1..].contains('/') { - /* hunk does already contain directory content */ - break; - } - - /* hunk still contains subtree elements of that path */ - result.push(hunk.index); - } - - result - } -} +use super::MountOptions; struct StoredFileReader { iter: Peekable>>>, @@ -304,7 +158,7 @@ struct ArchiveProjectionSource { stored_tree_cache: Mutex>>, - hunk_helper_cache: Mutex>>, + hunk_index_cache: Mutex>>, /* * Cache the last accessed hunks to improve directory travesal speed. 
@@ -353,16 +207,19 @@ impl ArchiveProjectionSource { .cloned() } - pub fn get_or_create_hunk_helper(&self, stored_tree: &StoredTree) -> Result> { + pub fn get_or_create_hunk_index( + &self, + stored_tree: &StoredTree, + ) -> Result> { let band_id = stored_tree.band().id(); - self.hunk_helper_cache + self.hunk_index_cache .lock() .unwrap() .try_get_or_insert(band_id, || { /* Inform the user that this band has been cached as this is most likely a heavy operaton (cpu and memory wise) */ info!("Caching files for band {}", stored_tree.band().id()); - let helper = HunkHelper::from_index(&stored_tree.band().index())?; + let helper = IndexHunkIndex::from_index(&stored_tree.band().index())?; Ok(Arc::new(helper)) }) .cloned() @@ -410,9 +267,14 @@ impl ArchiveProjectionSource { let target_band = match components.next().as_deref() { None => { - /* Virtual root, display channel selection */ + /* Virtual root, display band selection */ let mut entries = Vec::with_capacity(2); - entries.push(static_dir!("all")); + entries.push(DirectoryEntry::Directory(DirectoryInfo { + directory_name: "all".to_string(), + directory_attributes: DIRECTORY_ATTRIBUTES, + + ..Default::default() + })); if let Some(mut info) = self.band_id_to_directory_info(BandSelectionPolicy::Latest) { info.directory_name = "latest".to_string(); @@ -445,8 +307,8 @@ impl ArchiveProjectionSource { let target_path = components.fold(Apath::root(), |path, component| path.append(&component)); let stored_tree = self.get_or_open_tree(target_band)?; - let hunk_helper = self.get_or_create_hunk_helper(&stored_tree)?; - let dir_hunks = hunk_helper.find_hunks_for_subdir(&target_path, false); + let hunk_index = self.get_or_create_hunk_index(&stored_tree)?; + let dir_hunks = hunk_index.find_hunks_for_subdir(&target_path, false); let hunks = dir_hunks .into_iter() @@ -503,10 +365,10 @@ impl ArchiveProjectionSource { .get_or_open_tree(target_band) .map_err(io::Error::other)?; - let hunk_helper = self - 
.get_or_create_hunk_helper(&stored_tree) + let hunk_index = self + .get_or_create_hunk_index(&stored_tree) .map_err(io::Error::other)?; - let file_hunk = hunk_helper + let file_hunk = hunk_index .find_hunk_for_file(&target_path) .ok_or(io::Error::new(ErrorKind::NotFound, "invalid path"))?; @@ -572,7 +434,6 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { byte_offset: usize, length: usize, ) -> std::io::Result> { - trace!("stream_file_content {}", path.display()); match self.serve_file(path, byte_offset, length) { Ok(reader) => Ok(reader), Err(error) => { @@ -598,8 +459,8 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { } const ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE: i32 = 369; -pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { - if clean { +pub fn mount(archive: Archive, destination: &Path, options: MountOptions) -> Result<()> { + if options.clean { if destination.exists() { error!("The destination already exists."); error!("Please ensure, that the destination does not exists."); @@ -618,7 +479,7 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { /* cache at most 16 different bands in parallel */ stored_tree_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())), - hunk_helper_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())), + hunk_index_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())), hunk_content_cache: Mutex::new(LruCache::new(NonZeroUsize::new(64).unwrap())), serve_dir_cache: Mutex::new(LruCache::new(NonZeroUsize::new(32).unwrap())), @@ -635,7 +496,7 @@ pub fn mount(archive: Archive, destination: &Path, clean: bool) -> Result<()> { info!("Stopping projection."); drop(projection); - if clean { + if options.clean { debug!("Removing destination {}", destination.display()); let mut attempt_count = 0; while let Err(err) = fs::remove_dir_all(destination) { From 3266178413b19f65f9082c83642a2d7c94b9c5bc Mon Sep 17 00:00:00 
2001 From: WolverinDEV Date: Thu, 1 Aug 2024 13:27:59 +0200 Subject: [PATCH 13/25] Adding tests for the mount archive feature --- src/bin/conserve.rs | 32 +++++- src/errors.rs | 6 ++ src/mount/mod.rs | 13 ++- src/mount/projfs.rs | 89 ++++++++------- tests/mount.rs | 258 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 356 insertions(+), 42 deletions(-) create mode 100644 tests/mount.rs diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index d9547044..044aba73 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -15,7 +15,7 @@ use std::cell::RefCell; use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; +use std::io::{self, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -188,7 +188,7 @@ enum Command { /// Target folder where the archive should be mounted to destination: PathBuf, - /// Create the target folder and remove all temporarly created + /// Create the target folder and remove all temporarily created /// files on exit #[arg(long, default_value_t = true)] cleanup: bool, @@ -490,7 +490,33 @@ impl Command { } => { let archive = Archive::open(open_transport(archive)?)?; let options = MountOptions { clean: *cleanup }; - mount(archive, destination, options)?; + let projection = match mount(archive, destination, options) { + Ok(handle) => handle, + Err(Error::MountDestinationExists) => { + error!("The destination already exists."); + error!("Please ensure, that the destination does not exists."); + return Ok(ExitCode::Failure); + } + Err(Error::MountDestinationDoesNotExists) => { + error!("The destination does not exists."); + error!("Please ensure, that the destination does exist prior mounting."); + return Ok(ExitCode::Failure); + } + Err(error) => return Err(error), + }; + + info!( + "Projection started at {}.", + projection.mount_root().display() + ); + { + info!("Press any key to stop the projection..."); + let mut stdin = io::stdin(); + let _ = 
stdin.read(&mut [0u8]).unwrap(); + } + + info!("Stopping projection."); + drop(projection); } Command::Restore { archive, diff --git a/src/errors.rs b/src/errors.rs index 75ce2c91..2dcac82e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -170,6 +170,12 @@ pub enum Error { #[error("This feature is not implemented")] NotImplemented, + #[error("The destination already exists")] + MountDestinationExists, + + #[error("The destination does not exists")] + MountDestinationDoesNotExists, + /// Generic IO error. #[error(transparent)] IOError { diff --git a/src/mount/mod.rs b/src/mount/mod.rs index dfe99352..a6216c33 100644 --- a/src/mount/mod.rs +++ b/src/mount/mod.rs @@ -13,7 +13,18 @@ pub struct MountOptions { pub clean: bool, } -pub fn mount(archive: Archive, destination: &Path, options: MountOptions) -> Result<()> { +/// Handle for the mount controller. +/// Once dropped, the projection will be stopped and if specified so by MountOptions cleaned. +pub trait MountHandle { + /// Returns the root path where the archive has been mounted. 
+ fn mount_root(&self) -> &Path; +} + +pub fn mount( + archive: Archive, + destination: &Path, + options: MountOptions, +) -> Result> { #[cfg(windows)] return projfs::mount(archive, destination, options); diff --git a/src/mount/projfs.rs b/src/mount/projfs.rs index b858ec49..637a451e 100644 --- a/src/mount/projfs.rs +++ b/src/mount/projfs.rs @@ -14,7 +14,7 @@ use std::{ use bytes::Bytes; use itertools::Itertools; use lru::LruCache; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use windows_projfs::{ DirectoryEntry, DirectoryInfo, FileInfo, Notification, ProjectedFileSystem, ProjectedFileSystemSource, @@ -23,10 +23,10 @@ use windows_projfs::{ use crate::{ hunk_index::IndexHunkIndex, monitor::{void::VoidMonitor, Monitor}, - Apath, Archive, BandId, BandSelectionPolicy, IndexEntry, Kind, Result, StoredTree, + Apath, Archive, BandId, BandSelectionPolicy, Error, IndexEntry, Kind, Result, StoredTree, }; -use super::MountOptions; +use super::{MountHandle, MountOptions}; struct StoredFileReader { iter: Peekable>>>, @@ -161,7 +161,7 @@ struct ArchiveProjectionSource { hunk_index_cache: Mutex>>, /* - * Cache the last accessed hunks to improve directory travesal speed. + * Cache the last accessed hunks to improve directory traversal speed. 
*/ #[allow(clippy::type_complexity)] hunk_content_cache: Mutex>>>, @@ -216,7 +216,7 @@ impl ArchiveProjectionSource { .lock() .unwrap() .try_get_or_insert(band_id, || { - /* Inform the user that this band has been cached as this is most likely a heavy operaton (cpu and memory wise) */ + /* Inform the user that this band has been cached as this is most likely a heavy operation (cpu and memory wise) */ info!("Caching files for band {}", stored_tree.band().id()); let helper = IndexHunkIndex::from_index(&stored_tree.band().index())?; @@ -450,7 +450,7 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { if notification.is_cancelable() && !matches!(notification, Notification::FilePreConvertToFull(_)) { - /* try to cancel everything, except retriving data */ + /* try to cancel everything, except retrieving data */ ControlFlow::Break(()) } else { ControlFlow::Continue(()) @@ -459,19 +459,51 @@ impl ProjectedFileSystemSource for ArchiveProjectionSource { } const ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE: i32 = 369; -pub fn mount(archive: Archive, destination: &Path, options: MountOptions) -> Result<()> { +struct WindowsMountHandle { + _projection: ProjectedFileSystem, + path: PathBuf, + cleanup: bool, +} + +impl Drop for WindowsMountHandle { + fn drop(&mut self) { + if self.cleanup { + debug!("Removing destination {}", self.path.display()); + let mut attempt_count = 0; + while let Err(err) = fs::remove_dir_all(&self.path) { + attempt_count += 1; + if err.raw_os_error().unwrap_or_default() + != ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE + || attempt_count > 5 + { + warn!("Failed to clean up projection destination: {}", err); + break; + } + std::thread::sleep(Duration::from_secs(1)); + } + } + } +} + +impl MountHandle for WindowsMountHandle { + fn mount_root(&self) -> &Path { + &self.path + } +} + +pub fn mount( + archive: Archive, + destination: &Path, + options: MountOptions, +) -> Result> { if options.clean { if destination.exists() { - 
error!("The destination already exists."); - error!("Please ensure, that the destination does not exists."); - return Ok(()); + return Err(Error::MountDestinationExists); } fs::create_dir_all(destination)?; } else if !destination.exists() { - error!("The destination does not exists."); - error!("Please ensure, that the destination does exist prior mounting."); - return Ok(()); + return Err(Error::MountDestinationDoesNotExists); } let source = ArchiveProjectionSource { @@ -486,31 +518,12 @@ pub fn mount(archive: Archive, destination: &Path, options: MountOptions) -> Res }; let projection = ProjectedFileSystem::new(destination, source)?; - info!("Projection started at {}.", destination.display()); - { - info!("Press any key to stop the projection..."); - let mut stdin = io::stdin(); - let _ = stdin.read(&mut [0u8]).unwrap(); - } - - info!("Stopping projection."); - drop(projection); + let handle: Box = Box::new(WindowsMountHandle { + _projection: projection, - if options.clean { - debug!("Removing destination {}", destination.display()); - let mut attempt_count = 0; - while let Err(err) = fs::remove_dir_all(destination) { - attempt_count += 1; - if err.raw_os_error().unwrap_or_default() - != ERROR_CODE_VIRTUALIZATION_TEMPORARILY_UNAVAILABLE - || attempt_count > 5 - { - warn!("Failed to clean up projection destination: {}", err); - break; - } - std::thread::sleep(Duration::from_secs(1)); - } - } + path: destination.to_owned(), + cleanup: options.clean, + }); - Ok(()) + Ok(handle) } diff --git a/tests/mount.rs b/tests/mount.rs new file mode 100644 index 00000000..e46ae461 --- /dev/null +++ b/tests/mount.rs @@ -0,0 +1,258 @@ +use std::{ + fs::{self}, + path::Path, +}; + +use conserve::{ + backup, + monitor::test::TestMonitor, + test_fixtures::{ScratchArchive, TreeFixture}, + BackupOptions, MountOptions, +}; +use tempfile::TempDir; + +fn read_dir(path: &Path) -> Vec<(bool, String)> { + fs::read_dir(path) + .unwrap() + .filter_map(|entry| entry.ok()) + .map(|entry| { + ( 
+ entry.file_type().unwrap().is_dir(), + entry.file_name().to_string_lossy().to_string(), + ) + }) + .collect::>() +} + +#[test] +#[cfg(unix)] +fn mount_unix_not_implemented() { + use assert_matches::assert_matches; + use conserve::Error; + + let archive = ScratchArchive::new(); + let mountdir = TempDir::new().unwrap(); + + let result = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: false }, + ); + assert_matches!(result.err(), Some(Error::NotImplemented)); +} + +#[test] +fn mount_empty() { + let archive = ScratchArchive::new(); + let mountdir = TempDir::new().unwrap(); + let _projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: false }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + + /* An empty projection should not contain the "latest" folder as there is no latest */ + assert_eq!(read_dir(mountdir.path()), [(true, "all".into())]); +} + +#[test] +fn mount_sub_dirs() { + let archive = ScratchArchive::new(); + { + let srcdir = TreeFixture::new(); + + srcdir.create_dir("sub1"); + srcdir.create_dir("sub1/sub1"); + srcdir.create_file("sub1/sub1/file.txt"); + + srcdir.create_dir("sub2"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + } + + let mountdir = TempDir::new().unwrap(); + let _projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: false }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + assert_eq!( + read_dir(&mountdir.path().join("all")), + [(true, "b0000".into())] + ); + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0000")), + [(true, "sub1".into()), (true, "sub2".into())] + ); + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0000").join("sub1")), + [(true, "sub1".into())] + ); + assert_eq!( + read_dir( + &mountdir + .path() + .join("all") + .join("b0000") + .join("sub1") + .join("sub1") + ), + [(false, "file.txt".into())] + ); + assert_eq!( + 
read_dir(&mountdir.path().join("all").join("b0000").join("sub2")), + [] + ); +} + +#[test] +fn mount_file_versions() { + let archive = ScratchArchive::new(); + { + let srcdir = TreeFixture::new(); + + srcdir.create_file_with_contents("file_v1.txt", b"Hello World"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + + srcdir.create_file_with_contents("file_v1.txt", b"Good bye World"); + srcdir.create_file_with_contents("file_v2.txt", b"Only in V2"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + } + + let mountdir = TempDir::new().unwrap(); + let _projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: false }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + assert_eq!( + read_dir(mountdir.path()), + [(true, "all".into()), (true, "latest".into())] + ); + + /* check that "latest" is actually the latest version (version 2) */ + assert_eq!( + read_dir(&mountdir.path().join("latest")), + [(false, "file_v1.txt".into()), (false, "file_v2.txt".into())] + ); + assert_eq!( + fs::read(mountdir.path().join("latest").join("file_v1.txt")).unwrap(), + b"Good bye World" + ); + + /* check if the versions can be properly listed and accessed by "all" */ + assert_eq!( + read_dir(&mountdir.path().join("all")), + [(true, "b0000".into()), (true, "b0001".into())] + ); + + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0000")), + [(false, "file_v1.txt".into())] + ); + assert_eq!( + fs::read( + mountdir + .path() + .join("all") + .join("b0000") + .join("file_v1.txt") + ) + .unwrap(), + b"Hello World" + ); + + assert_eq!( + read_dir(&mountdir.path().join("all").join("b0001")), + [(false, "file_v1.txt".into()), (false, "file_v2.txt".into())] + ); + assert_eq!( + fs::read( + mountdir + .path() + .join("all") + .join("b0001") + .join("file_v1.txt") + ) + .unwrap(), + b"Good bye World" + ); +} + +#[test] +fn 
mount_cleanup() { + let archive = ScratchArchive::new(); + { + let srcdir = TreeFixture::new(); + srcdir.create_file("file.txt"); + + srcdir.create_dir("sub1"); + srcdir.create_file("sub1/file.txt"); + + srcdir.create_dir("sub2"); + backup( + &archive, + srcdir.path(), + &BackupOptions::default(), + TestMonitor::arc(), + ) + .unwrap(); + } + + let mountdir = TempDir::new().unwrap(); + fs::remove_dir(mountdir.path()).unwrap(); + + let projection = conserve::mount( + archive.clone(), + mountdir.path(), + MountOptions { clean: true }, + ) + .unwrap(); + + assert!(mountdir.path().is_dir()); + + /* actually read some data which may create files in the mount dir */ + fs::read(mountdir.path().join("all").join("b0000").join("file.txt")).unwrap(); + fs::read( + mountdir + .path() + .join("all") + .join("b0000") + .join("sub1") + .join("file.txt"), + ) + .unwrap(); + assert!(!read_dir(mountdir.path()).is_empty()); + + /* Mount dir should be cleaned now */ + drop(projection); + + /* the target dir should have been deleted */ + assert!(!mountdir.path().is_dir()); +} From 4b48c8cff1f431177e8b2b3eea5bcaac0423e04b Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 1 Aug 2024 13:31:35 +0200 Subject: [PATCH 14/25] Using Rust 1.76 as required by aws-sdk-sso --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0abfe4e5..cc19c3db 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -33,7 +33,7 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macOS-latest] features: ["", "s3"] - version: [stable, nightly, "1.75"] + version: [stable, nightly, "1.76"] steps: - uses: actions/checkout@v4 From 57942e300fe2d2ce226a56cd44bfa3009c6ed41e Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 1 Aug 2024 13:34:47 +0200 Subject: [PATCH 15/25] Disable tests for mount on non windows systems --- tests/mount.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/tests/mount.rs b/tests/mount.rs index e46ae461..529d4ddc 100644 --- a/tests/mount.rs +++ b/tests/mount.rs @@ -42,6 +42,7 @@ fn mount_unix_not_implemented() { } #[test] +#[cfg(not(unix))] fn mount_empty() { let archive = ScratchArchive::new(); let mountdir = TempDir::new().unwrap(); @@ -59,6 +60,7 @@ fn mount_empty() { } #[test] +#[cfg(not(unix))] fn mount_sub_dirs() { let archive = ScratchArchive::new(); { @@ -117,6 +119,7 @@ fn mount_sub_dirs() { } #[test] +#[cfg(not(unix))] fn mount_file_versions() { let archive = ScratchArchive::new(); { @@ -206,6 +209,7 @@ fn mount_file_versions() { } #[test] +#[cfg(not(unix))] fn mount_cleanup() { let archive = ScratchArchive::new(); { From 1918fa58c7a8f93d785d5f7fb5f58fbcbff51ec7 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 10 Aug 2024 11:14:06 +0200 Subject: [PATCH 16/25] Separating mount tests from normal tests and reduce the number of test threads for the mount tests --- .github/workflows/rust.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index cc19c3db..ac38c0e0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -54,7 +54,15 @@ jobs: - name: Test run: > cargo test --no-default-features --features=${{ matrix.features }} - --features fail/failpoints -- --include-ignored + --features fail/failpoints -- --include-ignored --skip mount + - name: Test (Mount) + run: > + cargo test --no-default-features --features=${{ matrix.features }} + --features fail/failpoints -- mount + env: + # Running multiple instances in parallel might cause a crash on low-end environments + # when executing the mounting tests on Windows due to projfs. 
+ RUST_TEST_THREADS: 1 # Run rustfmt separately so that it does not block test results rustfmt: From ef27f3aff6e648035ddb86b155c0527c152e01ae Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Sat, 19 Oct 2024 17:58:10 -0700 Subject: [PATCH 17/25] Skip some files from mutants --- .cargo/mutants.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.cargo/mutants.toml b/.cargo/mutants.toml index ed431365..a7730283 100644 --- a/.cargo/mutants.toml +++ b/.cargo/mutants.toml @@ -39,10 +39,12 @@ exclude_globs = [ "bandid.rs", # almost well tested but some gaps "metric_recorder.rs", "progress.rs", + "src/hunk_index.rs", # not well tested yet? + "src/mount/projfs.rs", # mostly for Windows + "src/owner/windows.rs", "src/progress/term.rs", "src/transport.rs", # almost well tested but some gaps "src/transport/s3.rs", "src/ui/termui.rs", - "src/owner/windows.rs", "stats.rs", ] From 4a1360275239531a68b7f48cb44112690c4c0d74 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Mon, 21 Oct 2024 09:10:14 -0700 Subject: [PATCH 18/25] fix: New Transport API --- src/bin/conserve.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index a91c4084..02c69972 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -489,7 +489,7 @@ impl Command { destination, cleanup, } => { - let archive = Archive::open(open_transport(archive)?)?; + let archive = Archive::open(Transport::new(archive)?)?; let options = MountOptions { clean: *cleanup }; let projection = match mount(archive, destination, options) { Ok(handle) => handle, From 941a82cb5ba60ae56b489cf1dc1ed831b45253a2 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Mon, 21 Oct 2024 09:10:58 -0700 Subject: [PATCH 19/25] fix: Better error messages --- src/bin/conserve.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 02c69972..46efbfb8 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ 
-494,13 +494,11 @@ impl Command { let projection = match mount(archive, destination, options) { Ok(handle) => handle, Err(Error::MountDestinationExists) => { - error!("The destination already exists."); - error!("Please ensure, that the destination does not exists."); + error!("Mount point {} already exists", destination.display()); return Ok(ExitCode::Failure); } Err(Error::MountDestinationDoesNotExists) => { - error!("The destination does not exists."); - error!("Please ensure, that the destination does exist prior mounting."); + error!("Mount destination {} does not exist", destination.display()); return Ok(ExitCode::Failure); } Err(error) => return Err(error), From fb09fa0f1c3ce90133d910498719ad546606226d Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 07:58:58 -0700 Subject: [PATCH 20/25] Separate out no-op Unix mount --- src/mount/mod.rs | 16 ++++++---------- src/mount/unix.rs | 12 ++++++++++++ 2 files changed, 18 insertions(+), 10 deletions(-) create mode 100644 src/mount/unix.rs diff --git a/src/mount/mod.rs b/src/mount/mod.rs index 92ff4287..b2eebf3e 100644 --- a/src/mount/mod.rs +++ b/src/mount/mod.rs @@ -1,10 +1,11 @@ use std::path::Path; -use crate::{Archive, Result}; - #[cfg(windows)] mod projfs; +#[cfg(unix)] +mod unix; + /// Options for mounting an archive /// into an existing file systems. 
pub struct MountOptions { @@ -22,11 +23,6 @@ pub trait MountHandle { #[cfg(windows)] pub use projfs::mount; - #[cfg(not(windows))] -pub fn mount( - _archive: Archive, - _destination: &Path, - _options: MountOptions, -) -> Result> { - Err(crate::Error::NotImplemented) -} + +#[cfg(unix)] +pub use unix::mount; diff --git a/src/mount/unix.rs b/src/mount/unix.rs new file mode 100644 index 00000000..c5a55d8b --- /dev/null +++ b/src/mount/unix.rs @@ -0,0 +1,12 @@ +use std::path::Path; + +use crate::{Archive, Result, Error}; +use super::{MountHandle, MountOptions}; + +pub fn mount( + _archive: Archive, + _destination: &Path, + _options: MountOptions, +) -> Result> { + Err(Error::NotImplemented) +} From 8863d09a28bb3e851b64f8558bd788a382c8f2fb Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 08:05:53 -0700 Subject: [PATCH 21/25] rustfmt --- src/mount/unix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mount/unix.rs b/src/mount/unix.rs index c5a55d8b..9d1fac04 100644 --- a/src/mount/unix.rs +++ b/src/mount/unix.rs @@ -1,7 +1,7 @@ use std::path::Path; -use crate::{Archive, Result, Error}; use super::{MountHandle, MountOptions}; +use crate::{Archive, Error, Result}; pub fn mount( _archive: Archive, From 85d495c321a9e0937d82ff5210d03f8d407c081d Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 08:39:33 -0700 Subject: [PATCH 22/25] clippy: some things only needed on Windows --- src/bin/conserve.rs | 4 +++- tests/mount.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 46efbfb8..b109a32b 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -15,7 +15,7 @@ use std::cell::RefCell; use std::fs::{File, OpenOptions}; -use std::io::{self, BufWriter, Read, Write}; +use std::io::{self, BufWriter, Write}; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -489,6 +489,8 @@ impl Command { destination, cleanup, } => { + 
use std::io::Read; + let archive = Archive::open(Transport::new(archive)?)?; let options = MountOptions { clean: *cleanup }; let projection = match mount(archive, destination, options) { diff --git a/tests/mount.rs b/tests/mount.rs index 529d4ddc..e116b72d 100644 --- a/tests/mount.rs +++ b/tests/mount.rs @@ -11,6 +11,7 @@ use conserve::{ }; use tempfile::TempDir; +#[cfg(windows)] fn read_dir(path: &Path) -> Vec<(bool, String)> { fs::read_dir(path) .unwrap() From f66d6b06890b9b88a785f40c09f9f8710b95e950 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 08:44:44 -0700 Subject: [PATCH 23/25] Only build hunk_index on Windows --- src/hunk_index.rs | 8 +++++++- src/lib.rs | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/hunk_index.rs b/src/hunk_index.rs index 02972084..a8c14220 100644 --- a/src/hunk_index.rs +++ b/src/hunk_index.rs @@ -1,3 +1,9 @@ +// TODO: This uses single indexes, but for consistency with the rest of Conserve +// it should probably use stitched indexes, so that it sees the continuation +// of interrupted backups. + +// TODO: Unit tests. + use std::cmp::Ordering; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -24,7 +30,7 @@ impl IndexHunkIndex { /// /// Note: /// Depending on the index size this might not be a cheap operation - /// as we loop trough every hunk and read its contents. + /// as we loop through every hunk and read its contents. pub fn from_index(index: &IndexRead) -> Result { let mut hunk_info = index .hunks_available()? 
diff --git a/src/lib.rs b/src/lib.rs index a92594c2..0480ad64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod entry; pub mod errors; pub mod excludes; mod gc_lock; +#[cfg(windows)] // currently only used by projfs mod hunk_index; pub mod index; mod io; From 210d7909e514e192cd0167d41e5b614a3ff48bdf Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 08:48:38 -0700 Subject: [PATCH 24/25] Suppress unused warnings on Unix for code only used on Windows --- src/hunk_index.rs | 3 +++ src/lib.rs | 1 - tests/mount.rs | 3 +++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/hunk_index.rs b/src/hunk_index.rs index a8c14220..6d932819 100644 --- a/src/hunk_index.rs +++ b/src/hunk_index.rs @@ -4,6 +4,9 @@ // TODO: Unit tests. +// This is currently only used by projfs, but is not inherently Windows-specific. +#![cfg_attr(not(windows), allow(unused))] + use std::cmp::Ordering; use rayon::iter::{IntoParallelIterator, ParallelIterator}; diff --git a/src/lib.rs b/src/lib.rs index 0480ad64..a92594c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,6 @@ pub mod entry; pub mod errors; pub mod excludes; mod gc_lock; -#[cfg(windows)] // currently only used by projfs mod hunk_index; pub mod index; mod io; diff --git a/tests/mount.rs b/tests/mount.rs index e116b72d..00b6a60f 100644 --- a/tests/mount.rs +++ b/tests/mount.rs @@ -1,3 +1,6 @@ +// Mostly inactive on Unix, as the mount function is not implemented for Unix. 
+#![cfg_attr(not(windows), allow(unused))] + use std::{ fs::{self}, path::Path, From c6a0e91b42c48de6fd51e27e15dbd15c5c564387 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Wed, 23 Oct 2024 08:56:12 -0700 Subject: [PATCH 25/25] Rename option to --cleanup-projfs and add help --- src/bin/conserve.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index b109a32b..91400930 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -180,7 +180,22 @@ enum Command { long_listing: bool, }, - /// Mount the archive as projection. + /// Mount the archive as a filesystem. + /// + /// Files and directories from all previous backups are visible. + /// + /// This is currently only supported on Windows. + /// + /// On Windows you must first enable the Projected Filesystem feature by running this command + /// in an elevated PowerShell: + /// + /// Enable-WindowsOptionalFeature -Online -FeatureName Client-ProjFS -NoRestart + /// + /// ProjFS by default retains extracted files in the destination directory. This can make + /// access to the archive faster on subsequent mounts, but will use more disk space. + /// + /// If `--cleanup-projfs` is set, then the directory will be deleted when the projection is stopped. + /// Also, if this option is set, the destination directory must not exist. #[cfg(windows)] Mount { /// The archive to mount @@ -191,8 +206,8 @@ enum Command { /// Create the target folder and remove all temporarily created /// files on exit - #[arg(long, default_value_t = true)] - cleanup: bool, + #[arg(long)] + cleanup_projfs: bool, }, /// Copy a stored tree to a restore directory. @@ -487,7 +502,7 @@ impl Command { Command::Mount { archive, destination, - cleanup, + cleanup_projfs: cleanup, } => { use std::io::Read;