Skip to content

Commit

Permalink
Log moooore
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Dec 8, 2024
1 parent a9f191b commit a2ef565
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,38 @@ pub fn run() -> Result<(

// Process the operations and forward application messages to app layer.
while let Some(message) = stream.next().await {
let operation = message.expect("stream message is ok");

// When we discover a new author we need to add them to our "document store".
{
let mut write_lock = documents_store.write();
write_lock
.authors
.entry(operation.header.public_key)
.and_modify(|documents| {
if !documents.contains(&document_id_clone) {
documents.push(document_id_clone.clone());
}
})
.or_insert(vec![document_id_clone.clone()]);
};

println!("received {:?}", operation);

// Forward the payload up to the app.
to_app
.send(
operation
.body
.expect("all operations have a body")
.to_bytes(),
)
.await?;
match message {
Ok(operation) => {
// When we discover a new author we need to add them to our "document store".
{
let mut write_lock = documents_store.write();
write_lock
.authors
.entry(operation.header.public_key)
.and_modify(|documents| {
if !documents.contains(&document_id_clone) {
documents.push(document_id_clone.clone());
}
})
.or_insert(vec![document_id_clone.clone()]);
};

println!("received {:?}", operation);

// Forward the payload up to the app.
to_app
.send(
operation
.body
.expect("all operations have a body")
.to_bytes(),
)
.await?;
}
Err(err) => {
eprintln!("could not ingest message: {err}");
}
}
}

println!("stream ended");
Expand Down

0 comments on commit a2ef565

Please sign in to comment.