Skip to content

Commit

Permalink
Merge pull request #88 from mycognosist/ebt_clock_match
Browse files Browse the repository at this point in the history
Match on vector clock response and use negative request numbers
  • Loading branch information
mycognosist authored Jan 29, 2024
2 parents 79514e9 + b20778f commit 7a0809f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
62 changes: 40 additions & 22 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ where
}
// Handle an incoming MUXRPC response.
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
self.recv_rpc_response(ch_broker, *req_no, res).await
self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id)
.await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
Expand All @@ -103,15 +104,17 @@ where
BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => {
// Serialize the vector clock as a JSON string.
let json_clock = serde_json::to_string(&clock)?;
api.ebt_clock_res_send(*req_no, &json_clock).await?;
// The request number must be negative (response).
api.ebt_clock_res_send(-(*req_no), &json_clock).await?;

Ok(false)
}
BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, ssb_id, msg)) => {
// Ensure the message is sent to the correct peer.
if peer_ssb_id == *ssb_id {
let json_msg = msg.to_string();
api.ebt_feed_res_send(*req_no, &json_msg).await?;
// The request number must be negative (response).
api.ebt_feed_res_send(-(*req_no), &json_msg).await?;
}

Ok(false)
Expand Down Expand Up @@ -188,38 +191,53 @@ where
}

/// Process an incoming MUXRPC response.
/// The response is expected to contain an SSB message.
/// The response is expected to contain a vector clock or an SSB message.
async fn recv_rpc_response(
&mut self,
ch_broker: &mut ChBrokerSend,
req_no: ReqNo,
res: &[u8],
peer_ssb_id: String,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received RPC response: {}", req_no);

// Only handle the response if the associated request number is known
// to us, either because we sent or received the initiating replicate
// request.
if self.active_requests.contains_key(&req_no) {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
// The response may be a vector clock (aka. notes) or an SSB message.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
// Since there is no explicit way to determine which was received,
// we first attempt deserialization of a vector clock and move on
// to attempting message deserialization if that fails.
if let Ok(clock) = serde_json::from_slice(res) {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)),
))
.await?;
} else {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}
}

Ok(false)
Expand Down
6 changes: 2 additions & 4 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,9 @@ impl EbtManager {
async fn handle_wait_for_session_request(&self, connection_data: ConnectionData) {
trace!(target: "ebt", "Waiting for EBT session request");

let session_role = SessionRole::Responder;
task::spawn(replicator::run(
connection_data,
session_role,
SessionRole::Responder,
self.session_wait_timeout,
));
}
Expand All @@ -387,10 +386,9 @@ impl EbtManager {
connection_data.peer_public_key.unwrap()
);

let session_role = SessionRole::Requester;
task::spawn(replicator::run(
connection_data,
session_role,
SessionRole::Requester,
self.session_wait_timeout,
));
}
Expand Down

0 comments on commit 7a0809f

Please sign in to comment.