From b23d2c43ba503b3dc5a49666e70b381964969766 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 27 Sep 2024 15:42:18 +0200 Subject: [PATCH 01/18] added partial --- Cargo.lock | 9 +- Cargo.toml | 4 +- src/main.rs | 36 +++++++ src/partial_quicksync.rs | 203 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 src/partial_quicksync.rs diff --git a/Cargo.lock b/Cargo.lock index d037cd8..1d6fc26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,6 +485,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.1.0" @@ -964,12 +970,13 @@ dependencies = [ [[package]] name = "quicksync" -version = "0.1.16" +version = "0.2.0" dependencies = [ "anyhow", "chrono", "clap", "duration-string", + "hex", "md5", "mockito", "rand", diff --git a/Cargo.toml b/Cargo.toml index e8af115..b7c7b49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quicksync" -version = "0.1.16" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -18,6 +18,8 @@ serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" url = "2.5.0" zstd = "0.13.0" +hex = "0.4" + [dev-dependencies] mockito = "1.5.0" diff --git a/src/main.rs b/src/main.rs index 16c1501..ae8d0f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ mod download; mod eta; mod go_spacemesh; mod parsers; +mod partial_quicksync; mod read_error_response; mod reader_with_bytes; mod sql; @@ -24,6 +25,7 @@ use checksum::*; use download::download_with_retries; use go_spacemesh::get_version; use parsers::*; +use partial_quicksync::partial_restore; use sql::get_last_layer_from_db; use utils::*; @@ -79,6 +81,18 @@ enum Commands { #[clap(short = 'r', long, default_value = "10")] max_retries: u32, }, + /// Uses partial recovery quicksync method + Partial { + /// Path to the node state.sql + #[clap(short = 's', long)] + state_sql: PathBuf, + /// Starting layer to recover from OR 0 to determine latest in the db + #[clap(short = 'l', long, default_value = "0")] + start_layer: i64, + /// Jump-back to recover earlier than latest layer. 
It will jump back one row in recovery metadata + #[clap(short = 'j', long, default_value = "0")] + jump_back: usize, + }, } fn go_spacemesh_default_path() -> &'static str { @@ -311,5 +325,27 @@ fn main() -> anyhow::Result<()> { Ok(()) } + Commands::Partial { + state_sql, + start_layer, + jump_back, + } => { + let state_sql_path = resolve_path(&state_sql).context("resolving state.sql path")?; + if state_sql_path.try_exists().unwrap_or(false) { + println!("State file found: {}", state_sql_path.display()); + } else { + eprintln!("State file not found: {}", state_sql_path.display()); + process::exit(1); + } + let result = partial_restore( + start_layer, + &state_sql_path.into_os_string().into_string().unwrap(), + jump_back, + ); + if result.is_err() { + process::exit(1); + } + result + } } } diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs new file mode 100644 index 0000000..6477973 --- /dev/null +++ b/src/partial_quicksync.rs @@ -0,0 +1,203 @@ +use anyhow::{Context, Result}; +use reqwest::blocking::Client; +use rusqlite::Connection; +use std::{ + env, + fs::{self, File}, + io::{self, BufReader, BufWriter}, + time::Instant, +}; +use zstd::stream::Decoder; + +const DEFAULT_BASE_URL: &str = "https://quicksync.spacemesh.network/partials"; + +#[derive(Clone, Debug)] +struct RestorePoint { + from: i64, + to: i64, + hash: String, +} + +fn get_base_url() -> String { + env::var("QS_BASE_URL").unwrap_or_else(|_| DEFAULT_BASE_URL.to_string()) +} + +fn get_previous_hash(layer_at: i64, conn: &Connection) -> Result { + conn + .query_row( + "SELECT aggregated_hash FROM layers WHERE id = ?", + [layer_at - 1], + |row| { + let hash: Vec = row.get(0)?; + Ok(hex::encode(&hash[..2])) + }, + ) + .context("Failed to get previous hash") +} + +fn find_start_points(layer_from: i64, metadata: &str, jump_back: usize) -> Vec { + let mut result = Vec::new(); + let mut continuous = false; + + for (index, line) in metadata.lines().enumerate() { + let items: Vec<&str> = line.split(',').collect(); + let from: i64 = items[0].parse().unwrap(); + let to: i64 = items[1].parse().unwrap(); + + if to > layer_from && from <= layer_from || continuous { + continuous = true; + if index >= jump_back { + let jump_back_line = metadata.lines().nth(index - jump_back).unwrap(); + let jump_back_items: Vec<&str> = jump_back_line.split(',').collect(); + + result.push(RestorePoint { + from: jump_back_items[0].parse().unwrap(), + to: jump_back_items[1].parse().unwrap(), + hash: jump_back_items[2].to_string(), + }); + } + } + } + + result +} + +fn get_latest_from_db(target_path: &str) -> Result { + let conn = Connection::open(target_path)?; + conn + .query_row( + "SELECT max(id) FROM layers WHERE applied_block IS NOT null", + [], + |row| row.get(0), + ) + .context("Failed to get latest layer from DB") +} + +fn get_user_version(target_path: &str) -> Result { + let conn = Connection::open(target_path)?; + conn + .query_row("PRAGMA user_version", [], |row| row.get(0)) + .context("Failed to get user version") +} + +fn download_file( + client: &Client, + user_version: i64, + layer_from: i64, + layer_to: i64, + hash: &str, + target_path: &str, +) -> Result<()> { + let base_url = get_base_url(); + let suffix = if target_path.ends_with("zst") { + ".zst" + } else { + "" + }; + let url = format!( + "{}/{}/{}_{}_{}/state.sql_diff.{}_{}.sql{}", + base_url, user_version, layer_from, layer_to, hash, layer_from, layer_to, suffix + ); + println!("Downloading from {}", url); + let mut resp = client.get(&url).send().context("Failed to send 
request")?; + if !resp.status().is_success() { + anyhow::bail!( + "Failed to download file {}: HTTP status {}", + url, + resp.status() + ); + } + let mut file = File::create(target_path).context("Failed to create file")?; + resp + .copy_to(&mut file) + .context("Failed to copy response to file")?; + Ok(()) +} + +fn decompress_file(input_path: &str, output_path: &str) -> Result<()> { + let input_file = File::open(input_path).context("Failed to open input file")?; + let output_file = File::create(output_path).context("Failed to create output file")?; + + let mut reader = BufReader::new(input_file); + let mut writer = BufWriter::new(output_file); + + let mut decoder = Decoder::new(&mut reader).context("Failed to create decoder")?; + decoder + .window_log_max(31) + .context("Failed to set window log max")?; + + io::copy(&mut decoder, &mut writer).context("Failed to decompress")?; + + Ok(()) +} + +fn execute_restore(target_conn: &Connection, restore_string: &str) -> Result<()> { + target_conn + .execute_batch(restore_string) + .context("Failed to execute restore") +} + +pub fn partial_restore(layer_from: i64, target_db_path: &str, jump_back: usize) -> Result<()> { + let client = Client::new(); + let base_url = get_base_url(); + let user_version = get_user_version(target_db_path)?; + let remote_metadata = client + .get(format!("{}/{}/metadata.csv", base_url, user_version)) + .send()? + .text()?; + let restore_string = client + .get(format!("{}/{}/restore.sql", base_url, user_version)) + .send()? + .text()?; + let layer_from = if layer_from == 0 { + get_latest_from_db(target_db_path)? + } else { + layer_from + }; + let start_points = find_start_points(layer_from, &remote_metadata, jump_back); + if start_points.is_empty() { + anyhow::bail!("No suitable restore point found"); + } + println!("Found {} potential restore points", start_points.len()); + + for point in start_points { + let conn = Connection::open(target_db_path)?; + let previous_hash = get_previous_hash(point.from, &conn)?; + + if previous_hash == &point.hash[..4] { + let source_db_path_zst = "backup_source.db.zst"; + let source_db_path = "backup_source.db"; + + if download_file( + &client, + user_version, + point.from, + point.to, + &point.hash, + source_db_path_zst, + ) + .is_err() + { + download_file( + &client, + user_version, + point.from, + point.to, + &point.hash, + source_db_path, + )?; + } else { + decompress_file(source_db_path_zst, source_db_path)?; + } + + println!("Restoring from {} to {}...", point.from, point.to); + let start = Instant::now(); + execute_restore(&conn, &restore_string)?; + fs::remove_file(source_db_path)?; + let _ = fs::remove_file(source_db_path_zst); + let duration = start.elapsed(); + println!("Restored {} to {} in {:?}", point.from, point.to, duration); + } + } + Ok(()) +} From ff6fa0c25361a1b2fa1713f427bf32948c956d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Nowak?= Date: Fri, 27 Sep 2024 17:21:43 +0200 Subject: [PATCH 02/18] Update src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplified partial_restore. 
Co-authored-by: Bartosz Różański --- src/main.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index ae8d0f5..7cb52c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -337,11 +337,7 @@ fn main() -> anyhow::Result<()> { eprintln!("State file not found: {}", state_sql_path.display()); process::exit(1); } - let result = partial_restore( - start_layer, - &state_sql_path.into_os_string().into_string().unwrap(), - jump_back, - ); + let result = partial_restore(start_layer, &state_sql_path, jump_back); if result.is_err() { process::exit(1); } From e01da27c8a4d2b8d0d6c8c5c026a28d1b2070ab5 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 27 Sep 2024 17:54:04 +0200 Subject: [PATCH 03/18] Improvements and some basic tests --- Cargo.lock | 49 ++++++++++++++++++++ Cargo.toml | 2 +- src/main.rs | 22 ++++----- src/partial_quicksync.rs | 98 ++++++++++++++++++++++++++++++++-------- 4 files changed, 141 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d6fc26..99264b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -906,6 +906,31 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-display" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287d8d3ebdce117b8539f59411e4ed9ec226e0a4153c7f55495c6070d68e6f72" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax", +] + +[[package]] +name = "parse-display-derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fc048687be30d79502dea2f623d052f3a074012c6eac41726b7ab17213616b1" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax", + "structmeta", + "syn", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -979,6 +1004,7 @@ dependencies = [ "hex", "md5", "mockito", + "parse-display", "rand", "regex", "reqwest", @@ -1338,6 +1364,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index b7c7b49..bff075f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ serde_json = "1.0.128" url = "2.5.0" zstd = "0.13.0" hex = "0.4" - +parse-display = "0.10.0" [dev-dependencies] mockito = "1.5.0" diff --git a/src/main.rs b/src/main.rs index 7cb52c3..e009332 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ mod unpack; mod user_agent; mod utils; -use anyhow::Context; +use anyhow::{anyhow, Context}; use checksum::*; use download::download_with_retries; use go_spacemesh::get_version; @@ -331,17 +331,17 @@ fn main() -> anyhow::Result<()> { jump_back, } => { let state_sql_path = resolve_path(&state_sql).context("resolving state.sql path")?; - if state_sql_path.try_exists().unwrap_or(false) { - println!("State file found: {}", state_sql_path.display()); - } else { - eprintln!("State file not 
found: {}", state_sql_path.display()); - process::exit(1); - } - let result = partial_restore(start_layer, &state_sql_path, jump_back); - if result.is_err() { - process::exit(1); + if !state_sql_path + .try_exists() + .context("checking if state file exists")? + { + return Err(anyhow!("state file not found: {:?}", state_sql_path)); } - result + partial_restore( + start_layer, + &state_sql_path.into_os_string().into_string().unwrap(), + jump_back, + ) } } } diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 6477973..0706340 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -1,17 +1,20 @@ use anyhow::{Context, Result}; +// use parse_display::{Display, FromStr}; use reqwest::blocking::Client; use rusqlite::Connection; use std::{ env, fs::{self, File}, io::{self, BufReader, BufWriter}, + str::FromStr, time::Instant, }; use zstd::stream::Decoder; const DEFAULT_BASE_URL: &str = "https://quicksync.spacemesh.network/partials"; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, parse_display::FromStr)] +#[display("{from},{to},{hash}")] struct RestorePoint { from: i64, to: i64, @@ -37,28 +40,21 @@ fn get_previous_hash(layer_at: i64, conn: &Connection) -> Result { fn find_start_points(layer_from: i64, metadata: &str, jump_back: usize) -> Vec { let mut result = Vec::new(); - let mut continuous = false; + + let mut target_index = 0; for (index, line) in metadata.lines().enumerate() { let items: Vec<&str> = line.split(',').collect(); - let from: i64 = items[0].parse().unwrap(); let to: i64 = items[1].parse().unwrap(); - - if to > layer_from && from <= layer_from || continuous { - continuous = true; - if index >= jump_back { - let jump_back_line = metadata.lines().nth(index - jump_back).unwrap(); - let jump_back_items: Vec<&str> = jump_back_line.split(',').collect(); - - result.push(RestorePoint { - from: jump_back_items[0].parse().unwrap(), - to: jump_back_items[1].parse().unwrap(), - hash: jump_back_items[2].to_string(), - }); - } + if to > layer_from { + target_index = index; + break; } } - + target_index = target_index - jump_back; + for line in metadata.lines().skip(target_index) { + result.push(RestorePoint::from_str(line).unwrap()); + } result } @@ -188,16 +184,82 @@ pub fn partial_restore(layer_from: i64, target_db_path: &str, jump_back: usize) )?; } else { decompress_file(source_db_path_zst, source_db_path)?; + fs::remove_file(source_db_path_zst)?; } println!("Restoring from {} to {}...", point.from, point.to); let start = Instant::now(); execute_restore(&conn, &restore_string)?; fs::remove_file(source_db_path)?; - let _ = fs::remove_file(source_db_path_zst); let duration = start.elapsed(); println!("Restored {} to {} in {:?}", point.from, point.to, duration); } } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Result; + use rusqlite::Connection; + use tempfile::tempdir; + + fn create_test_db() -> Result<(tempfile::TempDir, String)> { + let dir = tempdir()?; + let db_path = dir.path().join("test.db"); + let conn = Connection::open(&db_path)?; + conn.execute( + "CREATE TABLE layers (id INTEGER, applied_block INTEGER, aggregated_hash BLOB)", + [], + )?; + conn.execute( + "INSERT INTO layers (id, applied_block, aggregated_hash) VALUES (?, ?, ?), (?, ?, ?)", + rusqlite::params![1, 100, 0xAAAA, 2, 200, 0xBBBB], + )?; + Ok((dir, db_path.to_str().unwrap().to_string())) + } + + #[test] + fn test_get_base_url() { + std::env::set_var("QS_BASE_URL", "https://test.com"); + assert_eq!(get_base_url(), "https://test.com"); + 
std::env::remove_var("QS_BASE_URL"); + assert_eq!(get_base_url(), DEFAULT_BASE_URL); + } + + #[test] + fn test_find_start_points() { + let metadata = "0,100,aaaa\n101,200,bbbb\n201,300,ijkl\n"; + let result = find_start_points(150, metadata, 0); + assert_eq!(result.len(), 2); + assert_eq!(result[0].from, 101); + assert_eq!(result[0].to, 200); + assert_eq!(result[0].hash, "bbbb"); + + let result = find_start_points(150, metadata, 1); + println!("{:?}", result); + assert_eq!(result.len(), 3); + assert_eq!(result[0].from, 0); + assert_eq!(result[0].to, 100); + assert_eq!(result[0].hash, "aaaa"); + } + + #[test] + fn test_get_latest_from_db() -> Result<()> { + let (_dir, db_path) = create_test_db()?; + let result = get_latest_from_db(&db_path)?; + assert_eq!(result, 2); + Ok(()) + } + + #[test] + fn test_get_user_version() -> Result<()> { + let (_dir, db_path) = create_test_db()?; + let conn = Connection::open(&db_path)?; + conn.execute("PRAGMA user_version = 42", [])?; + let result = get_user_version(&db_path)?; + assert_eq!(result, 42); + Ok(()) + } +} From af1e447055a41e51c046da1164564d8556e2d0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 2 Oct 2024 12:25:42 +0200 Subject: [PATCH 04/18] add tests --- Cargo.toml | 2 +- src/main.rs | 17 +- src/partial_quicksync.rs | 414 ++++++++++++++++++++++++++------------- 3 files changed, 282 insertions(+), 151 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bff075f..471636d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ duration-string = "0.4.0" md5 = "0.7.0" regex = "1.10.3" reqwest = { version = "0.12.7", features = ["json", "stream", "blocking"] } -rusqlite = { version = "0.32.1", features = ["bundled"] } +rusqlite = { version = "0.32.1", features = ["bundled", "backup"] } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" url = "2.5.0" diff --git a/src/main.rs b/src/main.rs index e009332..e020fa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,12 +86,12 @@ enum Commands { /// Path to the node state.sql #[clap(short = 's', long)] state_sql: PathBuf, - /// Starting layer to recover from OR 0 to determine latest in the db - #[clap(short = 'l', long, default_value = "0")] - start_layer: i64, /// Jump-back to recover earlier than latest layer. 
It will jump back one row in recovery metadata - #[clap(short = 'j', long, default_value = "0")] + #[clap(short = 'j', long, default_value_t = 0)] jump_back: usize, + /// URL to download parts from + #[clap(short = 'u', long, default_value = partial_quicksync::DEFAULT_BASE_URL)] + base_url: String, }, } @@ -327,8 +327,8 @@ fn main() -> anyhow::Result<()> { } Commands::Partial { state_sql, - start_layer, jump_back, + base_url, } => { let state_sql_path = resolve_path(&state_sql).context("resolving state.sql path")?; if !state_sql_path @@ -337,11 +337,8 @@ fn main() -> anyhow::Result<()> { { return Err(anyhow!("state file not found: {:?}", state_sql_path)); } - partial_restore( - start_layer, - &state_sql_path.into_os_string().into_string().unwrap(), - jump_back, - ) + let conn = rusqlite::Connection::open(state_sql_path)?; + partial_restore(&base_url, &conn, jump_back) } } } diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 0706340..c1d6605 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -1,99 +1,107 @@ use anyhow::{Context, Result}; -// use parse_display::{Display, FromStr}; use reqwest::blocking::Client; use rusqlite::Connection; use std::{ - env, fs::{self, File}, io::{self, BufReader, BufWriter}, + path::Path, str::FromStr, time::Instant, }; use zstd::stream::Decoder; -const DEFAULT_BASE_URL: &str = "https://quicksync.spacemesh.network/partials"; +pub(crate) const DEFAULT_BASE_URL: &str = "https://quicksync.spacemesh.network/partials"; -#[derive(Clone, Debug, parse_display::FromStr)] +#[derive(Clone, Debug, PartialEq, Eq, parse_display::Display, parse_display::FromStr)] #[display("{from},{to},{hash}")] struct RestorePoint { - from: i64, - to: i64, + from: u32, + to: u32, hash: String, } -fn get_base_url() -> String { - env::var("QS_BASE_URL").unwrap_or_else(|_| DEFAULT_BASE_URL.to_string()) -} - -fn get_previous_hash(layer_at: i64, conn: &Connection) -> Result { +fn get_previous_hash(layer_at: u32, conn: &Connection) -> Result { + let layer_at = layer_at - 1; conn .query_row( "SELECT aggregated_hash FROM layers WHERE id = ?", - [layer_at - 1], + [layer_at], |row| { let hash: Vec = row.get(0)?; Ok(hex::encode(&hash[..2])) }, ) - .context("Failed to get previous hash") + .with_context(|| format!("failed to get previous hash for layer {layer_at}")) } -fn find_start_points(layer_from: i64, metadata: &str, jump_back: usize) -> Vec { - let mut result = Vec::new(); - - let mut target_index = 0; +// Find restore points for layers >= `layer_from` in layers described by `metadata`. +// The `metadata` holds non-overlapping, ordered restore points (one per line) in form: +// {layer_from (inlusive)},{layer_to (exclusive)},{short hash (4)} +// +// The `jump_back` tells how many "previous" points should be included in +// the returned vector. 
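+//
+// For example (mirroring the unit tests below), given the metadata
+//
+//   0,100,aaaa
+//   101,200,bbbb
+//   201,300,ijkl
+//
+// a `layer_from` of 150 falls within the second point, so `jump_back = 0`
+// yields the last two points (starting at `101,200,bbbb`), while
+// `jump_back = 1` also includes the `0,100,aaaa` point.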
+fn find_restore_points(layer_from: u32, metadata: &str, jump_back: usize) -> Vec { + let mut all_points = Vec::new(); + let mut target_index = None; - for (index, line) in metadata.lines().enumerate() { - let items: Vec<&str> = line.split(',').collect(); - let to: i64 = items[1].parse().unwrap(); - if to > layer_from { - target_index = index; - break; + for (index, line) in metadata.trim().lines().enumerate() { + let point = RestorePoint::from_str(line.trim()).expect("parsing restore point"); + if point.to > layer_from && target_index.is_none() { + target_index = Some(index); } + all_points.push(point); } - target_index = target_index - jump_back; - for line in metadata.lines().skip(target_index) { - result.push(RestorePoint::from_str(line).unwrap()); - } - result + // A None `target_index` means there aren't any layers > `layer_from` + // in the data described by `metadata`. + let target_index = if let Some(t) = target_index { + if t >= jump_back { + t - jump_back + } else { + 0 + } + } else { + return vec![]; + }; + + all_points.split_off(target_index) } -fn get_latest_from_db(target_path: &str) -> Result { - let conn = Connection::open(target_path)?; +fn get_latest_from_db(conn: &Connection) -> Result { conn .query_row( "SELECT max(id) FROM layers WHERE applied_block IS NOT null", [], |row| row.get(0), ) - .context("Failed to get latest layer from DB") + .context("failed to get latest layer from DB") } -fn get_user_version(target_path: &str) -> Result { - let conn = Connection::open(target_path)?; +fn get_user_version(conn: &Connection) -> Result { conn .query_row("PRAGMA user_version", [], |row| row.get(0)) - .context("Failed to get user version") + .context("failed to get user version") +} + +fn file_url(user_version: usize, p: &RestorePoint, suffix: Option<&str>) -> String { + let suffix = suffix.unwrap_or_default(); + format!( + "{}/{}_{}_{}/state.sql_diff.{}_{}.sql{}", + user_version, p.from, p.to, p.hash, p.from, p.to, suffix + ) } fn download_file( client: &Client, - user_version: i64, - layer_from: i64, - layer_to: i64, - hash: &str, - target_path: &str, + base_url: &str, + user_version: usize, + point: &RestorePoint, + target_path: &Path, ) -> Result<()> { - let base_url = get_base_url(); - let suffix = if target_path.ends_with("zst") { - ".zst" - } else { - "" - }; - let url = format!( - "{}/{}/{}_{}_{}/state.sql_diff.{}_{}.sql{}", - base_url, user_version, layer_from, layer_to, hash, layer_from, layer_to, suffix - ); + let suffix = target_path + .extension() + .is_some_and(|ext| ext == "zst") + .then_some(".zst"); + let url = format!("{}/{}", base_url, file_url(user_version, point, suffix)); println!("Downloading from {}", url); let mut resp = client.get(&url).send().context("Failed to send request")?; if !resp.status().is_success() { @@ -110,7 +118,7 @@ fn download_file( Ok(()) } -fn decompress_file(input_path: &str, output_path: &str) -> Result<()> { +fn decompress_file(input_path: &Path, output_path: &Path) -> Result<()> { let input_file = File::open(input_path).context("Failed to open input file")?; let output_file = File::create(output_path).context("Failed to create output file")?; @@ -127,16 +135,9 @@ fn decompress_file(input_path: &str, output_path: &str) -> Result<()> { Ok(()) } -fn execute_restore(target_conn: &Connection, restore_string: &str) -> Result<()> { - target_conn - .execute_batch(restore_string) - .context("Failed to execute restore") -} - -pub fn partial_restore(layer_from: i64, target_db_path: &str, jump_back: usize) -> Result<()> { +pub fn 
partial_restore(base_url: &str, conn: &Connection, jump_back: usize) -> Result<()> { let client = Client::new(); - let base_url = get_base_url(); - let user_version = get_user_version(target_db_path)?; + let user_version = get_user_version(conn)?; let remote_metadata = client .get(format!("{}/{}/metadata.csv", base_url, user_version)) .send()? @@ -145,121 +146,254 @@ pub fn partial_restore(layer_from: i64, target_db_path: &str, jump_back: usize) .get(format!("{}/{}/restore.sql", base_url, user_version)) .send()? .text()?; - let layer_from = if layer_from == 0 { - get_latest_from_db(target_db_path)? - } else { - layer_from - }; - let start_points = find_start_points(layer_from, &remote_metadata, jump_back); + let latest_layer = get_latest_from_db(conn)?; + let layer_from = latest_layer + 1; // start with the first layer that is not in the DB + let start_points = find_restore_points(layer_from, &remote_metadata, jump_back); if start_points.is_empty() { anyhow::bail!("No suitable restore point found"); } - println!("Found {} potential restore points", start_points.len()); - - for point in start_points { - let conn = Connection::open(target_db_path)?; - let previous_hash = get_previous_hash(point.from, &conn)?; - - if previous_hash == &point.hash[..4] { - let source_db_path_zst = "backup_source.db.zst"; - let source_db_path = "backup_source.db"; - - if download_file( - &client, - user_version, - point.from, - point.to, - &point.hash, - source_db_path_zst, - ) - .is_err() - { - download_file( - &client, - user_version, - point.from, - point.to, - &point.hash, - source_db_path, - )?; - } else { - decompress_file(source_db_path_zst, source_db_path)?; - fs::remove_file(source_db_path_zst)?; - } - - println!("Restoring from {} to {}...", point.from, point.to); - let start = Instant::now(); - execute_restore(&conn, &restore_string)?; - fs::remove_file(source_db_path)?; - let duration = start.elapsed(); - println!("Restored {} to {} in {:?}", point.from, point.to, duration); + let total = start_points.len(); + println!("Found {total} potential restore points"); + + for (idx, p) in start_points.into_iter().enumerate() { + let previous_hash = get_previous_hash(p.from, conn)?; + anyhow::ensure!( + previous_hash == p.hash[..4], + "unexpected hash: '{previous_hash}' doesn't match restore point {p:?}", + ); + let source_db_path_zst = &Path::new("backup_source.db.zst"); + let source_db_path = &Path::new("backup_source.db"); + + if download_file(&client, base_url, user_version, &p, source_db_path_zst).is_err() { + download_file(&client, base_url, user_version, &p, source_db_path)?; + } else { + decompress_file(source_db_path_zst, source_db_path)?; + fs::remove_file(source_db_path_zst)?; } + + println!("[{idx}/{total}] Restoring from {} to {}...", p.from, p.to); + let start = Instant::now(); + conn + .execute_batch(&restore_string) + .context("executing restore")?; + fs::remove_file(source_db_path)?; + let duration = start.elapsed(); + println!( + "[{idx}/{total}] Restored {} to {} in {:?}", + p.from, p.to, duration + ); } Ok(()) } +#[cfg(test)] +impl RestorePoint { + fn new(from: u32, to: u32, hash: String) -> Self { + Self { from, to, hash } + } +} + #[cfg(test)] mod tests { use super::*; - use anyhow::Result; use rusqlite::Connection; use tempfile::tempdir; - fn create_test_db() -> Result<(tempfile::TempDir, String)> { - let dir = tempdir()?; - let db_path = dir.path().join("test.db"); - let conn = Connection::open(&db_path)?; - conn.execute( - "CREATE TABLE layers (id INTEGER, applied_block INTEGER, 
aggregated_hash BLOB)", - [], - )?; - conn.execute( - "INSERT INTO layers (id, applied_block, aggregated_hash) VALUES (?, ?, ?), (?, ?, ?)", - rusqlite::params![1, 100, 0xAAAA, 2, 200, 0xBBBB], - )?; - Ok((dir, db_path.to_str().unwrap().to_string())) - } - - #[test] - fn test_get_base_url() { - std::env::set_var("QS_BASE_URL", "https://test.com"); - assert_eq!(get_base_url(), "https://test.com"); - std::env::remove_var("QS_BASE_URL"); - assert_eq!(get_base_url(), DEFAULT_BASE_URL); + fn create_test_db() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn + .execute( + "CREATE TABLE layers (id INTEGER, applied_block INTEGER, aggregated_hash BLOB)", + [], + ) + .unwrap(); + conn } #[test] fn test_find_start_points() { - let metadata = "0,100,aaaa\n101,200,bbbb\n201,300,ijkl\n"; - let result = find_start_points(150, metadata, 0); + let metadata = r#" + 0,100,aaaa + 101,200,bbbb + 201,300,ijkl + "#; + let result = find_restore_points(150, metadata, 0); assert_eq!(result.len(), 2); assert_eq!(result[0].from, 101); assert_eq!(result[0].to, 200); assert_eq!(result[0].hash, "bbbb"); - let result = find_start_points(150, metadata, 1); - println!("{:?}", result); + let result = find_restore_points(150, metadata, 1); assert_eq!(result.len(), 3); assert_eq!(result[0].from, 0); assert_eq!(result[0].to, 100); assert_eq!(result[0].hash, "aaaa"); + + // `jump_back` over the first point + let result2 = find_restore_points(150, metadata, 5); + assert_eq!(result, result2); + + // no points for the requested starting layer + let result = find_restore_points(500, metadata, 1); + assert!(result.is_empty()); + } + + fn insert_layer(conn: &Connection, id: u32, applied_block: i64, hash: &[u8]) { + conn + .execute( + "INSERT INTO layers (id, applied_block, aggregated_hash) VALUES (?, ?, ?)", + rusqlite::params![id, applied_block, hash], + ) + .unwrap(); + } + + #[test] + fn getting_previous_hash() { + let conn = create_test_db(); + insert_layer(&conn, 2, 100, &[0xAA, 0xBB]); + let result = get_previous_hash(3, &conn).unwrap(); + assert_eq!("aabb", result); } #[test] - fn test_get_latest_from_db() -> Result<()> { - let (_dir, db_path) = create_test_db()?; - let result = get_latest_from_db(&db_path)?; + fn test_get_latest_from_db() { + let conn = create_test_db(); + insert_layer(&conn, 2, 100, &[0xAA, 0xBB]); + let result = get_latest_from_db(&conn).unwrap(); assert_eq!(result, 2); - Ok(()) } #[test] - fn test_get_user_version() -> Result<()> { - let (_dir, db_path) = create_test_db()?; - let conn = Connection::open(&db_path)?; - conn.execute("PRAGMA user_version = 42", [])?; - let result = get_user_version(&db_path)?; + fn test_get_user_version() { + let conn = create_test_db(); + conn.execute("PRAGMA user_version = 42", []).unwrap(); + let result = get_user_version(&conn).unwrap(); assert_eq!(result, 42); - Ok(()) + } + + #[test] + fn downloading_file() { + let point = RestorePoint { + from: 100, + to: 200, + hash: "abcd".to_string(), + }; + let file_url = file_url(1, &point, Some(".zst")); + let mut server = mockito::Server::new(); + let mock = server + .mock("GET", format!("/{file_url}").as_str()) + .with_status(200) + .with_body("file contents") + .create(); + + let dir = tempdir().unwrap(); + let dst = dir.path().join("dst.zst"); + super::download_file(&Client::new(), &server.url(), 1, &point, &dst).unwrap(); + mock.assert(); + + let data = std::fs::read(&dst).unwrap(); + assert_eq!(&data, "file contents".as_bytes()); + } + + #[test] + fn partial_restore() { + let conn = create_test_db(); 
+ insert_layer(&conn, 99, 100, &[0xBB, 0xBB]); + let mut server = mockito::Server::new(); + let user_version = 0; + + let points = [ + ("bbbb", RestorePoint::new(0, 100, "aaaa".into())), + ("cccc", RestorePoint::new(100, 200, "bbbb".into())), + ("dddd", RestorePoint::new(200, 300, "cccc".into())), + ("eeee", RestorePoint::new(300, 400, "dddd".into())), + ]; + + let metadata = points + .iter() + .map(|(_, p)| p.to_string()) + .collect::>() + .join("\n"); + + let mock_metadata = server + .mock("GET", format!("/{user_version}/metadata.csv").as_str()) + .with_status(200) + .with_body(metadata) + .create(); + + // Restore SQL just copies contents of the `layers` table + let mock_query = server + .mock("GET", format!("/{user_version}/restore.sql").as_str()) + .with_status(200) + .with_body( + r#"ATTACH DATABASE 'backup_source.db' AS src; + INSERT OR IGNORE INTO layers SELECT * from src.layers; + DETACH DATABASE src"#, + ) + .create(); + + let data_mocks = points + .iter() + .skip(1) + .map(|(hash, point)| { + // For simplicity, the database used to restore contains only + // the last layer of the point and its expected hash. + let conn = create_test_db(); + let hash = hex::decode(hash).unwrap(); + insert_layer(&conn, point.to - 1, 111, &hash); + let dir = tempdir().unwrap(); + let checkpoint = dir.path().join("checkpoint.db"); + conn + .backup(rusqlite::DatabaseName::Main, &checkpoint, None) + .unwrap(); + + let file_url = file_url(user_version, point, None); + server + .mock("GET", format!("/{file_url}").as_str()) + .with_status(200) + .with_body(std::fs::read(&checkpoint).unwrap()) + .create() + }) + .collect::>(); + + super::partial_restore(&server.url(), &conn, 0).unwrap(); + + mock_metadata.assert(); + mock_query.assert(); + for mock in data_mocks { + mock.assert(); + } + + let latest = get_latest_from_db(&conn).unwrap(); + assert_eq!(latest, points.last().unwrap().1.to - 1); + + let result = get_previous_hash(latest + 1, &conn).unwrap(); + assert_eq!(result, points.last().unwrap().0); + } + + #[test] + fn fails_on_hash_mismatch() { + let conn = create_test_db(); + insert_layer(&conn, 99, 100, &[0xFF, 0xFF]); + let mut server = mockito::Server::new(); + let user_version = 0; + + let metadata = RestorePoint::new(100, 200, "aaaa".to_string()).to_string(); + let mock_metadata = server + .mock("GET", format!("/{user_version}/metadata.csv").as_str()) + .with_status(200) + .with_body(metadata) + .create(); + + let mock_query = server + .mock("GET", format!("/{user_version}/restore.sql").as_str()) + .with_status(200) + .with_body(".import backup_source.db layers") + .create(); + + let err = super::partial_restore(&server.url(), &conn, 0).unwrap_err(); + assert!(err.to_string().contains("unexpected hash")); + mock_metadata.assert(); + mock_query.assert(); } } From 4231ec12e2f08796e9b6bc10f04070adc13ac485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 2 Oct 2024 12:41:08 +0200 Subject: [PATCH 05/18] reformat --- src/partial_quicksync.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index c1d6605..3da6ab8 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -196,7 +196,7 @@ impl RestorePoint { #[cfg(test)] mod tests { use super::*; - use rusqlite::Connection; + use rusqlite::{Connection, DatabaseName}; use tempfile::tempdir; fn create_test_db() -> Connection { @@ -343,9 +343,7 @@ mod tests { insert_layer(&conn, point.to - 1, 111, &hash); let dir = 
tempdir().unwrap(); let checkpoint = dir.path().join("checkpoint.db"); - conn - .backup(rusqlite::DatabaseName::Main, &checkpoint, None) - .unwrap(); + conn.backup(DatabaseName::Main, &checkpoint, None).unwrap(); let file_url = file_url(user_version, point, None); server From 8731ab0763ff0dba7d5305c428b7dc292ea8a34e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 2 Oct 2024 13:03:48 +0200 Subject: [PATCH 06/18] don't rely on DETACH in the restore SQL --- src/main.rs | 3 +-- src/partial_quicksync.rs | 51 ++++++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index e020fa1..db53374 100644 --- a/src/main.rs +++ b/src/main.rs @@ -337,8 +337,7 @@ fn main() -> anyhow::Result<()> { { return Err(anyhow!("state file not found: {:?}", state_sql_path)); } - let conn = rusqlite::Connection::open(state_sql_path)?; - partial_restore(&base_url, &conn, jump_back) + partial_restore(&base_url, &state_sql_path, jump_back) } } } diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 3da6ab8..107e4b9 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -135,9 +135,10 @@ fn decompress_file(input_path: &Path, output_path: &Path) -> Result<()> { Ok(()) } -pub fn partial_restore(base_url: &str, conn: &Connection, jump_back: usize) -> Result<()> { +pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) -> Result<()> { let client = Client::new(); - let user_version = get_user_version(conn)?; + let conn = Connection::open(target_db_path)?; + let user_version = get_user_version(&conn)?; let remote_metadata = client .get(format!("{}/{}/metadata.csv", base_url, user_version)) .send()? @@ -146,7 +147,7 @@ pub fn partial_restore(base_url: &str, conn: &Connection, jump_back: usize) -> R .get(format!("{}/{}/restore.sql", base_url, user_version)) .send()? .text()?; - let latest_layer = get_latest_from_db(conn)?; + let latest_layer = get_latest_from_db(&conn)?; let layer_from = latest_layer + 1; // start with the first layer that is not in the DB let start_points = find_restore_points(layer_from, &remote_metadata, jump_back); if start_points.is_empty() { @@ -154,9 +155,16 @@ pub fn partial_restore(base_url: &str, conn: &Connection, jump_back: usize) -> R } let total = start_points.len(); println!("Found {total} potential restore points"); + drop(conn.close()); for (idx, p) in start_points.into_iter().enumerate() { - let previous_hash = get_previous_hash(p.from, conn)?; + // Reopen the DB on each iteration to force flushing all operations + // on the end of each iteration, when the connection is dropped. + // + // Note: the restore SQL query attaches the downloaded DB, but it + // does not DETACH it because it causes problems. 
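+    // (An attached database stays attached for the lifetime of its
+    // connection, so closing and reopening the connection here also
+    // drops the previously attached backup file.)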
+ let conn = Connection::open(target_db_path)?; + let previous_hash = get_previous_hash(p.from, &conn)?; anyhow::ensure!( previous_hash == p.hash[..4], "unexpected hash: '{previous_hash}' doesn't match restore point {p:?}", @@ -199,8 +207,11 @@ mod tests { use rusqlite::{Connection, DatabaseName}; use tempfile::tempdir; - fn create_test_db() -> Connection { - let conn = Connection::open_in_memory().unwrap(); + fn create_test_db(path: Option<&Path>) -> Connection { + let conn = match path { + Some(path) => Connection::open(path).unwrap(), + None => Connection::open_in_memory().unwrap(), + }; conn .execute( "CREATE TABLE layers (id INTEGER, applied_block INTEGER, aggregated_hash BLOB)", @@ -249,7 +260,7 @@ mod tests { #[test] fn getting_previous_hash() { - let conn = create_test_db(); + let conn = create_test_db(None); insert_layer(&conn, 2, 100, &[0xAA, 0xBB]); let result = get_previous_hash(3, &conn).unwrap(); assert_eq!("aabb", result); @@ -257,7 +268,7 @@ mod tests { #[test] fn test_get_latest_from_db() { - let conn = create_test_db(); + let conn = create_test_db(None); insert_layer(&conn, 2, 100, &[0xAA, 0xBB]); let result = get_latest_from_db(&conn).unwrap(); assert_eq!(result, 2); @@ -265,7 +276,7 @@ mod tests { #[test] fn test_get_user_version() { - let conn = create_test_db(); + let conn = create_test_db(None); conn.execute("PRAGMA user_version = 42", []).unwrap(); let result = get_user_version(&conn).unwrap(); assert_eq!(result, 42); @@ -297,8 +308,11 @@ mod tests { #[test] fn partial_restore() { - let conn = create_test_db(); + let dir = tempdir().unwrap(); + let db_path = dir.path().join("state.db"); + let conn = create_test_db(Some(&db_path)); insert_layer(&conn, 99, 100, &[0xBB, 0xBB]); + let mut server = mockito::Server::new(); let user_version = 0; @@ -322,13 +336,14 @@ mod tests { .create(); // Restore SQL just copies contents of the `layers` table + // Note: there's no detach because the real restore query also + // doesn't do this (it causes problems). let mock_query = server .mock("GET", format!("/{user_version}/restore.sql").as_str()) .with_status(200) .with_body( r#"ATTACH DATABASE 'backup_source.db' AS src; - INSERT OR IGNORE INTO layers SELECT * from src.layers; - DETACH DATABASE src"#, + INSERT OR IGNORE INTO layers SELECT * from src.layers;"#, ) .create(); @@ -338,10 +353,10 @@ mod tests { .map(|(hash, point)| { // For simplicity, the database used to restore contains only // the last layer of the point and its expected hash. 
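         // The per-point snapshot is produced with SQLite's online backup
         // API and served verbatim as the body of the mocked download.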
- let conn = create_test_db(); + let conn = create_test_db(None); let hash = hex::decode(hash).unwrap(); insert_layer(&conn, point.to - 1, 111, &hash); - let dir = tempdir().unwrap(); + let checkpoint = dir.path().join("checkpoint.db"); conn.backup(DatabaseName::Main, &checkpoint, None).unwrap(); @@ -354,7 +369,7 @@ mod tests { }) .collect::>(); - super::partial_restore(&server.url(), &conn, 0).unwrap(); + super::partial_restore(&server.url(), &db_path, 0).unwrap(); mock_metadata.assert(); mock_query.assert(); @@ -371,7 +386,9 @@ mod tests { #[test] fn fails_on_hash_mismatch() { - let conn = create_test_db(); + let dir = tempdir().unwrap(); + let db_path = dir.path().join("state.db"); + let conn = create_test_db(Some(&db_path)); insert_layer(&conn, 99, 100, &[0xFF, 0xFF]); let mut server = mockito::Server::new(); let user_version = 0; @@ -389,7 +406,7 @@ mod tests { .with_body(".import backup_source.db layers") .create(); - let err = super::partial_restore(&server.url(), &conn, 0).unwrap_err(); + let err = super::partial_restore(&server.url(), &db_path, 0).unwrap_err(); assert!(err.to_string().contains("unexpected hash")); mock_metadata.assert(); mock_query.assert(); From 16df5f9fe2e182ea77f6b1c70a1618b34eda0ccc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 2 Oct 2024 14:12:32 +0200 Subject: [PATCH 07/18] fix tests on windows --- src/partial_quicksync.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 107e4b9..538623a 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -310,8 +310,10 @@ mod tests { fn partial_restore() { let dir = tempdir().unwrap(); let db_path = dir.path().join("state.db"); - let conn = create_test_db(Some(&db_path)); - insert_layer(&conn, 99, 100, &[0xBB, 0xBB]); + { + let conn = create_test_db(Some(&db_path)); + insert_layer(&conn, 99, 100, &[0xBB, 0xBB]); + } let mut server = mockito::Server::new(); let user_version = 0; @@ -377,6 +379,7 @@ mod tests { mock.assert(); } + let conn = Connection::open(&db_path).unwrap(); let latest = get_latest_from_db(&conn).unwrap(); assert_eq!(latest, points.last().unwrap().1.to - 1); @@ -388,8 +391,10 @@ mod tests { fn fails_on_hash_mismatch() { let dir = tempdir().unwrap(); let db_path = dir.path().join("state.db"); - let conn = create_test_db(Some(&db_path)); - insert_layer(&conn, 99, 100, &[0xFF, 0xFF]); + { + let conn = create_test_db(Some(&db_path)); + insert_layer(&conn, 99, 100, &[0xFF, 0xFF]); + } let mut server = mockito::Server::new(); let user_version = 0; From 3f6b83826b7a1efc7af1606f712e90a464ac9bcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Wed, 2 Oct 2024 15:56:22 +0200 Subject: [PATCH 08/18] Close the DB connection before removing the attached restore DB --- src/partial_quicksync.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 538623a..03fa8b3 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -155,11 +155,14 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) } let total = start_points.len(); println!("Found {total} potential restore points"); - drop(conn.close()); + conn.close().expect("closing DB connection"); + + let source_db_path_zst = &Path::new("backup_source.db.zst"); + let source_db_path = &Path::new("backup_source.db"); for (idx, p) in 
start_points.into_iter().enumerate() { // Reopen the DB on each iteration to force flushing all operations - // on the end of each iteration, when the connection is dropped. + // on the end of each iteration, when the connection is closed. // // Note: the restore SQL query attaches the downloaded DB, but it // does not DETACH it because it causes problems. @@ -169,14 +172,13 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) previous_hash == p.hash[..4], "unexpected hash: '{previous_hash}' doesn't match restore point {p:?}", ); - let source_db_path_zst = &Path::new("backup_source.db.zst"); - let source_db_path = &Path::new("backup_source.db"); if download_file(&client, base_url, user_version, &p, source_db_path_zst).is_err() { download_file(&client, base_url, user_version, &p, source_db_path)?; } else { decompress_file(source_db_path_zst, source_db_path)?; - fs::remove_file(source_db_path_zst)?; + fs::remove_file(source_db_path_zst) + .with_context(|| format!("removing {}", source_db_path_zst.display()))?; } println!("[{idx}/{total}] Restoring from {} to {}...", p.from, p.to); @@ -184,12 +186,16 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) conn .execute_batch(&restore_string) .context("executing restore")?; - fs::remove_file(source_db_path)?; + conn.close().expect("closing DB connection"); + let duration = start.elapsed(); println!( "[{idx}/{total}] Restored {} to {} in {:?}", p.from, p.to, duration ); + + fs::remove_file(source_db_path) + .with_context(|| format!("removing {}", source_db_path.display()))?; } Ok(()) } From 83267c6b6cc4f81361bd48b2f52273692b06aae9 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 2 Oct 2024 20:56:48 +0200 Subject: [PATCH 09/18] Fixed IF to not look for wrong layers --- src/partial_quicksync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 03fa8b3..3ae432e 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -46,7 +46,7 @@ fn find_restore_points(layer_from: u32, metadata: &str, jump_back: usize) -> Vec for (index, line) in metadata.trim().lines().enumerate() { let point = RestorePoint::from_str(line.trim()).expect("parsing restore point"); - if point.to > layer_from && target_index.is_none() { + if point.from > layer_from && point.to > layer_from && target_index.is_none() { target_index = Some(index); } all_points.push(point); From 8e7ede5f3b17bd71753c4c45c9805acdaac14bdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Thu, 3 Oct 2024 11:51:46 +0200 Subject: [PATCH 10/18] add --untrusted-layers and more tests + fixes --- src/main.rs | 7 +- src/partial_quicksync.rs | 245 ++++++++++++++++++++++++++++++--------- 2 files changed, 199 insertions(+), 53 deletions(-) diff --git a/src/main.rs b/src/main.rs index db53374..cf07a57 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,6 +86,10 @@ enum Commands { /// Path to the node state.sql #[clap(short = 's', long)] state_sql: PathBuf, + /// Number of layers present in the DB that are not trusted to be fully synced. + /// These layers will also be synced. + #[clap(long, default_value_t = 10)] + untrusted_layers: u32, /// Jump-back to recover earlier than latest layer. 
It will jump back one row in recovery metadata #[clap(short = 'j', long, default_value_t = 0)] jump_back: usize, @@ -327,6 +331,7 @@ fn main() -> anyhow::Result<()> { } Commands::Partial { state_sql, + untrusted_layers, jump_back, base_url, } => { @@ -337,7 +342,7 @@ fn main() -> anyhow::Result<()> { { return Err(anyhow!("state file not found: {:?}", state_sql_path)); } - partial_restore(&base_url, &state_sql_path, jump_back) + partial_restore(&base_url, &state_sql_path, untrusted_layers, jump_back) } } } diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index 3ae432e..ad0c87b 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -46,24 +46,26 @@ fn find_restore_points(layer_from: u32, metadata: &str, jump_back: usize) -> Vec for (index, line) in metadata.trim().lines().enumerate() { let point = RestorePoint::from_str(line.trim()).expect("parsing restore point"); - if point.from > layer_from && point.to > layer_from && target_index.is_none() { + if (point.from..point.to).contains(&layer_from) && target_index.is_none() { target_index = Some(index); } all_points.push(point); } // A None `target_index` means there aren't any layers > `layer_from` // in the data described by `metadata`. - let target_index = if let Some(t) = target_index { - if t >= jump_back { - t - jump_back - } else { - 0 + match target_index { + Some(t) => { + all_points.drain(..t.saturating_sub(jump_back)); + } + None if jump_back == 0 => { + all_points.drain(..); + } + None => { + all_points.drain(..all_points.len().saturating_sub(jump_back)); } - } else { - return vec![]; }; - all_points.split_off(target_index) + all_points } fn get_latest_from_db(conn: &Connection) -> Result { @@ -135,7 +137,12 @@ fn decompress_file(input_path: &Path, output_path: &Path) -> Result<()> { Ok(()) } -pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) -> Result<()> { +pub fn partial_restore( + base_url: &str, + target_db_path: &Path, + untrusted_layers: u32, + jump_back: usize, +) -> Result<()> { let client = Client::new(); let conn = Connection::open(target_db_path)?; let user_version = get_user_version(&conn)?; @@ -143,16 +150,17 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) .get(format!("{}/{}/metadata.csv", base_url, user_version)) .send()? .text()?; + + let latest_layer = get_latest_from_db(&conn)?; + let layer_from = (latest_layer + 1).saturating_sub(untrusted_layers); + let start_points = find_restore_points(layer_from, &remote_metadata, jump_back); + anyhow::ensure!(!start_points.is_empty(), "no suitable restore point found"); + let restore_string = client .get(format!("{}/{}/restore.sql", base_url, user_version)) .send()? .text()?; - let latest_layer = get_latest_from_db(&conn)?; - let layer_from = latest_layer + 1; // start with the first layer that is not in the DB - let start_points = find_restore_points(layer_from, &remote_metadata, jump_back); - if start_points.is_empty() { - anyhow::bail!("No suitable restore point found"); - } + let total = start_points.len(); println!("Found {total} potential restore points"); conn.close().expect("closing DB connection"); @@ -167,11 +175,13 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) // Note: the restore SQL query attaches the downloaded DB, but it // does not DETACH it because it causes problems. 
let conn = Connection::open(target_db_path)?; - let previous_hash = get_previous_hash(p.from, &conn)?; - anyhow::ensure!( - previous_hash == p.hash[..4], - "unexpected hash: '{previous_hash}' doesn't match restore point {p:?}", - ); + if p.from != 0 { + let previous_hash = get_previous_hash(p.from, &conn)?; + anyhow::ensure!( + previous_hash == p.hash[..4], + "unexpected hash: '{previous_hash}' doesn't match restore point {p:?}", + ); + } if download_file(&client, base_url, user_version, &p, source_db_path_zst).is_err() { download_file(&client, base_url, user_version, &p, source_db_path)?; @@ -202,8 +212,12 @@ pub fn partial_restore(base_url: &str, target_db_path: &Path, jump_back: usize) #[cfg(test)] impl RestorePoint { - fn new(from: u32, to: u32, hash: String) -> Self { - Self { from, to, hash } + fn new>(from: u32, to: u32, hash: H) -> Self { + Self { + from, + to, + hash: hash.into(), + } } } @@ -228,31 +242,64 @@ mod tests { } #[test] - fn test_find_start_points() { + fn restore_points_dont_have_missing_data() { let metadata = r#" - 0,100,aaaa - 101,200,bbbb - 201,300,ijkl + 100,200,bbbb + 200,300,ijkl "#; + // 90-100 are not available for restore + let result = find_restore_points(90, metadata, 0); + assert!(result.is_empty()); + } + + #[test] + fn finding_restore_points() { + let points = [ + RestorePoint::new(0, 100, "aaaa"), + RestorePoint::new(100, 200, "bbbb"), + RestorePoint::new(200, 300, "ijkl"), + ]; + let metadata = &points + .iter() + .map(|p| p.to_string()) + .collect::>() + .join("\n"); + + let result = find_restore_points(99, metadata, 0); + assert_eq!(result, points); + + let result = find_restore_points(100, metadata, 0); + assert_eq!(result, points[1..]); + + let result = find_restore_points(101, metadata, 0); + assert_eq!(result, points[1..]); + + let result = find_restore_points(101, metadata, 1); + assert_eq!(result, points); + let result = find_restore_points(150, metadata, 0); - assert_eq!(result.len(), 2); - assert_eq!(result[0].from, 101); - assert_eq!(result[0].to, 200); - assert_eq!(result[0].hash, "bbbb"); + assert_eq!(result, points[1..]); let result = find_restore_points(150, metadata, 1); - assert_eq!(result.len(), 3); - assert_eq!(result[0].from, 0); - assert_eq!(result[0].to, 100); - assert_eq!(result[0].hash, "aaaa"); + assert_eq!(result, points); // `jump_back` over the first point - let result2 = find_restore_points(150, metadata, 5); - assert_eq!(result, result2); + let result = find_restore_points(150, metadata, 5); + assert_eq!(result, points); - // no points for the requested starting layer - let result = find_restore_points(500, metadata, 1); + let result = find_restore_points(300, metadata, 0); assert!(result.is_empty()); + + // synced but jumping back 1 + let result = find_restore_points(300, metadata, 1); + assert_eq!(result, points[2..]); + + // synced but jumping back 1 + let result = find_restore_points(300, metadata, 2); + assert_eq!(result, points[1..]); + + let result = find_restore_points(500, metadata, 1); + assert_eq!(result, points[2..]); } fn insert_layer(conn: &Connection, id: u32, applied_block: i64, hash: &[u8]) { @@ -325,10 +372,10 @@ mod tests { let user_version = 0; let points = [ - ("bbbb", RestorePoint::new(0, 100, "aaaa".into())), - ("cccc", RestorePoint::new(100, 200, "bbbb".into())), - ("dddd", RestorePoint::new(200, 300, "cccc".into())), - ("eeee", RestorePoint::new(300, 400, "dddd".into())), + ("bbbb", RestorePoint::new(0, 100, "aaaa")), + ("cccc", RestorePoint::new(100, 200, "bbbb")), + ("dddd", 
RestorePoint::new(200, 300, "cccc")), + ("eeee", RestorePoint::new(300, 400, "dddd")), ]; let metadata = points @@ -338,8 +385,7 @@ mod tests { .join("\n"); let mock_metadata = server - .mock("GET", format!("/{user_version}/metadata.csv").as_str()) - .with_status(200) + .mock("GET", "/0/metadata.csv") .with_body(metadata) .create(); @@ -347,8 +393,7 @@ mod tests { // Note: there's no detach because the real restore query also // doesn't do this (it causes problems). let mock_query = server - .mock("GET", format!("/{user_version}/restore.sql").as_str()) - .with_status(200) + .mock("GET", "/0/restore.sql") .with_body( r#"ATTACH DATABASE 'backup_source.db' AS src; INSERT OR IGNORE INTO layers SELECT * from src.layers;"#, @@ -371,13 +416,90 @@ mod tests { let file_url = file_url(user_version, point, None); server .mock("GET", format!("/{file_url}").as_str()) - .with_status(200) .with_body(std::fs::read(&checkpoint).unwrap()) .create() }) .collect::>(); - super::partial_restore(&server.url(), &db_path, 0).unwrap(); + super::partial_restore(&server.url(), &db_path, 0, 0).unwrap(); + + mock_metadata.assert(); + mock_query.assert(); + for mock in data_mocks { + mock.assert(); + } + + let conn = Connection::open(&db_path).unwrap(); + let latest = get_latest_from_db(&conn).unwrap(); + assert_eq!(latest, points.last().unwrap().1.to - 1); + + let result = get_previous_hash(latest + 1, &conn).unwrap(); + assert_eq!(result, points.last().unwrap().0); + } + + #[test] + fn partial_restore_with_untrusted_layers() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("state.db"); + { + let conn = create_test_db(Some(&db_path)); + insert_layer(&conn, 99, 100, &[0xBB, 0xBB]); + } + + let mut server = mockito::Server::new(); + let user_version = 0; + + let points = [ + ("bbbb", RestorePoint::new(0, 100, "aaaa")), + ("cccc", RestorePoint::new(100, 200, "bbbb")), + ("dddd", RestorePoint::new(200, 300, "cccc")), + ("eeee", RestorePoint::new(300, 400, "dddd")), + ]; + + let metadata = points + .iter() + .map(|(_, p)| p.to_string()) + .collect::>() + .join("\n"); + + let mock_metadata = server + .mock("GET", "/0/metadata.csv") + .with_body(metadata) + .create(); + + // Restore SQL just copies contents of the `layers` table + // Note: there's no detach because the real restore query also + // doesn't do this (it causes problems). + let mock_query = server + .mock("GET", "/0/restore.sql") + .with_body( + r#"ATTACH DATABASE 'backup_source.db' AS src; + INSERT OR IGNORE INTO layers SELECT * from src.layers;"#, + ) + .create(); + + let data_mocks = points + .iter() + .map(|(hash, point)| { + // For simplicity, the database used to restore contains only + // the last layer of the point and its expected hash. 
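+        // With `untrusted_layers = 10` and latest applied layer 99, the
+        // restore starts from layer 90, which falls into the very first
+        // point, so all four parts above are expected to be downloaded.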
+        let conn = create_test_db(None);
+        let hash = hex::decode(hash).unwrap();
+        insert_layer(&conn, point.to - 1, 111, &hash);
+
+        let checkpoint = dir.path().join("checkpoint.db");
+        conn.backup(DatabaseName::Main, &checkpoint, None).unwrap();
+
+        let file_url = file_url(user_version, point, None);
+        server
+          .mock("GET", format!("/{file_url}").as_str())
+          .with_body(std::fs::read(&checkpoint).unwrap())
+          .create()
+      })
+      .collect::<Vec<_>>();
+
+    let untrusted_layers = 10;
+    super::partial_restore(&server.url(), &db_path, untrusted_layers, 0).unwrap();
 
     mock_metadata.assert();
     mock_query.assert();
@@ -407,19 +529,38 @@ mod tests {
     let metadata = RestorePoint::new(100, 200, "aaaa".to_string()).to_string();
     let mock_metadata = server
       .mock("GET", format!("/{user_version}/metadata.csv").as_str())
-      .with_status(200)
       .with_body(metadata)
       .create();
 
     let mock_query = server
       .mock("GET", format!("/{user_version}/restore.sql").as_str())
-      .with_status(200)
       .with_body(".import backup_source.db layers")
       .create();
 
-    let err = super::partial_restore(&server.url(), &db_path, 0).unwrap_err();
+    let err = super::partial_restore(&server.url(), &db_path, 0, 0).unwrap_err();
     assert!(err.to_string().contains("unexpected hash"));
     mock_metadata.assert();
     mock_query.assert();
   }
+
+  #[test]
+  fn no_matching_restore_points() {
+    let dir = tempdir().unwrap();
+    let db_path = dir.path().join("state.db");
+    {
+      let conn = create_test_db(Some(&db_path));
+      insert_layer(&conn, 80, 100, &[0xFF, 0xFF]);
+    }
+    let mut server = mockito::Server::new();
+
+    let metadata = RestorePoint::new(200, 300, "aaaa".to_string()).to_string();
+    let mock_metadata = server
+      .mock("GET", "/0/metadata.csv")
+      .with_body(metadata)
+      .create();
+
+    let err = super::partial_restore(&server.url(), &db_path, 0, 0).unwrap_err();
+    assert!(err.to_string().contains("no suitable restore point found"));
+    mock_metadata.assert();
+  }
 }

From 9613a3ebdab2a16a586b707604e575e82f022ce0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?=
Date: Thu, 3 Oct 2024 12:59:13 +0200
Subject: [PATCH 11/18] Fix tests running in parallel, using the same
 backup_source.db file

---
 src/main.rs              |  9 ++++++-
 src/partial_quicksync.rs | 54 +++++++++++++++++++---------------------
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index cf07a57..df9fb41 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -342,7 +342,14 @@ fn main() -> anyhow::Result<()> {
       {
         return Err(anyhow!("state file not found: {:?}", state_sql_path));
       }
-      partial_restore(&base_url, &state_sql_path, untrusted_layers, jump_back)
+      let download_path = resolve_path(Path::new(".")).unwrap();
+      partial_restore(
+        &base_url,
+        &state_sql_path,
+        &download_path,
+        untrusted_layers,
+        jump_back,
+      )
     }
   }
 }
diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs
index ad0c87b..7ab2671 100644
--- a/src/partial_quicksync.rs
+++ b/src/partial_quicksync.rs
@@ -1,9 +1,10 @@
 use anyhow::{Context, Result};
 use reqwest::blocking::Client;
 use rusqlite::Connection;
+use std::{fs, io};
 use std::{
-  fs::{self, File},
-  io::{self, BufReader, BufWriter},
+  fs::File,
+  io::{BufReader, BufWriter},
   path::Path,
   str::FromStr,
   time::Instant,
 };
 use zstd::stream::Decoder;
@@ -140,6 +141,7 @@ fn decompress_file(input_path: &Path, output_path: &Path) -> Result<()> {
 pub fn partial_restore(
   base_url: &str,
   target_db_path: &Path,
+  download_path: &Path,
   untrusted_layers: u32,
   jump_back: usize,
 ) -> Result<()> {
@@ -165,8 +167,8 @@ pub fn partial_restore(
   println!("Found {total} potential restore points");
   conn.close().expect("closing DB connection");
 
-  let source_db_path_zst = &Path::new("backup_source.db.zst");
-  let source_db_path = &Path::new("backup_source.db");
+  let source_db_path_zst = &download_path.join("backup_source.db.zst");
+  let source_db_path = &download_path.join("backup_source.db");
 
   for (idx, p) in start_points.into_iter().enumerate() {
     // Reopen the DB on each iteration to force flushing all operations
@@ -213,11 +215,8 @@ pub fn partial_restore(
 #[cfg(test)]
 impl RestorePoint {
   fn new<H: Into<String>>(from: u32, to: u32, hash: H) -> Self {
-    Self {
-      from,
-      to,
-      hash: hash.into(),
-    }
+    let hash = hash.into();
+    Self { from, to, hash }
   }
 }
 
@@ -369,7 +368,6 @@ mod tests {
     }
 
     let mut server = mockito::Server::new();
-    let user_version = 0;
 
     let points = [
       ("bbbb", RestorePoint::new(0, 100, "aaaa")),
@@ -394,10 +392,11 @@ mod tests {
     // doesn't do this (it causes problems).
     let mock_query = server
       .mock("GET", "/0/restore.sql")
-      .with_body(
-        r#"ATTACH DATABASE 'backup_source.db' AS src;
-        INSERT OR IGNORE INTO layers SELECT * from src.layers;"#,
-      )
+      .with_body(format!(
+        r#"ATTACH DATABASE '{}' AS src;
+        INSERT OR IGNORE INTO layers SELECT * from src.layers;"#,
+        dir.path().join("backup_source.db").display(),
+      ))
       .create();
 
     let data_mocks = points
@@ -413,7 +412,7 @@ mod tests {
       let checkpoint = dir.path().join("checkpoint.db");
       conn.backup(DatabaseName::Main, &checkpoint, None).unwrap();
 
-      let file_url = file_url(user_version, point, None);
+      let file_url = file_url(0, point, None);
       server
         .mock("GET", format!("/{file_url}").as_str())
         .with_body(std::fs::read(&checkpoint).unwrap())
         .create()
     })
     .collect::<Vec<_>>();
 
-    super::partial_restore(&server.url(), &db_path, 0, 0).unwrap();
+    super::partial_restore(&server.url(), &db_path, dir.path(), 0, 0).unwrap();
 
     mock_metadata.assert();
     mock_query.assert();
@@ -447,7 +446,6 @@ mod tests {
     }
 
     let mut server = mockito::Server::new();
-    let user_version = 0;
 
     let points = [
       ("bbbb", RestorePoint::new(0, 100, "aaaa")),
       ("cccc", RestorePoint::new(100, 200, "bbbb")),
       ("dddd", RestorePoint::new(200, 300, "cccc")),
       ("eeee", RestorePoint::new(300, 400, "dddd")),
     ];
 
     let metadata = points
@@ -472,10 +470,11 @@ mod tests {
     // doesn't do this (it causes problems).
     let mock_query = server
       .mock("GET", "/0/restore.sql")
-      .with_body(
-        r#"ATTACH DATABASE 'backup_source.db' AS src;
-        INSERT OR IGNORE INTO layers SELECT * from src.layers;"#,
-      )
+      .with_body(format!(
+        r#"ATTACH DATABASE '{}' AS src;
+        INSERT OR IGNORE INTO layers SELECT * from src.layers;"#,
+        dir.path().join("backup_source.db").display(),
+      ))
       .create();
 
     let data_mocks = points
       .iter()
       .map(|(hash, point)| {
@@ -490,7 +489,7 @@ mod tests {
       let checkpoint = dir.path().join("checkpoint.db");
       conn.backup(DatabaseName::Main, &checkpoint, None).unwrap();
 
-      let file_url = file_url(user_version, point, None);
+      let file_url = file_url(0, point, None);
       server
         .mock("GET", format!("/{file_url}").as_str())
         .with_body(std::fs::read(&checkpoint).unwrap())
         .create()
     })
     .collect::<Vec<_>>();
 
     let untrusted_layers = 10;
-    super::partial_restore(&server.url(), &db_path, untrusted_layers, 0).unwrap();
+    super::partial_restore(&server.url(), &db_path, dir.path(), untrusted_layers, 0).unwrap();
 
     mock_metadata.assert();
     mock_query.assert();
@@ -524,20 +523,19 @@ mod tests {
       insert_layer(&conn, 99, 100, &[0xFF, 0xFF]);
     }
     let mut server = mockito::Server::new();
-    let user_version = 0;
 
     let metadata = RestorePoint::new(100, 200, "aaaa".to_string()).to_string();
     let mock_metadata = server
-      .mock("GET", format!("/{user_version}/metadata.csv").as_str())
+      .mock("GET", "/0/metadata.csv")
       .with_body(metadata)
       .create();
 
     let mock_query = server
-      .mock("GET", format!("/{user_version}/restore.sql").as_str())
+      .mock("GET", "/0/restore.sql")
       .with_body(".import backup_source.db layers")
       .create();
 
-    let err = super::partial_restore(&server.url(), &db_path, 0, 0).unwrap_err();
+    let err = super::partial_restore(&server.url(), &db_path, dir.path(), 0, 0).unwrap_err();
     assert!(err.to_string().contains("unexpected hash"));
     mock_metadata.assert();
     mock_query.assert();
@@ -559,7 +557,7 @@ mod tests {
       .with_body(metadata)
       .create();
 
-    let err = super::partial_restore(&server.url(), &db_path, 0, 0).unwrap_err();
+    let err = super::partial_restore(&server.url(), &db_path, dir.path(), 0, 0).unwrap_err();
     assert!(err.to_string().contains("no suitable restore point found"));
     mock_metadata.assert();
   }

From f8273ef287ad08b9882e431f4a48c9e1a179956d Mon Sep 17 00:00:00 2001
From: Jedrzej Nowak
Date: Mon, 7 Oct 2024 13:30:33 +0200
Subject: [PATCH 12/18] Added partial quicksync to the README

---
 README.md | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 62d237b..c293cc3 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ When a new node joins the Spacemesh network, it must first get up to speed with
 
 With Quicksync, instead of performing all of the syncing actions as stated above and calculating the network state from genesis, one just needs to download the current state from a trusted peer like the Spacemesh dev team or some other node. While this runs contrary to the web3 philosophy of "Don't trust, verify", we believe that this could be a choice some smeshers may be interested in given the high rate of trouble with syncing. Moreover, nothing precludes a smesher from verifying this state in the background once it is downloaded.
 
-The state (also called an archive) that is downloaded is in the form of a state.sql file and can either be downloaded automatically using Smapp, or manually by using the `quicksync-rs` utility. 
+The state (also called an archive) that is downloaded is in the form of a state.sql file and can either be downloaded automatically using Smapp, or manually by using the `quicksync-rs` utility.
 
 Instructions for using `quicksync-rs` to download the latest state are given below. Note that if you use the latest version of Smapp, it will automatically offer to use quicksync to fetch the latest state.
@@ -56,6 +56,16 @@ Listed below are the exit codes and what they mean:
 - `7` - Invalid checksum of archive.
 - `8` - Cannot validate archive checksum.
 
+## Partial quicksync
+
+It is also possible to sync by downloading and applying deltas on top of an existing state. If a `state.sql` file is already present, it is worth considering applying only deltas to it.
+Note that syncing a large range of layers is faster with full quicksync, but if you are already synced and only need to catch up with the latest state, partial quicksync is the way to go.
+
+Partial quicksync works by checking the latest verified layer in the database, then downloading small delta files (usually about 50 MB, but up to 200 MB each) and applying them on top of the existing `state.sql`. The process can be safely interrupted between batches.
+
+Restoring the same batch twice is a no-op and will not affect the database.
+
 ## Commands
 
 The list of available commands for the `quicksync` utility is presented below. Note that these commands are for Linux; for Windows, change `./quicksync` to `.\quicksync.exe`.
@@ -63,5 +73,6 @@ The list of available commands for the `quicksync` utility is presented below. N
 - `./quicksync download`: Downloads the latest `state.sql` file.
 - `./quicksync check`: Checks if the current `state.sql` is up to date.
 - `./quicksync help`: Displays all operations that `quicksync` can perform.
+- `./quicksync partial`: Applies delta-based (partial) quicksync on top of an existing `state.sql`.
 - `./quicksync --version`: Displays the quicksync version.
 - `cargo run -- help`: Displays helpful commands for running the package. Relevant for developers.
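To make the README's description of restore points concrete, here is a standalone sketch of the selection rule that `find_restore_points` is expected to follow, reconstructed from the test expectations in the earlier patch. It is illustrative only: `RestorePoint` and `find_restore_points` exist in src/partial_quicksync.rs, but this version (including the name `select_restore_points`) is an assumption, not the shipped implementation.

// Illustrative sketch (not part of the patches): the restore-point
// selection rule implied by the tests in src/partial_quicksync.rs.
// `layer_from` is the first layer the local database still needs.
#[derive(Clone, Debug, PartialEq)]
struct RestorePoint {
  from: u32,
  to: u32,
  hash: String,
}

fn select_restore_points(
  layer_from: u32,
  points: &[RestorePoint],
  jump_back: usize,
) -> Vec<RestorePoint> {
  // The first metadata row whose range covers `layer_from`...
  let first = points
    .iter()
    .position(|p| p.from <= layer_from && p.to > layer_from)
    // ...or, when the DB is already at or past the last row, an index one
    // past the end, so that `jump_back` can still step back into real rows.
    .or_else(|| {
      points
        .last()
        .filter(|p| layer_from >= p.to)
        .map(|_| points.len())
    });
  match first {
    // `saturating_sub` clamps an oversized `jump_back` at the first row.
    Some(i) => points[i.saturating_sub(jump_back)..].to_vec(),
    // A gap below the first row (e.g. layer 90 when rows start at 100)
    // cannot be restored, so no points are returned.
    None => Vec::new(),
  }
}

Under this rule, a database that is fully synced (or too old for the available rows) yields no points at jump_back = 0, which is exactly the case the next patch turns into a clearer error message.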
From 4ca3e2cc997976edebd82221f86f223c9865adab Mon Sep 17 00:00:00 2001
From: Jedrzej Nowak
Date: Mon, 7 Oct 2024 14:26:33 +0200
Subject: [PATCH 13/18] Better error msg when the node is too old

---
 src/partial_quicksync.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs
index 7ab2671..7230ad2 100644
--- a/src/partial_quicksync.rs
+++ b/src/partial_quicksync.rs
@@ -156,7 +156,7 @@ pub fn partial_restore(
   let latest_layer = get_latest_from_db(&conn)?;
   let layer_from = (latest_layer + 1).saturating_sub(untrusted_layers);
   let start_points = find_restore_points(layer_from, &remote_metadata, jump_back);
-  anyhow::ensure!(!start_points.is_empty(), "no suitable restore point found");
+  anyhow::ensure!(!start_points.is_empty(), "No suitable restore points found; state.sql seems to be too old");
 
   let restore_string = client
     .get(format!("{}/{}/restore.sql", base_url, user_version))
@@ -558,7 +558,7 @@ mod tests {
       .create();
 
     let err = super::partial_restore(&server.url(), &db_path, dir.path(), 0, 0).unwrap_err();
-    assert!(err.to_string().contains("no suitable restore point found"));
+    assert!(err.to_string().contains("No suitable restore points found; state.sql seems to be too old"));
     mock_metadata.assert();
   }
 }

From 5edafe1be707a09d8210417e4e07d81d1814bbc8 Mon Sep 17 00:00:00 2001
From: Jedrzej Nowak
Date: Mon, 7 Oct 2024 21:01:08 +0200
Subject: [PATCH 14/18] Fix off-by-one error

---
 src/partial_quicksync.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs
index 7230ad2..4409eaa 100644
--- a/src/partial_quicksync.rs
+++ b/src/partial_quicksync.rs
@@ -193,7 +193,8 @@ pub fn partial_restore(
         .with_context(|| format!("removing {}", source_db_path_zst.display()))?;
     }
 
-    println!("[{idx}/{total}] Restoring from {} to {}...", p.from, p.to);
+    let current_idx = idx + 1;
+    println!("[{current_idx}/{total}] Restoring from {} to {}...", p.from, p.to);
     let start = Instant::now();
     conn
       .execute_batch(&restore_string)
@@ -202,7 +203,7 @@ pub fn partial_restore(
 
     let duration = start.elapsed();
     println!(
-      "[{idx}/{total}] Restored {} to {} in {:?}",
+      "[{current_idx}/{total}] Restored {} to {} in {:?}",
       p.from, p.to, duration
    );
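A note on the restore loop touched above: the README's claim that re-applying a batch is a no-op rests on the restore script copying rows with INSERT OR IGNORE, as the mocked restore.sql in the tests shows. Below is a minimal sketch of that apply step, assuming rusqlite and the simplified layers-only schema from the tests; the real script is fetched from {base_url}/{user_version}/restore.sql and may do more.

// Illustrative sketch (not part of the patches): applying one downloaded
// batch to the target database. Because rows are copied with
// INSERT OR IGNORE, re-running the same batch leaves the database unchanged.
use rusqlite::Connection;

fn apply_batch(target_db: &str, batch_db: &str) -> rusqlite::Result<()> {
  let conn = Connection::open(target_db)?;
  // Deliberately no DETACH afterwards, mirroring the note in the tests
  // ("it causes problems"); the connection is closed right after anyway.
  conn.execute_batch(&format!(
    "ATTACH DATABASE '{batch_db}' AS src;
     INSERT OR IGNORE INTO layers SELECT * FROM src.layers;"
  ))?;
  Ok(())
}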
{}...", p.from, p.to); + println!( + "[{current_idx}/{total}] Restoring from {} to {}...", + p.from, p.to + ); let start = Instant::now(); conn .execute_batch(&restore_string) @@ -559,7 +568,9 @@ mod tests { .create(); let err = super::partial_restore(&server.url(), &db_path, dir.path(), 0, 0).unwrap_err(); - assert!(err.to_string().contains("No suitable restore points found, seems that state.sql is too old")); + assert!(err + .to_string() + .contains("No suitable restore points found, seems that state.sql is too old")); mock_metadata.assert(); } } From 6e0053d5ff07556fd162358bef68e4d15bf81f7d Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 8 Oct 2024 18:26:37 +0200 Subject: [PATCH 16/18] Set the final url --- src/partial_quicksync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partial_quicksync.rs b/src/partial_quicksync.rs index c61fec0..1cdbf48 100644 --- a/src/partial_quicksync.rs +++ b/src/partial_quicksync.rs @@ -11,7 +11,7 @@ use std::{ }; use zstd::stream::Decoder; -pub(crate) const DEFAULT_BASE_URL: &str = "https://quicksync.spacemesh.network/partials"; +pub(crate) const DEFAULT_BASE_URL: &str = "https://quicksync-partials.spacemesh.network"; #[derive(Clone, Debug, PartialEq, Eq, parse_display::Display, parse_display::FromStr)] #[display("{from},{to},{hash}")] From abf1d93f4059aec4ee0069de43499fe57694942d Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 9 Oct 2024 09:26:31 +0200 Subject: [PATCH 17/18] Added beta disclaimer --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index df9fb41..8e5944a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -335,6 +335,7 @@ fn main() -> anyhow::Result<()> { jump_back, base_url, } => { + println!("Partial quicksync is considered to be beta feature for now"); let state_sql_path = resolve_path(&state_sql).context("resolving state.sql path")?; if !state_sql_path .try_exists() From 44d1cd9308602b66054f6ad6e586f650b58f0cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Nowak?= Date: Wed, 9 Oct 2024 10:59:15 +0200 Subject: [PATCH 18/18] Add warning to the beta log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartosz Różański --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 8e5944a..262abee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -335,7 +335,7 @@ fn main() -> anyhow::Result<()> { jump_back, base_url, } => { - println!("Partial quicksync is considered to be beta feature for now"); + println!("Warning: partial quicksync is considered to be beta feature for now"); let state_sql_path = resolve_path(&state_sql).context("resolving state.sql path")?; if !state_sql_path .try_exists()