Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load and persist peer vector clocks #86

Merged
merged 4 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 69 additions & 9 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
use std::{
collections::{HashMap, HashSet},
fmt::Display,
fs::{self, File},
io::Read,
path::PathBuf,
};

use async_std::task;
Expand Down Expand Up @@ -118,16 +121,11 @@ impl Default for EbtManager {
}

impl EbtManager {
// Read peer clock state from file.
// fn load_peer_clocks()
// Write peer clock state to file.
// fn persist_peer_clocks()

/// Initialise the local clock based on peers to be replicated.
///
/// This defines the public keys of all feeds we wish to replicate,
/// along with the latest sequence number for each.
async fn init_local_clock(&mut self) -> Result<()> {
async fn init_local_clock(&mut self, ebt_config_path: &PathBuf) -> Result<()> {
debug!("Initialising local EBT clock");

let local_id = self.local_id.to_owned();
Expand All @@ -143,7 +141,8 @@ impl EbtManager {
}
}

// TODO: Load peer clocks from file and update `peer_clocks`.
// Load peer clocks from file and update `peer_clocks`.
self.load_peer_clocks(ebt_config_path)?;

Ok(())
}
Expand All @@ -166,6 +165,64 @@ impl EbtManager {
}
}

/// Load all peer clocks from disk (`ebt` directory).
fn load_peer_clocks(&mut self, ebt_config_path: &PathBuf) -> Result<()> {
// Iterate over all stored vector clocks in the directory.
if let Ok(entries) = fs::read_dir(ebt_config_path) {
for clock_entry in entries.flatten() {
// Get the SSB ID of the vector clock from the filename.
let clock_filename = clock_entry.file_name();
let ssb_id = clock_filename.into_string().map_err(|os_string| {
Error::Other(format!(
"Invalid unicode in EBT clock filename: {:?}",
os_string
))
})?;

// Format the SSB ID as: <PUBLIC_KEY>=.ed25519, replacing
// any `-` characters with `/`.
//
// TODO: Rewrite this to avoid extra allocations.
let mut ssb_id = ssb_id.replace('@', "").replace('-', "/");
if let Some(dot_index) = ssb_id.find('.') {
ssb_id.insert(dot_index, '=')
}

// Read and parse the vector clock from the file.
let mut clock_file = File::open(&clock_entry.path())?;
let mut clock_file_contents = String::new();
clock_file.read_to_string(&mut clock_file_contents)?;
let clock: VectorClock = serde_json::from_str(&clock_file_contents)?;

// Set the vector clock in memory.
self.set_clock(&ssb_id, clock);

debug!("Loaded vector clock from file for: {}", ssb_id)
}
}

Ok(())
}

/// Persist all peer clocks to disk (`ebt` directory).
fn persist_peer_clocks(&self, ebt_config_path: PathBuf) -> Result<()> {
for (ssb_id, clock) in self.peer_clocks.iter() {
// Format the SSB ID as: @<PUBLIC_KEY>.ed25519, replacing any `/`
// characters with `-`.
let clock_author_id =
format!("@{}", ssb_id.to_string().replace('/', "-").replace('=', ""));

let clock_filepath = ebt_config_path.join(clock_author_id);
let json_clock = serde_json::to_string(clock)?;

fs::write(clock_filepath, json_clock)?;

debug!("Wrote vector clock to file for: {}", ssb_id);
}

Ok(())
}

/// Retrieve the stored vector clock for the first peer, check for the
/// second peer in the vector clock and return the value of the receive
/// flag.
Expand Down Expand Up @@ -594,14 +651,14 @@ impl EbtManager {
///
/// Listen for EBT event messages via the broker and update EBT session
/// state accordingly.
pub async fn event_loop(mut self, local_id: SsbId) -> Result<()> {
pub async fn event_loop(mut self, local_id: SsbId, ebt_config_path: PathBuf) -> Result<()> {
debug!("Started EBT event loop");

// Set the ID (@-prefixed public key) of the local node.
self.local_id = local_id;

// Initialise the local clock based on peers to be replicated.
self.init_local_clock().await?;
self.init_local_clock(&ebt_config_path).await?;

// Register the EBT event loop actor with the broker.
let ActorEndpoint {
Expand Down Expand Up @@ -681,6 +738,9 @@ impl EbtManager {
}
}

// Write all peer clocks to disk before exiting.
self.persist_peer_clocks(ebt_config_path)?;

Ok(())
}
}
11 changes: 7 additions & 4 deletions solar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,29 @@ impl ApplicationConfig {
/// Create the root data directory for solar, along with the feed and blob
/// directories. This is where all application data is stored, including
/// the public-private keypair, key-value database and blob store.
fn create_data_directories(path: Option<PathBuf>) -> Result<(PathBuf, PathBuf)> {
fn create_data_directories(path: Option<PathBuf>) -> Result<(PathBuf, PathBuf, PathBuf)> {
let base_path = path.unwrap_or(BaseDirectories::new()?.create_data_directory("solar")?);

// Define the directory name for the feed store.
let feeds_path = base_path.join("feeds");
// Define the directory name for the blob store.
let blobs_path = base_path.join("blobs");
// Define the directory name for the EBT vector clocks.
let ebt_path = base_path.join("ebt");

// Create the feed and blobs directories.
// Create the feed, blobs and ebt directories.
std::fs::create_dir_all(&feeds_path)?;
std::fs::create_dir_all(blobs_path)?;
std::fs::create_dir_all(&ebt_path)?;

Ok((base_path, feeds_path))
Ok((base_path, feeds_path, ebt_path))
}

/// Configure the application based on CLI options, environment variables
/// and defaults.
pub fn new(path: Option<PathBuf>) -> Result<Self> {
// Create the application data directories if they don't already exist.
let (base_path, feeds_path) = Self::create_data_directories(path)?;
let (base_path, feeds_path, _ebt_path) = Self::create_data_directories(path)?;

info!("Base directory is {:?}", base_path);

Expand Down
8 changes: 8 additions & 0 deletions solar/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl Node {
// Define the directory name for the blob store.
let blobs_path = config
.base_path
.as_ref()
.expect("Base path not supplied")
.join("blobs");

Expand Down Expand Up @@ -136,11 +137,18 @@ impl Node {
// intervals).
Broker::spawn(connection_scheduler::actor(peers_to_dial));

// Define the directory name for the ebt clock store.
let ebt_path = config
.base_path
.expect("Base path not supplied")
.join("ebt");

// Spawn the EBT replication manager actor.
let ebt_replication_manager = EbtManager::default();
Broker::spawn(EbtManager::event_loop(
ebt_replication_manager,
owned_identity.id,
ebt_path,
));

// Spawn the connection manager message loop.
Expand Down