Skip to content

Commit

Permalink
use connection id for better session tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
mycognosist committed Feb 2, 2024
1 parent fd969f3 commit 9f48d07
Showing 1 changed file with 113 additions and 43 deletions.
156 changes: 113 additions & 43 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use serde_json::Value;
use crate::{
actors::{
muxrpc::{ReqNo, RpcBlobsGetEvent},
network::{connection::ConnectionData, connection_manager::ConnectionEvent},
network::{
connection::{ConnectionData, ConnectionId},
connection_manager::ConnectionEvent,
},
replication::{
blobs,
ebt::{clock, replicator, EncodedClockValue, VectorClock},
Expand All @@ -46,14 +49,14 @@ use crate::{
pub enum EbtEvent {
WaitForSessionRequest(ConnectionData),
RequestSession(ConnectionData),
SessionInitiated(ReqNo, SsbId, SessionRole),
SendClock(ReqNo, VectorClock),
SendMessage(ReqNo, SsbId, Value),
ReceivedClock(ReqNo, SsbId, VectorClock),
SessionInitiated(ConnectionId, ReqNo, SsbId, SessionRole),
SendClock(ConnectionId, ReqNo, VectorClock),
SendMessage(ConnectionId, ReqNo, SsbId, Value),
ReceivedClock(ConnectionId, ReqNo, SsbId, VectorClock),
ReceivedMessage(Message),
SessionConcluded(SsbId),
SessionTimeout(ConnectionData),
Error(ConnectionData, ReqNo, SsbId, String),
SessionConcluded(ConnectionId, SsbId),
SessionTimeout(ConnectionData, SsbId),
Error(ConnectionData, Option<ReqNo>, SsbId, String),
}

/// Role of a peer in an EBT session.
Expand All @@ -75,7 +78,7 @@ impl Display for SessionRole {
#[derive(Debug)]
pub struct EbtManager {
/// Active EBT peer sessions.
active_sessions: HashMap<SsbId, (ReqNo, SessionRole)>,
active_sessions: HashSet<(ConnectionId, SsbId)>,
/// Duration to wait before switching feed request to a different peer.
_feed_wait_timeout: u64,
/// The state of the replication loop.
Expand All @@ -97,6 +100,9 @@ pub struct EbtManager {
/// request number.
//
// TODO: Do we want to remove each entry when the session concludes?
//
// TODO: Rather use connection ID as the key (req no is not unique
// across sessions).
sent_clocks: HashMap<ReqNo, VectorClock>,
/// The sequence number of the latest message sent to each peer
/// for each requested feed.
Expand All @@ -106,7 +112,7 @@ pub struct EbtManager {
impl Default for EbtManager {
fn default() -> Self {
EbtManager {
active_sessions: HashMap::new(),
active_sessions: HashSet::new(),
_feed_wait_timeout: 3,
_is_replication_loop_active: false,
local_clock: HashMap::new(),
Expand Down Expand Up @@ -149,7 +155,7 @@ impl EbtManager {

/// Retrieve either the local vector clock or the stored vector clock
/// for the peer represented by the given SSB ID.
fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
match ssb_id {
Some(id) => self.peer_clocks.get(id).cloned(),
None => Some(self.local_clock.to_owned()),
Expand Down Expand Up @@ -228,7 +234,7 @@ impl EbtManager {
/// flag.
fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result<bool> {
// Retrieve the vector clock for the first peer.
if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) {
if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) {
// Check if the second peer is represented in the vector clock.
if let Some(encoded_seq_no) = clock.get(&ssb_id) {
// Check if the receive flag is true.
Expand Down Expand Up @@ -273,16 +279,21 @@ impl EbtManager {
}

/// Register a new EBT session for the given peer.
fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) {
//fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) {
fn register_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) {
self.active_sessions
.insert(peer_ssb_id.to_owned(), (req_no, session_role));
.insert((connection_id, peer_ssb_id.to_owned()));

trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id);
trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id);
}

/// Remove the given peer from the list of active sessions.
fn remove_session(&mut self, peer_ssb_id: &SsbId) {
let _ = self.active_sessions.remove(peer_ssb_id);
fn remove_session(&mut self, connection_id: ConnectionId, peer_ssb_id: &SsbId) {
// TODO: Clean up the string story so we're not sprinkling additional
// allocations everywhere.
let _ = self
.active_sessions
.remove(&(connection_id, peer_ssb_id.to_owned()));
}

/// Revoke a replication request for the feed represented by the given SSB
Expand Down Expand Up @@ -379,7 +390,10 @@ impl EbtManager {

// Only proceed with session initiation if there
// is no currently active session with the given peer.
if !self.active_sessions.contains_key(&peer_ssb_id) {
if !self
.active_sessions
.contains(&(connection_data.id, peer_ssb_id))
{
trace!(
target: "ebt",
"Requesting an EBT session with {:?}",
Expand All @@ -397,24 +411,27 @@ impl EbtManager {

async fn handle_session_initiated(
&mut self,
connection_id: ConnectionId,
req_no: ReqNo,
peer_ssb_id: SsbId,
session_role: SessionRole,
) -> Result<()> {
trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role);

self.register_session(&peer_ssb_id, req_no, session_role.to_owned());
self.register_session(connection_id, &peer_ssb_id);
let local_clock = self.local_clock.to_owned();

match session_role {
SessionRole::Responder => {
// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

trace!(target: "ebt-replication", "Sending clock as responder for request {}", req_no);

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)),
BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)),
))
.await?;
}
Expand All @@ -427,12 +444,19 @@ impl EbtManager {
Ok(())
}

fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option<VectorClock> {
fn handle_send_clock(
&mut self,
_connection_id: ConnectionId,
req_no: ReqNo,
clock: VectorClock,
) -> Option<VectorClock> {
// TODO: Include connection ID in sent clock tracking.
self.sent_clocks.insert(req_no, clock)
}

async fn handle_received_clock(
&mut self,
connection_id: ConnectionId,
req_no: ReqNo,
peer_ssb_id: SsbId,
clock: VectorClock,
Expand All @@ -450,11 +474,17 @@ impl EbtManager {
// assumed that the clock was sent in response to a locally-sent
// EBT replicate request. Ie. the session was requested by the
// local peer.
if !self.active_sessions.contains_key(&peer_ssb_id) {
//
// TODO: Can we avoid the allocation on `peer_ssb_id`?
if !self
.active_sessions
.contains(&(connection_id, peer_ssb_id.to_owned()))
{
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SessionInitiated(
connection_id,
req_no,
peer_ssb_id.to_owned(),
SessionRole::Requester,
Expand All @@ -472,7 +502,7 @@ impl EbtManager {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)),
BrokerMessage::Ebt(EbtEvent::SendClock(connection_id, req_no, local_clock)),
))
.await?;
}
Expand All @@ -484,7 +514,12 @@ impl EbtManager {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, peer_ssb_id.to_owned(), msg)),
BrokerMessage::Ebt(EbtEvent::SendMessage(
connection_id,
req_no,
peer_ssb_id.to_owned(),
msg,
)),
))
.await?;
}
Expand Down Expand Up @@ -564,6 +599,10 @@ impl EbtManager {
Ok(())
}

/*
TODO: Reintroduce this when we figure out the connection ID / request ID
association.
/// Look up the latest sequence number for the updated feed, encode it as
/// the single entry of a vector clock and send that to any active session
/// peers.
Expand Down Expand Up @@ -599,14 +638,26 @@ impl EbtManager {
Ok(())
}
*/

async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) {
trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id);
self.remove_session(&peer_ssb_id);
async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) {
trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id);
self.remove_session(connection_id, &peer_ssb_id);
}

async fn handle_session_timeout(&mut self, connection_data: ConnectionData) -> Result<()> {
trace!(target: "ebt-replication", "Session timeout while waiting for request");
async fn handle_session_timeout(
&mut self,
connection_data: ConnectionData,
peer_ssb_id: SsbId,
) -> Result<()> {
trace!(target: "ebt-replication", "Session timeout while waiting for request from {} on connection {}", peer_ssb_id, connection_data.id);

// Session should not have been initiated in the first place, meaning
// that this removal action should be unnecessary. Keeping it here
// for now out of caution.
//
// TODO: Remove this line when it's clear that it's not needed.
self.remove_session(connection_data.id, &peer_ssb_id);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();
Expand All @@ -625,22 +676,36 @@ impl EbtManager {
async fn handle_error(
&mut self,
connection_data: ConnectionData,
req_no: ReqNo,
req_no: Option<ReqNo>,
peer_ssb_id: SsbId,
err_msg: String,
) -> Result<()> {
trace!(target: "ebt-replication", "Session error with {} for request number {}: {}", peer_ssb_id, req_no, err_msg);
trace!(target: "ebt-replication", "Session error with {} for request number {:?}: {}", peer_ssb_id, req_no, err_msg);

self.remove_session(connection_data.id, &peer_ssb_id);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)),
))
.await?;

// TODO: Match on error variant and fall back to classic replication if
// the error was `Error::EbtReplicate`.

/*
// Fall back to classic replication.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)),
))
.await?;
*/

Ok(())
}
Expand Down Expand Up @@ -684,17 +749,17 @@ impl EbtManager {
EbtEvent::RequestSession(connection_data) => {
self.handle_request_session(connection_data).await;
}
EbtEvent::SessionInitiated(req_no, peer_ssb_id, session_role) => {
if let Err(err) = self.handle_session_initiated(req_no, peer_ssb_id, session_role).await {
EbtEvent::SessionInitiated(connection_id, req_no, peer_ssb_id, session_role) => {
if let Err(err) = self.handle_session_initiated(connection_id, req_no, peer_ssb_id, session_role).await {
error!("Error while handling 'session initiated' event: {}", err)
}
}
EbtEvent::SendClock(req_no, clock) => {
EbtEvent::SendClock(connection_id, req_no, clock) => {
trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock);
let _ = self.handle_send_clock(req_no, clock);
let _ = self.handle_send_clock(connection_id, req_no, clock);
}
EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => {
if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await {
EbtEvent::ReceivedClock(connection_id, req_no, peer_ssb_id, clock) => {
if let Err(err) = self.handle_received_clock(connection_id, req_no, peer_ssb_id, clock).await {
error!("Error while handling 'received clock' event: {}", err)
}
}
Expand All @@ -703,17 +768,17 @@ impl EbtManager {
error!("Error while handling 'received message' event: {}", err)
}
}
EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => {
EbtEvent::SendMessage(_connection_id, _req_no, peer_ssb_id, msg) => {
trace!(target: "ebt-replication", "Sending message: {:?}...", msg);
if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await {
error!("Error while handling 'send message' event: {}", err)
}
}
EbtEvent::SessionConcluded(connection_data) => {
self.handle_session_concluded(connection_data).await;
EbtEvent::SessionConcluded(connection_id, peer_ssb_id) => {
self.handle_session_concluded(connection_id, peer_ssb_id).await;
}
EbtEvent::SessionTimeout(connection_data) => {
if let Err(err) = self.handle_session_timeout(connection_data).await {
EbtEvent::SessionTimeout(connection_data, peer_ssb_id) => {
if let Err(err) = self.handle_session_timeout(connection_data, peer_ssb_id).await {
error!("Error while handling 'session timeout' event: {}", err)
}
}
Expand All @@ -723,14 +788,19 @@ impl EbtManager {
}
}
}
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg {
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg {
debug!("Received KV store event from broker");

/*
TODO: Reintroduce this later, once the Manyverse restart
issue is solved.
// Respond to a key-value store state change for the given peer.
// This is triggered when a new message is appended to the local feed.
if let Err(err) = self.handle_local_store_updated(ssb_id).await {
error!("Error while handling 'local store updated' event: {}", err)
}
*/
}
}
}
Expand Down

0 comments on commit 9f48d07

Please sign in to comment.