Skip to content

Commit

Permalink
add lock command
Browse files Browse the repository at this point in the history
  • Loading branch information
aawsome committed Feb 16, 2024
1 parent 0117234 commit 37bcc7b
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 0 deletions.
1 change: 1 addition & 0 deletions crates/core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod dump;
pub mod forget;
pub mod init;
pub mod key;
pub mod lock;
pub mod merge;
pub mod prune;
/// The `repair` command.
Expand Down
256 changes: 256 additions & 0 deletions crates/core/src/commands/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
//! `lock` subcommand
use std::collections::BTreeSet;

use chrono::{DateTime, Local};
use derive_setters::Setters;
use log::error;
use rayon::ThreadPoolBuilder;

use crate::{
backend::{
decrypt::{DecryptReadBackend, DecryptWriteBackend},
node::NodeType,
FileType,
},
blob::{tree::TreeStreamerOnce, BlobType},
error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
id::Id,
index::{
binarysorted::{IndexCollector, IndexType},
indexer::Indexer,
GlobalIndex, ReadGlobalIndex,
},
progress::{Progress, ProgressBars},
repofile::{IndexFile, SnapshotFile},
repository::{Open, Repository},
};

pub(super) mod constants {
/// The maximum number of reader threads to use for locking.
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
}

#[derive(Debug, Clone, Default, Copy, Setters)]
/// Options for the `lock` command
pub struct LockOptions {
/// Extend locks even if the files are already locked long enough
always_extend_lock: bool,

/// Specify until when to extend the lock. If None, lock forever
until: Option<DateTime<Local>>,
}

impl LockOptions {
/// Lock the given snapshots and corresponding pack files
pub fn lock<P: ProgressBars, S: Open>(
&self,
repo: &Repository<P, S>,
snapshots: &[SnapshotFile],
now: DateTime<Local>,
) -> RusticResult<()> {
let pb = &repo.pb;
let be = repo.dbe();

let mut index_files = Vec::new();

let p = pb.progress_counter("reading index...");
let mut index_collector = IndexCollector::new(IndexType::Full);
for index in be.stream_all::<IndexFile>(&p)? {
let (id, index) = index?;
index_collector.extend(index.packs.clone());
index_files.push((id, index));
}
let index = GlobalIndex::new_from_index(index_collector.into_index());
p.finish();

let snap_tress = snapshots.iter().map(|sn| sn.tree).collect();
let packs = find_needed_packs(be, &index, snap_tress, pb)?;
self.lock_packs(repo, index_files, packs)?;

self.lock_snapshots(repo, snapshots, now)?;

Ok(())
}

fn lock_snapshots<P: ProgressBars, S: Open>(
&self,
repo: &Repository<P, S>,
snapshots: &[SnapshotFile],
now: DateTime<Local>,
) -> RusticResult<()> {
let mut new_snaps = Vec::new();
let mut remove_snaps = Vec::new();
let mut lock_snaps = Vec::new();

for snap in snapshots {
if !snap.delete.is_locked(self.until) {
new_snaps.push(SnapshotFile {
delete: self.until.into(),
..snap.clone()
});
if !snap.must_keep(now) {
remove_snaps.push(snap.id);
}
} else if self.always_extend_lock {
lock_snaps.push(snap.id);
}
}

// save new snapshots
let new_ids = repo.save_snapshots(new_snaps)?;
lock_snaps.extend(new_ids);

// remove old snapshots
repo.delete_snapshots(&remove_snaps)?;

// Do the actual locking
lock_files(repo, FileType::Snapshot, &lock_snaps, self.until)?;

Ok(())
}

fn lock_packs<P: ProgressBars, S: Open>(
&self,
repo: &Repository<P, S>,
index_files: Vec<(Id, IndexFile)>,
packs: BTreeSet<Id>,
) -> RusticResult<()> {
let mut lock_packs = Vec::new();
let mut remove_index = Vec::new();

// Check for indexfiles-to-modify and for packs to lock
// Also already write the new index from the index files which are modified.
let p = repo.pb.progress_counter("processing index files...");
p.set_length(index_files.len().try_into().unwrap());
let indexer = Indexer::new_unindexed(repo.dbe().clone()).into_shared();
for (id, mut index) in index_files {
let mut modified = false;
for pack in &mut index.packs {
if !packs.contains(&pack.id) {
continue;
}
if !pack.lock.is_locked(self.until) {
pack.lock = self.until.into();
modified = true;
lock_packs.push(pack.id);
} else if self.always_extend_lock {
lock_packs.push(pack.id);
}
}
if modified {
for pack in index.packs {
indexer.write().unwrap().add(pack)?;
}
for pack_remove in index.packs_to_delete {
indexer.write().unwrap().add_remove(pack_remove)?;
}
remove_index.push(id);
}
p.inc(1);
}
indexer.write().unwrap().finalize()?;
p.finish();

// Remove old index files
let p = repo.pb.progress_counter("removing old index files...");
repo.dbe()
.delete_list(FileType::Index, true, remove_index.iter(), p)?;

// Do the actual locking
lock_files(repo, FileType::Pack, &lock_packs, self.until)?;

Ok(())
}
}

