diff --git a/aardvark-node/src/lib.rs b/aardvark-node/src/lib.rs index 5ffec0e..444a2f8 100644 --- a/aardvark-node/src/lib.rs +++ b/aardvark-node/src/lib.rs @@ -1,2 +1,2 @@ pub mod network; -pub mod operation; \ No newline at end of file +pub mod operation; diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 90d4e8a..f18aa73 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -6,8 +6,7 @@ use async_trait::async_trait; use iroh_gossip::proto::Config as GossipConfig; use p2panda_core::{Extension, Hash, PrivateKey, PruneFlag, PublicKey}; use p2panda_discovery::mdns::LocalDiscovery; -use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration}; -use p2panda_net::{ToNetwork, TopicId}; +use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork, TopicId}; use p2panda_store::MemoryStore; use p2panda_stream::{DecodeExt, IngestExt}; use p2panda_sync::log_sync::LogSyncProtocol; @@ -79,6 +78,7 @@ impl TopicMap>> for TextDocumentStor } } +#[allow(clippy::type_complexity)] pub fn run() -> Result<( oneshot::Sender<()>, mpsc::Sender>, @@ -178,49 +178,48 @@ pub fn run() -> Result<( None } }) - .ingest(operations_store_clone, 128); - - // Process the operations and forward application messages to app layer. - while let Some(message) = stream.next().await { - match message { - Ok(operation) => { - let prune_flag: PruneFlag = - operation.header.extract().unwrap_or_default(); - println!( - "received operation from {}, seq_num={}, prune_flag={}", - operation.header.public_key, - operation.header.seq_num, - prune_flag.is_set(), - ); - - // When we discover a new author we need to add them to our "document store". - { - let mut write_lock = documents_store.write(); - write_lock - .authors - .entry(operation.header.public_key) - .and_modify(|documents| { - if !documents.contains(&document_id_clone) { - documents.push(document_id_clone.clone()); - } - }) - .or_insert(vec![document_id_clone.clone()]); - }; - - // Forward the payload up to the app. - to_app - .send( - operation - .body - .expect("all operations have a body") - .to_bytes(), - ) - .await?; - } + .ingest(operations_store_clone, 128) + .filter_map(|result| match result { + Ok(operation) => Some(operation), Err(err) => { - eprintln!("could not ingest message: {err}"); + eprintln!("ingest operation error: {err}"); + None } - } + }); + + // Process the operations and forward application messages to app layer. + while let Some(operation) = stream.next().await { + let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default(); + println!( + "received operation from {}, seq_num={}, prune_flag={}", + operation.header.public_key, + operation.header.seq_num, + prune_flag.is_set(), + ); + + // When we discover a new author we need to add them to our "document store". + { + let mut write_lock = documents_store.write(); + write_lock + .authors + .entry(operation.header.public_key) + .and_modify(|documents| { + if !documents.contains(&document_id_clone) { + documents.push(document_id_clone.clone()); + } + }) + .or_insert(vec![document_id_clone.clone()]); + }; + + // Forward the payload up to the app. + to_app + .send( + operation + .body + .expect("all operations have a body") + .to_bytes(), + ) + .await?; } Ok(()) diff --git a/aardvark-node/src/operation.rs b/aardvark-node/src/operation.rs index 0ae3fae..7b596bd 100644 --- a/aardvark-node/src/operation.rs +++ b/aardvark-node/src/operation.rs @@ -10,22 +10,26 @@ use crate::network::TextDocument; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] pub struct AardvarkExtensions { - #[serde(rename = "p", skip_serializing_if = "Option::is_none")] - pub prune_flag: Option, - - #[serde(rename = "d", skip_serializing_if = "Option::is_none")] - pub document_id: Option, + #[serde( + rename = "p", + skip_serializing_if = "PruneFlag::is_not_set", + default = "PruneFlag::default" + )] + pub prune_flag: PruneFlag, + + #[serde(rename = "d")] + pub document_id: TextDocument, } impl Extension for AardvarkExtensions { fn extract(&self) -> Option { - self.prune_flag.clone() + Some(self.prune_flag.clone()) } } impl Extension for AardvarkExtensions { fn extract(&self) -> Option { - self.document_id.clone() + Some(self.document_id.clone()) } } @@ -69,8 +73,8 @@ pub async fn create_operation( .as_secs(); let extensions = AardvarkExtensions { - prune_flag: Some(PruneFlag::new(prune_flag)), - document_id: Some(document_id.clone()), + prune_flag: PruneFlag::new(prune_flag), + document_id: document_id.clone(), }; let mut header = Header {