diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 6432f6a..37f8496 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -10,6 +10,9 @@ use std::{ collections::{HashMap, HashSet}, fmt::Display, + fs::{self, File}, + io::Read, + path::PathBuf, }; use async_std::task; @@ -118,16 +121,11 @@ impl Default for EbtManager { } impl EbtManager { - // Read peer clock state from file. - // fn load_peer_clocks() - // Write peer clock state to file. - // fn persist_peer_clocks() - /// Initialise the local clock based on peers to be replicated. /// /// This defines the public keys of all feeds we wish to replicate, /// along with the latest sequence number for each. - async fn init_local_clock(&mut self) -> Result<()> { + async fn init_local_clock(&mut self, ebt_config_path: &PathBuf) -> Result<()> { debug!("Initialising local EBT clock"); let local_id = self.local_id.to_owned(); @@ -143,7 +141,8 @@ impl EbtManager { } } - // TODO: Load peer clocks from file and update `peer_clocks`. + // Load peer clocks from file and update `peer_clocks`. + self.load_peer_clocks(ebt_config_path)?; Ok(()) } @@ -166,6 +165,64 @@ impl EbtManager { } } + /// Load all peer clocks from disk (`ebt` directory). + fn load_peer_clocks(&mut self, ebt_config_path: &PathBuf) -> Result<()> { + // Iterate over all stored vector clocks in the directory. + if let Ok(entries) = fs::read_dir(ebt_config_path) { + for clock_entry in entries.flatten() { + // Get the SSB ID of the vector clock from the filename. + let clock_filename = clock_entry.file_name(); + let ssb_id = clock_filename.into_string().map_err(|os_string| { + Error::Other(format!( + "Invalid unicode in EBT clock filename: {:?}", + os_string + )) + })?; + + // Format the SSB ID as: =.ed25519, replacing + // any `-` characters with `/`. + // + // TODO: Rewrite this to avoid extra allocations. + let mut ssb_id = ssb_id.replace('@', "").replace('-', "/"); + if let Some(dot_index) = ssb_id.find('.') { + ssb_id.insert(dot_index, '=') + } + + // Read and parse the vector clock from the file. + let mut clock_file = File::open(&clock_entry.path())?; + let mut clock_file_contents = String::new(); + clock_file.read_to_string(&mut clock_file_contents)?; + let clock: VectorClock = serde_json::from_str(&clock_file_contents)?; + + // Set the vector clock in memory. + self.set_clock(&ssb_id, clock); + + debug!("Loaded vector clock from file for: {}", ssb_id) + } + } + + Ok(()) + } + + /// Persist all peer clocks to disk (`ebt` directory). + fn persist_peer_clocks(&self, ebt_config_path: PathBuf) -> Result<()> { + for (ssb_id, clock) in self.peer_clocks.iter() { + // Format the SSB ID as: @.ed25519, replacing any `/` + // characters with `-`. + let clock_author_id = + format!("@{}", ssb_id.to_string().replace('/', "-").replace('=', "")); + + let clock_filepath = ebt_config_path.join(clock_author_id); + let json_clock = serde_json::to_string(clock)?; + + fs::write(clock_filepath, json_clock)?; + + debug!("Wrote vector clock to file for: {}", ssb_id); + } + + Ok(()) + } + /// Retrieve the stored vector clock for the first peer, check for the /// second peer in the vector clock and return the value of the receive /// flag. @@ -594,14 +651,14 @@ impl EbtManager { /// /// Listen for EBT event messages via the broker and update EBT session /// state accordingly. - pub async fn event_loop(mut self, local_id: SsbId) -> Result<()> { + pub async fn event_loop(mut self, local_id: SsbId, ebt_config_path: PathBuf) -> Result<()> { debug!("Started EBT event loop"); // Set the ID (@-prefixed public key) of the local node. self.local_id = local_id; // Initialise the local clock based on peers to be replicated. - self.init_local_clock().await?; + self.init_local_clock(&ebt_config_path).await?; // Register the EBT event loop actor with the broker. let ActorEndpoint { @@ -681,6 +738,9 @@ impl EbtManager { } } + // Write all peer clocks to disk before exiting. + self.persist_peer_clocks(ebt_config_path)?; + Ok(()) } } diff --git a/solar/src/config.rs b/solar/src/config.rs index a11c54a..a2c4ec0 100644 --- a/solar/src/config.rs +++ b/solar/src/config.rs @@ -53,26 +53,29 @@ impl ApplicationConfig { /// Create the root data directory for solar, along with the feed and blob /// directories. This is where all application data is stored, including /// the public-private keypair, key-value database and blob store. - fn create_data_directories(path: Option) -> Result<(PathBuf, PathBuf)> { + fn create_data_directories(path: Option) -> Result<(PathBuf, PathBuf, PathBuf)> { let base_path = path.unwrap_or(BaseDirectories::new()?.create_data_directory("solar")?); // Define the directory name for the feed store. let feeds_path = base_path.join("feeds"); // Define the directory name for the blob store. let blobs_path = base_path.join("blobs"); + // Define the directory name for the EBT vector clocks. + let ebt_path = base_path.join("ebt"); - // Create the feed and blobs directories. + // Create the feed, blobs and ebt directories. std::fs::create_dir_all(&feeds_path)?; std::fs::create_dir_all(blobs_path)?; + std::fs::create_dir_all(&ebt_path)?; - Ok((base_path, feeds_path)) + Ok((base_path, feeds_path, ebt_path)) } /// Configure the application based on CLI options, environment variables /// and defaults. pub fn new(path: Option) -> Result { // Create the application data directories if they don't already exist. - let (base_path, feeds_path) = Self::create_data_directories(path)?; + let (base_path, feeds_path, _ebt_path) = Self::create_data_directories(path)?; info!("Base directory is {:?}", base_path); diff --git a/solar/src/node.rs b/solar/src/node.rs index 128fb08..8e3c5fa 100644 --- a/solar/src/node.rs +++ b/solar/src/node.rs @@ -43,6 +43,7 @@ impl Node { // Define the directory name for the blob store. let blobs_path = config .base_path + .as_ref() .expect("Base path not supplied") .join("blobs"); @@ -136,11 +137,18 @@ impl Node { // intervals). Broker::spawn(connection_scheduler::actor(peers_to_dial)); + // Define the directory name for the ebt clock store. + let ebt_path = config + .base_path + .expect("Base path not supplied") + .join("ebt"); + // Spawn the EBT replication manager actor. let ebt_replication_manager = EbtManager::default(); Broker::spawn(EbtManager::event_loop( ebt_replication_manager, owned_identity.id, + ebt_path, )); // Spawn the connection manager message loop.