fn lock_files<P: ProgressBars, S>(
repo: &Repository<P, S>,
file_type: FileType,
ids: &[Id],
until: Option<DateTime<Local>>,
) -> RusticResult<()> {
let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_LOCKER_THREADS_NUM)
.build()
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
let p = &repo
.pb
.progress_counter(format!("locking {file_type:?} files.."));
p.set_length(ids.len().try_into().unwrap());
let backend = &repo.be;
pool.in_place_scope(|scope| {
for id in ids {
scope.spawn(move |_| {
if let Err(e) = backend.lock(file_type, id, until) {
// FIXME: Use error handling
error!("lock failed for {file_type:?} {id:?}. {e}");
};
p.inc(1);
});
}
});
p.finish();
Ok(())
}

/// Find packs which are needed for the given Trees
///
/// # Arguments
///
/// * `index` - The index to use
/// * `trees` - The trees to consider
/// * `pb` - The progress bars
///
/// # Errors
///
// TODO!: add errors!
fn find_needed_packs(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
trees: Vec<Id>,
pb: &impl ProgressBars,
) -> RusticResult<BTreeSet<Id>> {
let p = pb.progress_counter("finding needed packs...");

let mut packs = BTreeSet::new();

for tree_id in &trees {
_ = packs.insert(
index
.get_id(BlobType::Tree, tree_id)
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*tree_id))?
.pack,
);
}

let mut tree_streamer = TreeStreamerOnce::new(be, index, trees, p)?;
while let Some(item) = tree_streamer.next().transpose()? {
let (_, tree) = item;
for node in tree.nodes {
match node.node_type {
NodeType::File => {
for id in node.content.iter().flatten() {
_ = packs.insert(
index
.get_id(BlobType::Data, id)
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))?
.pack,
);
}
}
NodeType::Dir => {
let id = &node.subtree.unwrap();
_ = packs.insert(
index
.get_id(BlobType::Tree, id)
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))?
.pack,
);
}
_ => {} // nothing to do
}
}
}

Ok(packs)
}
2 changes: 2 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ pub enum CommandErrorKind {
FromRayonError(#[from] rayon::ThreadPoolBuildError),
/// conversion to `u64` failed: `{0:?}`
ConversionToU64Failed(TryFromIntError),
/// Id {0:?} not found in index
IdNotFoundinIndex(Id),
}

/// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub use crate::{
copy::CopySnapshot,
forget::{ForgetGroup, ForgetGroups, ForgetSnapshot, KeepOptions},
key::KeyOptions,
lock::LockOptions,
prune::{PruneOptions, PrunePlan, PruneStats},
repair::{index::RepairIndexOptions, snapshots::RepairSnapshotsOptions},
repoinfo::{BlobInfo, IndexInfos, PackInfo, RepoFileInfo, RepoFileInfos},
Expand Down
18 changes: 18 additions & 0 deletions crates/core/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use bytes::Bytes;
use chrono::Local;
use derive_setters::Setters;
use log::{debug, error, info};
use serde_with::{serde_as, DisplayFromStr};
Expand Down Expand Up @@ -37,6 +38,7 @@ use crate::{
copy::CopySnapshot,
forget::{ForgetGroups, KeepOptions},
key::KeyOptions,
lock::LockOptions,
prune::{PruneOptions, PrunePlan},
repair::{index::RepairIndexOptions, snapshots::RepairSnapshotsOptions},
repoinfo::{IndexInfos, RepoFileInfos},
Expand Down Expand Up @@ -1058,6 +1060,22 @@ impl<P: ProgressBars, S: Open> Repository<P, S> {
opts.get_plan(self)
}

/// Lock snapshot and pack files needed for the given snapshots
///
/// # Arguments
///
/// * `opts` - The lock options to use
/// * `snaps` - The snapshots to lock
/// * `until` - until when to lock. None means lock forever.
///
/// # Errors
///
// TODO: Document errors
pub fn lock(&self, opts: &LockOptions, snaps: &[SnapshotFile]) -> RusticResult<()> {
let now = Local::now();
opts.lock(self, snaps, now)
}

/// Turn the repository into the `IndexedFull` state by reading and storing the index
///
/// # Errors
Expand Down

0 comments on commit 37bcc7b

Please sign in to comment.