Skip to content

Commit

Permalink
Merge pull request #86 from mycognosist/clock_persistence
Browse files Browse the repository at this point in the history
Load and persist peer vector clocks
  • Loading branch information
mycognosist authored Jan 17, 2024
2 parents fd4f76d + d8fae0f commit 79514e9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 13 deletions.
78 changes: 69 additions & 9 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
use std::{
collections::{HashMap, HashSet},
fmt::Display,
fs::{self, File},
io::Read,
path::PathBuf,
};

use async_std::task;
Expand Down Expand Up @@ -118,16 +121,11 @@ impl Default for EbtManager {
}

impl EbtManager {
// Read peer clock state from file.
// fn load_peer_clocks()
// Write peer clock state to file.
// fn persist_peer_clocks()

/// Initialise the local clock based on peers to be replicated.
///
/// This defines the public keys of all feeds we wish to replicate,
/// along with the latest sequence number for each.
async fn init_local_clock(&mut self) -> Result<()> {
async fn init_local_clock(&mut self, ebt_config_path: &PathBuf) -> Result<()> {
debug!("Initialising local EBT clock");

let local_id = self.local_id.to_owned();
Expand All @@ -143,7 +141,8 @@ impl EbtManager {
}
}

// TODO: Load peer clocks from file and update `peer_clocks`.
// Load peer clocks from file and update `peer_clocks`.
self.load_peer_clocks(ebt_config_path)?;

Ok(())
}
Expand All @@ -166,6 +165,64 @@ impl EbtManager {
}
}

/// Load all peer clocks from disk (`ebt` directory).
fn load_peer_clocks(&mut self, ebt_config_path: &PathBuf) -> Result<()> {
// Iterate over all stored vector clocks in the directory.
if let Ok(entries) = fs::read_dir(ebt_config_path) {
for clock_entry in entries.flatten() {
// Get the SSB ID of the vector clock from the filename.
let clock_filename = clock_entry.file_name();
let ssb_id = clock_filename.into_string().map_err(|os_string| {
Error::Other(format!(
"Invalid unicode in EBT clock filename: {:?}",
os_string
))
})?;

// Format the SSB ID as: <PUBLIC_KEY>=.ed25519, replacing
// any `-` characters with `/`.
//
// TODO: Rewrite this to avoid extra allocations.
let mut ssb_id = ssb_id.replace('@', "").replace('-', "/");
if let Some(dot_index) = ssb_id.find('.') {
ssb_id.insert(dot_index, '=')
}

// Read and parse the vector clock from the file.
let mut clock_file = File::open(&clock_entry.path())?;
let mut clock_file_contents = String::new();
clock_file.read_to_string(&mut clock_file_contents)?;
let clock: VectorClock = serde_json::from_str(&clock_file_contents)?;

// Set the vector clock in memory.
self.set_clock(&ssb_id, clock);

debug!("Loaded vector clock from file for: {}", ssb_id)
}
}

Ok(())
}

/// Persist all peer clocks to disk (`ebt` directory).
fn persist_peer_clocks(&self, ebt_config_path: PathBuf) -> Result<()> {
for (ssb_id, clock) in self.peer_clocks.iter() {
// Format the SSB ID as: @<PUBLIC_KEY>.ed25519, replacing any `/`
// characters with `-`.
let clock_author_id =
format!("@{}", ssb_id.to_string().replace('/', "-").replace('=', ""));

let clock_filepath = ebt_config_path.join(clock_author_id);
let json_clock = serde_json::to_string(clock)?;

fs::write(clock_filepath, json_clock)?;

debug!("Wrote vector clock to file for: {}", ssb_id);
}

Ok(())
}

/// Retrieve the stored vector clock for the first peer, check for the
/// second peer in the vector clock and return the value of the receive
/// flag.
Expand Down Expand Up @@ -594,14 +651,14 @@ impl EbtManager {
///
/// Listen for EBT event messages via the broker and update EBT session
/// state accordingly.
pub async fn event_loop(mut self, local_id: SsbId) -> Result<()> {
pub async fn event_loop(mut self, local_id: SsbId, ebt_config_path: PathBuf) -> Result<()> {
debug!("Started EBT event loop");

// Set the ID (@-prefixed public key) of the local node.
self.local_id = local_id;

// Initialise the local clock based on peers to be replicated.
self.init_local_clock().await?;
self.init_local_clock(&ebt_config_path).await?;

// Register the EBT event loop actor with the broker.
let ActorEndpoint {
Expand Down Expand Up @@ -681,6 +738,9 @@ impl EbtManager {
}
}

// Write all peer clocks to disk before exiting.
self.persist_peer_clocks(ebt_config_path)?;

Ok(())
}
}
11 changes: 7 additions & 4 deletions solar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,29 @@ impl ApplicationConfig {
/// Create the root data directory for solar, along with the feed and blob
/// directories. This is where all application data is stored, including
/// the public-private keypair, key-value database and blob store.
fn create_data_directories(path: Option<PathBuf>) -> Result<(PathBuf, PathBuf)> {
fn create_data_directories(path: Option<PathBuf>) -> Result<(PathBuf, PathBuf, PathBuf)> {
let base_path = path.unwrap_or(BaseDirectories::new()?.create_data_directory("solar")?);

// Define the directory name for the feed store.
let feeds_path = base_path.join("feeds");
// Define the directory name for the blob store.
let blobs_path = base_path.join("blobs");
// Define the directory name for the EBT vector clocks.
let ebt_path = base_path.join("ebt");

// Create the feed and blobs directories.
// Create the feed, blobs and ebt directories.
std::fs::create_dir_all(&feeds_path)?;
std::fs::create_dir_all(blobs_path)?;
std::fs::create_dir_all(&ebt_path)?;

Ok((base_path, feeds_path))
Ok((base_path, feeds_path, ebt_path))
}

/// Configure the application based on CLI options, environment variables
/// and defaults.
pub fn new(path: Option<PathBuf>) -> Result<Self> {
// Create the application data directories if they don't already exist.
let (base_path, feeds_path) = Self::create_data_directories(path)?;
let (base_path, feeds_path, _ebt_path) = Self::create_data_directories(path)?;

info!("Base directory is {:?}", base_path);

Expand Down
8 changes: 8 additions & 0 deletions solar/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl Node {
// Define the directory name for the blob store.
let blobs_path = config
.base_path
.as_ref()
.expect("Base path not supplied")
.join("blobs");

Expand Down Expand Up @@ -136,11 +137,18 @@ impl Node {
// intervals).
Broker::spawn(connection_scheduler::actor(peers_to_dial));

// Define the directory name for the ebt clock store.
let ebt_path = config
.base_path
.expect("Base path not supplied")
.join("ebt");

// Spawn the EBT replication manager actor.
let ebt_replication_manager = EbtManager::default();
Broker::spawn(EbtManager::event_loop(
ebt_replication_manager,
owned_identity.id,
ebt_path,
));

// Spawn the connection manager message loop.
Expand Down

0 comments on commit 79514e9

Please sign in to comment.