diff --git a/src/network.rs b/src/network.rs index e855246..3b240d9 100644 --- a/src/network.rs +++ b/src/network.rs @@ -110,13 +110,16 @@ pub fn run() -> Result<( let mut network = NetworkBuilder::new(*network_id.as_bytes()) .sync(sync_config) - .private_key(private_key) + .private_key(private_key.clone()) .discovery(LocalDiscovery::new().unwrap()) .build() .await .unwrap(); - let (topic_tx, mut topic_rx, ready) = network.subscribe(document_id.clone()).await?; + let (topic_tx, mut topic_rx, ready) = network + .subscribe(document_id.clone()) + .await + .expect("can subscribe to channel"); // Task for handling operations arriving from the network. let operations_store_clone = operations_store.clone(); @@ -153,24 +156,27 @@ pub fn run() -> Result<( let operation = message.expect("stream message is ok"); // When we discover a new author we need to add them to our "document store". - let mut write = documents_store.write(); - write - .authors - .entry(operation.header.public_key) - .and_modify(|documents| { - if !documents.contains(&document_id_clone) { - documents.push(document_id_clone.clone()); - } - }) - .or_insert(vec![document_id.clone()]); - + { + let mut write_lock = documents_store.write(); + write_lock + .authors + .entry(operation.header.public_key) + .and_modify(|documents| { + if !documents.contains(&document_id_clone) { + documents.push(document_id_clone.clone()); + } + }) + .or_insert(vec![document_id_clone.clone()]); + }; // Forward the payload up to the app. - to_app.send( - operation - .body - .expect("all operations have a body") - .to_bytes(), - )?; + to_app + .send( + operation + .body + .expect("all operations have a body") + .to_bytes(), + ) + .await?; } Ok(()) @@ -178,7 +184,7 @@ pub fn run() -> Result<( // Task for handling events coming from the application layer. let result: JoinHandle> = tokio::task::spawn(async move { - while let Ok(bytes) = from_app.recv() { + while let Some(bytes) = from_app.recv().await { println!("New message from app"); // @TODO: set prune flag from the frontend.