Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request blobs from EBT msgs #84

Merged
merged 5 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 8 additions & 31 deletions solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use kuska_ssb::{
rpc,
};
use log::{debug, info, warn};
use once_cell::sync::Lazy;
use regex::Regex;

use crate::{
actors::muxrpc::{
blobs_get::RpcBlobsGetEvent,
handler::{RpcHandler, RpcInput},
actors::{
muxrpc::{
blobs_get::RpcBlobsGetEvent,
handler::{RpcHandler, RpcInput},
},
replication::blobs,
},
broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination},
config::{PEERS_TO_REPLICATE, RESYNC_CONFIG, SECRET_CONFIG},
Expand All @@ -26,10 +27,6 @@ use crate::{
Result,
};

/// Regex pattern used to match blob references.
pub static BLOB_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap());

#[derive(Debug)]
struct HistoryStreamRequest {
req_no: i32,
Expand Down Expand Up @@ -174,22 +171,6 @@ where
Ok(false)
}

/// Extract blob references from post-type messages.
fn extract_blob_refs(&mut self, msg: &Message) -> Vec<String> {
let mut refs = Vec::new();

let msg = serde_json::from_value(msg.content().clone());

if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}

refs
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain an SSB message.
async fn recv_rpc_response(
Expand Down Expand Up @@ -236,13 +217,9 @@ where
// Extract blob references from the received message and
// request those blobs if they are not already in the local
// blobstore.
for key in self.extract_blob_refs(&msg) {
for key in blobs::extract_blob_refs(&msg) {
if !BLOB_STORE.read().await.exists(&key) {
let event = RpcBlobsGetEvent(dto::BlobsGetIn {
key,
size: None,
max: None,
});
let event = RpcBlobsGetEvent(dto::BlobsGetIn::new(key));
let broker_msg = BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::RpcBlobsGet(event),
Expand Down
23 changes: 23 additions & 0 deletions solar/src/actors/replication/blobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use kuska_ssb::{api::dto::content::TypedMessage, feed::Message};
use once_cell::sync::Lazy;
use regex::Regex;

/// Regex pattern used to match blob references.
pub static BLOB_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap());

/// Extract blob references from post-type messages.
pub fn extract_blob_refs(msg: &Message) -> Vec<String> {
let mut refs = Vec::new();

let msg = serde_json::from_value(msg.content().clone());

if let Ok(TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}

refs
}
30 changes: 26 additions & 4 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ use std::{

use async_std::task;
use futures::{select_biased, FutureExt, SinkExt, StreamExt};
use kuska_ssb::{api::dto::content::SsbId, crypto::ToSsbId, feed::Message};
use kuska_ssb::{
api::dto::{content::SsbId, BlobsGetIn},
crypto::ToSsbId,
feed::Message,
};
use log::{debug, error, trace, warn};
use serde_json::Value;

use crate::{
actors::{
muxrpc::ReqNo,
muxrpc::{ReqNo, RpcBlobsGetEvent},
network::{connection::ConnectionData, connection_manager::ConnectionEvent},
replication::ebt::{clock, replicator, EncodedClockValue, VectorClock},
replication::{
blobs,
ebt::{clock, replicator, EncodedClockValue, VectorClock},
},
},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER},
config::PEERS_TO_REPLICATE,
node::KV_STORE,
node::{BLOB_STORE, KV_STORE},
Result,
};

Expand Down Expand Up @@ -354,6 +361,21 @@ impl EbtManager {
msg.sequence(),
msg.author()
);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

// Extract blob references from the received message and
// request those blobs if they are not already in the local
// blobstore.
for key in blobs::extract_blob_refs(&msg) {
if !BLOB_STORE.read().await.exists(&key) {
let event = RpcBlobsGetEvent(BlobsGetIn::new(key));
let broker_msg =
BrokerEvent::new(Destination::Broadcast, BrokerMessage::RpcBlobsGet(event));
ch_broker.send(broker_msg).await?;
}
}
} else {
warn!(
"Received out-of-order message from {}; received: {}, expected: {} + 1",
Expand Down
1 change: 1 addition & 0 deletions solar/src/actors/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod blobs;
pub mod classic;
pub mod config;
pub mod ebt;