Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send end-of-stream request to all active sessions before solar terminates #93

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ where
}
// Handle a broker message.
RpcInput::Message(msg) => match msg {
BrokerMessage::Ebt(EbtEvent::TerminateSession(conn_id, session_role)) => {
if conn_id == &connection_id {
let req_no = match session_role {
SessionRole::Requester => self.active_request,
SessionRole::Responder => -(self.active_request),
};

return self.send_cancelstream(api, req_no).await;
}

Ok(false)
}
BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock, session_role)) => {
// This is, regrettably, rather unintuitive.
//
Expand Down Expand Up @@ -324,8 +336,7 @@ where
Ok(false)
}

/// Remove the associated request from the map of active requests and close
/// the stream.
/// Receive close-stream request.
async fn recv_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no);

Expand All @@ -334,6 +345,15 @@ where
Ok(true)
}

/// Send close-stream request.
async fn send_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
trace!(target: "ebt-handler", "Send cancel stream RPC response: {}", req_no);

api.rpc().send_stream_eof(-req_no).await?;

Ok(true)
}

/// Report a MUXRPC error and remove the associated request from the map of
/// active requests.
async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result<bool> {
Expand Down
8 changes: 8 additions & 0 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum EbtEvent {
ReceivedMessage(Message),
SessionConcluded(ConnectionId, SsbId),
SessionTimeout(ConnectionData, SsbId),
TerminateSession(ConnectionId, SessionRole),
Error(ConnectionData, SsbId, ErrorMsg),
}

Expand Down Expand Up @@ -686,6 +687,10 @@ impl EbtManager {
Ok(())
}

async fn handle_terminate_session(&mut self, connection_id: ConnectionId) {
trace!(target: "ebt-replication", "Terminating session for connection {}", connection_id);
}

async fn handle_error(
&mut self,
connection_data: ConnectionData,
Expand Down Expand Up @@ -801,6 +806,9 @@ impl EbtManager {
error!("Error while handling 'session timeout' event: {}", err)
}
}
EbtEvent::TerminateSession(connection_data, _session_role) => {
self.handle_terminate_session(connection_data).await;
}
EbtEvent::Error(connection_data, peer_ssb_id, error_msg) => {
if let Err(err) = self.handle_error(connection_data, peer_ssb_id, error_msg).await {
error!("Error while handling 'error' event: {}", err)
Expand Down
25 changes: 20 additions & 5 deletions solar/src/actors/replication/ebt/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
network::connection::ConnectionData,
replication::ebt::{EbtEvent, SessionRole},
},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, Void, BROKER},
Error, Result,
};

Expand All @@ -27,7 +27,9 @@ pub async fn run(
// Register the EBT replication loop actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_terminated,
ch_msg,
mut ch_broker,
..
} = BROKER
.lock()
Expand Down Expand Up @@ -69,9 +71,6 @@ pub async fn run(
let rpc_recv_stream = rpc_reader.into_stream().fuse();
pin_mut!(rpc_recv_stream);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id);

let mut session_initiated = false;
Expand All @@ -98,13 +97,26 @@ pub async fn run(
// ready, one will be selected in order of declaration.
let input = select_biased! {
_value = ch_terminate_fuse => {
break;
// Communicate stream termination to the session peer.
RpcInput::Message(
BrokerMessage::Ebt(
EbtEvent::TerminateSession(connection_id, session_role.to_owned())
)
)
},
packet = rpc_recv_stream.select_next_some() => {
let (req_no, packet) = packet;
RpcInput::Network(req_no, packet)
},
msg = ch_msg.next().fuse() => {
// Listen for a 'session concluded' event and terminate the
// replicator if the connection ID of the event matches the
// ID of this instance of the replicator.
if let Some(BrokerMessage::Ebt(EbtEvent::SessionConcluded(conn_id, _))) = msg {
if connection_id == conn_id {
break
}
}
// Listen for a 'session initiated' event.
if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg {
if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder {
Expand Down Expand Up @@ -188,5 +200,8 @@ pub async fn run(
))
.await?;

// Send 'terminated' signal to broker.
let _ = ch_terminated.send(Void {});

Ok(())
}
Loading