Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send partial vector clock when local KV store is updated #85

Merged
merged 13 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 25 additions & 97 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
peer_ssb_id: String,
active_request: Option<ReqNo>,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received MUXRPC input: {:?}", op);
trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op);

// An outbound EBT replicate request was made before the handler was
// called; add it to the map of active requests.
Expand Down Expand Up @@ -88,8 +88,7 @@ where
}
// Handle an incoming MUXRPC response.
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
self.recv_rpc_response(api, ch_broker, *req_no, res, peer_ssb_id)
.await
self.recv_rpc_response(ch_broker, *req_no, res).await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
Expand Down Expand Up @@ -119,21 +118,6 @@ where
}
_ => Ok(false),
},
/*
RpcInput::Message(msg) => {
if let Some(kv_event) = msg.downcast_ref::<StoreKvEvent>() {
match kv_event {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
StoreKvEvent::IdChanged(id) => {
return self.recv_storageevent_idchanged(api, id).await
}
}
}
Ok(false)
}
*/
_ => Ok(false),
}
}
Expand Down Expand Up @@ -203,56 +187,39 @@ where
Ok(false)
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain a vector clock or an SSB message.
/// Process an incoming MUXRPC response.
/// The response is expected to contain an SSB message.
async fn recv_rpc_response(
&mut self,
_api: &mut ApiCaller<W>,
ch_broker: &mut ChBrokerSend,
req_no: ReqNo,
res: &[u8],
peer_ssb_id: String,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received RPC response: {}", req_no);

// Only handle the response if the associated request number is known
// to us, either because we sent or received the initiating replicate
// request.
if self.active_requests.contains_key(&req_no) {
// The response may be a vector clock (aka. notes) or an SSB message.
//
// Since there is no explicit way to determine which was received,
// we first attempt deserialization of a vector clock and move on
// to attempting message deserialization if that fails.
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// TODO: Is matching on clock here redundant?
// We are already matching on `OtherRequest` in the handler.
if let Ok(clock) = serde_json::from_slice(res) {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)),
))
.await?;
} else {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}

Ok(false)
Expand All @@ -263,6 +230,7 @@ where
async fn recv_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
api.rpc().send_stream_eof(-req_no).await?;
self.active_requests.remove(&req_no);

Ok(true)
}

Expand All @@ -277,22 +245,6 @@ where
}

/*
/// Extract blob references from post-type messages.
fn extract_blob_refs(&mut self, msg: &Message) -> Vec<String> {
let mut refs = Vec::new();

let msg = serde_json::from_value(msg.content().clone());

if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}

refs
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain an SSB message.
async fn recv_rpc_response(
Expand Down Expand Up @@ -365,28 +317,4 @@ where
}
}
*/

/*
/// Respond to a key-value store state change for the given peer.
/// This is triggered when a new message is appended to the local feed.
/// Remove the peer from the list of active streams, send the requested
/// messages from the local feed to the peer and then reinsert the public
/// key of the peer to the list of active streams.
async fn recv_storageevent_idchanged(
&mut self,
api: &mut ApiCaller<W>,
id: &str,
) -> Result<bool> {
// Attempt to remove the peer from the list of active streams.
if let Some(mut req) = self.reqs.remove(id) {
// Send local messages to the peer.
self.send_history(api, &mut req).await?;
// Reinsert the peer into the list of active streams.
self.reqs.insert(id.to_string(), req);
Ok(true)
} else {
Ok(false)
}
}
*/
}
12 changes: 6 additions & 6 deletions solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ where
self.recv_error_response(api, *req_no, err).await
}
// Handle a broker message.
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(id))) => {
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) => {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
return self.recv_storageevent_idchanged(api, id).await;
// identified by `ssb_id`.
return self.recv_storageevent_idchanged(api, ssb_id).await;
}
// Handle a timer event.
RpcInput::Timer => self.on_timer(api).await,
Expand Down Expand Up @@ -315,14 +315,14 @@ where
async fn recv_storageevent_idchanged(
&mut self,
api: &mut ApiCaller<W>,
id: &str,
ssb_id: &str,
) -> Result<bool> {
// Attempt to remove the peer from the list of active streams.
if let Some(mut req) = self.reqs.remove(id) {
if let Some(mut req) = self.reqs.remove(ssb_id) {
// Send local messages to the peer.
self.send_history(api, &mut req).await?;
// Reinsert the peer into the list of active streams.
self.reqs.insert(id.to_string(), req);
self.reqs.insert(ssb_id.to_string(), req);
Ok(true)
} else {
Ok(false)
Expand Down
Loading