From 34ac1b8f33aa18f1e5f274c61a7b35222194001e Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 11 Jan 2024 10:36:58 +0200 Subject: [PATCH 1/4] create ebt directory on startup --- solar/src/config.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/solar/src/config.rs b/solar/src/config.rs index a11c54a..a2c4ec0 100644 --- a/solar/src/config.rs +++ b/solar/src/config.rs @@ -53,26 +53,29 @@ impl ApplicationConfig { /// Create the root data directory for solar, along with the feed and blob /// directories. This is where all application data is stored, including /// the public-private keypair, key-value database and blob store. - fn create_data_directories(path: Option) -> Result<(PathBuf, PathBuf)> { + fn create_data_directories(path: Option) -> Result<(PathBuf, PathBuf, PathBuf)> { let base_path = path.unwrap_or(BaseDirectories::new()?.create_data_directory("solar")?); // Define the directory name for the feed store. let feeds_path = base_path.join("feeds"); // Define the directory name for the blob store. let blobs_path = base_path.join("blobs"); + // Define the directory name for the EBT vector clocks. + let ebt_path = base_path.join("ebt"); - // Create the feed and blobs directories. + // Create the feed, blobs and ebt directories. std::fs::create_dir_all(&feeds_path)?; std::fs::create_dir_all(blobs_path)?; + std::fs::create_dir_all(&ebt_path)?; - Ok((base_path, feeds_path)) + Ok((base_path, feeds_path, ebt_path)) } /// Configure the application based on CLI options, environment variables /// and defaults. pub fn new(path: Option) -> Result { // Create the application data directories if they don't already exist. - let (base_path, feeds_path) = Self::create_data_directories(path)?; + let (base_path, feeds_path, _ebt_path) = Self::create_data_directories(path)?; info!("Base directory is {:?}", base_path); From 112e22254c51f1a45b695fd23d791be7b69f2c8f Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 11 Jan 2024 14:09:20 +0200 Subject: [PATCH 2/4] define path for ebt peer clocks --- solar/src/node.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/solar/src/node.rs b/solar/src/node.rs index 128fb08..8e3c5fa 100644 --- a/solar/src/node.rs +++ b/solar/src/node.rs @@ -43,6 +43,7 @@ impl Node { // Define the directory name for the blob store. let blobs_path = config .base_path + .as_ref() .expect("Base path not supplied") .join("blobs"); @@ -136,11 +137,18 @@ impl Node { // intervals). Broker::spawn(connection_scheduler::actor(peers_to_dial)); + // Define the directory name for the ebt clock store. + let ebt_path = config + .base_path + .expect("Base path not supplied") + .join("ebt"); + // Spawn the EBT replication manager actor. let ebt_replication_manager = EbtManager::default(); Broker::spawn(EbtManager::event_loop( ebt_replication_manager, owned_identity.id, + ebt_path, )); // Spawn the connection manager message loop. From f069093297813103cb7d982dbca69b9f7ee1dde8 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Thu, 11 Jan 2024 14:09:56 +0200 Subject: [PATCH 3/4] persist peer vector clocks to disk before shutdown --- solar/src/actors/replication/ebt/manager.rs | 28 ++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 6432f6a..59351fc 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -10,6 +10,8 @@ use std::{ collections::{HashMap, HashSet}, fmt::Display, + fs, + path::PathBuf, }; use async_std::task; @@ -120,8 +122,6 @@ impl Default for EbtManager { impl EbtManager { // Read peer clock state from file. // fn load_peer_clocks() - // Write peer clock state to file. - // fn persist_peer_clocks() /// Initialise the local clock based on peers to be replicated. /// @@ -166,6 +166,25 @@ impl EbtManager { } } + /// Persist all peer clocks to disk (`ebt` directory). + fn persist_peer_clocks(&self, ebt_config_path: PathBuf) -> Result<()> { + for (ssb_id, clock) in self.peer_clocks.iter() { + // Format ID as: @.ed25519, replacing an `/` characters + // with `-`. + let clock_author_id = + format!("@{}", ssb_id.to_string().replace('/', "-").replace('=', "")); + + let clock_filepath = ebt_config_path.join(clock_author_id); + let json_clock = serde_json::to_string(clock)?; + + fs::write(clock_filepath, json_clock)?; + + debug!("Wrote vector clock to file for: {}", ssb_id); + } + + Ok(()) + } + /// Retrieve the stored vector clock for the first peer, check for the /// second peer in the vector clock and return the value of the receive /// flag. @@ -594,7 +613,7 @@ impl EbtManager { /// /// Listen for EBT event messages via the broker and update EBT session /// state accordingly. - pub async fn event_loop(mut self, local_id: SsbId) -> Result<()> { + pub async fn event_loop(mut self, local_id: SsbId, ebt_config_path: PathBuf) -> Result<()> { debug!("Started EBT event loop"); // Set the ID (@-prefixed public key) of the local node. @@ -681,6 +700,9 @@ impl EbtManager { } } + // Write all peer clocks to disk before exiting. + self.persist_peer_clocks(ebt_config_path)?; + Ok(()) } } From d8fae0f0e791b19aac0dbdd7268e5294b81eee10 Mon Sep 17 00:00:00 2001 From: mycognosist Date: Wed, 17 Jan 2024 10:27:37 +0200 Subject: [PATCH 4/4] load peer vector clocks on startup --- solar/src/actors/replication/ebt/manager.rs | 56 +++++++++++++++++---- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 59351fc..37f8496 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -10,7 +10,8 @@ use std::{ collections::{HashMap, HashSet}, fmt::Display, - fs, + fs::{self, File}, + io::Read, path::PathBuf, }; @@ -120,14 +121,11 @@ impl Default for EbtManager { } impl EbtManager { - // Read peer clock state from file. - // fn load_peer_clocks() - /// Initialise the local clock based on peers to be replicated. /// /// This defines the public keys of all feeds we wish to replicate, /// along with the latest sequence number for each. - async fn init_local_clock(&mut self) -> Result<()> { + async fn init_local_clock(&mut self, ebt_config_path: &PathBuf) -> Result<()> { debug!("Initialising local EBT clock"); let local_id = self.local_id.to_owned(); @@ -143,7 +141,8 @@ impl EbtManager { } } - // TODO: Load peer clocks from file and update `peer_clocks`. + // Load peer clocks from file and update `peer_clocks`. + self.load_peer_clocks(ebt_config_path)?; Ok(()) } @@ -166,11 +165,50 @@ impl EbtManager { } } + /// Load all peer clocks from disk (`ebt` directory). + fn load_peer_clocks(&mut self, ebt_config_path: &PathBuf) -> Result<()> { + // Iterate over all stored vector clocks in the directory. + if let Ok(entries) = fs::read_dir(ebt_config_path) { + for clock_entry in entries.flatten() { + // Get the SSB ID of the vector clock from the filename. + let clock_filename = clock_entry.file_name(); + let ssb_id = clock_filename.into_string().map_err(|os_string| { + Error::Other(format!( + "Invalid unicode in EBT clock filename: {:?}", + os_string + )) + })?; + + // Format the SSB ID as: =.ed25519, replacing + // any `-` characters with `/`. + // + // TODO: Rewrite this to avoid extra allocations. + let mut ssb_id = ssb_id.replace('@', "").replace('-', "/"); + if let Some(dot_index) = ssb_id.find('.') { + ssb_id.insert(dot_index, '=') + } + + // Read and parse the vector clock from the file. + let mut clock_file = File::open(&clock_entry.path())?; + let mut clock_file_contents = String::new(); + clock_file.read_to_string(&mut clock_file_contents)?; + let clock: VectorClock = serde_json::from_str(&clock_file_contents)?; + + // Set the vector clock in memory. + self.set_clock(&ssb_id, clock); + + debug!("Loaded vector clock from file for: {}", ssb_id) + } + } + + Ok(()) + } + /// Persist all peer clocks to disk (`ebt` directory). fn persist_peer_clocks(&self, ebt_config_path: PathBuf) -> Result<()> { for (ssb_id, clock) in self.peer_clocks.iter() { - // Format ID as: @.ed25519, replacing an `/` characters - // with `-`. + // Format the SSB ID as: @.ed25519, replacing any `/` + // characters with `-`. let clock_author_id = format!("@{}", ssb_id.to_string().replace('/', "-").replace('=', "")); @@ -620,7 +658,7 @@ impl EbtManager { self.local_id = local_id; // Initialise the local clock based on peers to be replicated. - self.init_local_clock().await?; + self.init_local_clock(&ebt_config_path).await?; // Register the EBT event loop actor with the broker. let ActorEndpoint {