Skip to content

Commit

Permalink
Merge pull request #14 from p2panda/adz/minor-improvements
Browse files Browse the repository at this point in the history
Minor improvements to serde type
  • Loading branch information
adzialocha authored Dec 17, 2024
2 parents ead0c45 + dd150ee commit 61d0f36
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 53 deletions.
2 changes: 1 addition & 1 deletion aardvark-node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod network;
pub mod operation;
pub mod operation;
85 changes: 42 additions & 43 deletions aardvark-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use async_trait::async_trait;
use iroh_gossip::proto::Config as GossipConfig;
use p2panda_core::{Extension, Hash, PrivateKey, PruneFlag, PublicKey};
use p2panda_discovery::mdns::LocalDiscovery;
use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration};
use p2panda_net::{ToNetwork, TopicId};
use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork, TopicId};
use p2panda_store::MemoryStore;
use p2panda_stream::{DecodeExt, IngestExt};
use p2panda_sync::log_sync::LogSyncProtocol;
Expand Down Expand Up @@ -79,6 +78,7 @@ impl TopicMap<TextDocument, HashMap<PublicKey, Vec<LogId>>> for TextDocumentStor
}
}

#[allow(clippy::type_complexity)]
pub fn run() -> Result<(
oneshot::Sender<()>,
mpsc::Sender<Vec<u8>>,
Expand Down Expand Up @@ -178,49 +178,48 @@ pub fn run() -> Result<(
None
}
})
.ingest(operations_store_clone, 128);

// Process the operations and forward application messages to app layer.
while let Some(message) = stream.next().await {
match message {
Ok(operation) => {
let prune_flag: PruneFlag =
operation.header.extract().unwrap_or_default();
println!(
"received operation from {}, seq_num={}, prune_flag={}",
operation.header.public_key,
operation.header.seq_num,
prune_flag.is_set(),
);

// When we discover a new author we need to add them to our "document store".
{
let mut write_lock = documents_store.write();
write_lock
.authors
.entry(operation.header.public_key)
.and_modify(|documents| {
if !documents.contains(&document_id_clone) {
documents.push(document_id_clone.clone());
}
})
.or_insert(vec![document_id_clone.clone()]);
};

// Forward the payload up to the app.
to_app
.send(
operation
.body
.expect("all operations have a body")
.to_bytes(),
)
.await?;
}
.ingest(operations_store_clone, 128)
.filter_map(|result| match result {
Ok(operation) => Some(operation),
Err(err) => {
eprintln!("could not ingest message: {err}");
eprintln!("ingest operation error: {err}");
None
}
}
});

// Process the operations and forward application messages to app layer.
while let Some(operation) = stream.next().await {
let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default();
println!(
"received operation from {}, seq_num={}, prune_flag={}",
operation.header.public_key,
operation.header.seq_num,
prune_flag.is_set(),
);

// When we discover a new author we need to add them to our "document store".
{
let mut write_lock = documents_store.write();
write_lock
.authors
.entry(operation.header.public_key)
.and_modify(|documents| {
if !documents.contains(&document_id_clone) {
documents.push(document_id_clone.clone());
}
})
.or_insert(vec![document_id_clone.clone()]);
};

// Forward the payload up to the app.
to_app
.send(
operation
.body
.expect("all operations have a body")
.to_bytes(),
)
.await?;
}

Ok(())
Expand Down
22 changes: 13 additions & 9 deletions aardvark-node/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,26 @@ use crate::network::TextDocument;

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AardvarkExtensions {
#[serde(rename = "p", skip_serializing_if = "Option::is_none")]
pub prune_flag: Option<PruneFlag>,

#[serde(rename = "d", skip_serializing_if = "Option::is_none")]
pub document_id: Option<TextDocument>,
#[serde(
rename = "p",
skip_serializing_if = "PruneFlag::is_not_set",
default = "PruneFlag::default"
)]
pub prune_flag: PruneFlag,

#[serde(rename = "d")]
pub document_id: TextDocument,
}

impl Extension<PruneFlag> for AardvarkExtensions {
fn extract(&self) -> Option<PruneFlag> {
self.prune_flag.clone()
Some(self.prune_flag.clone())
}
}

impl Extension<TextDocument> for AardvarkExtensions {
fn extract(&self) -> Option<TextDocument> {
self.document_id.clone()
Some(self.document_id.clone())
}
}

Expand Down Expand Up @@ -69,8 +73,8 @@ pub async fn create_operation(
.as_secs();

let extensions = AardvarkExtensions {
prune_flag: Some(PruneFlag::new(prune_flag)),
document_id: Some(document_id.clone()),
prune_flag: PruneFlag::new(prune_flag),
document_id: document_id.clone(),
};

let mut header = Header {
Expand Down

0 comments on commit 61d0f36

Please sign in to comment.