From a0b4d6f98ce22a802fefc0475aa76cbc9b220644 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 27 Jun 2024 16:28:49 +0300 Subject: [PATCH] Improve farming cluster logging --- .../src/cluster/nats_client.rs | 61 ++++++++++++++----- crates/subspace-farmer/src/cluster/plotter.rs | 6 ++ 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index ea65769362..a99f3cb996 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -182,6 +182,7 @@ pub struct StreamResponseSubscriber { #[deref] #[deref_mut] subscriber: Subscriber, + response_subject: String, buffered_responses: Option>, next_index: u32, acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>, @@ -217,6 +218,7 @@ where actual_index = %responses.index(), expected_index = %*projected.next_index, message_type = %type_name::(), + response_subject = %projected.response_subject, "Received unexpected response stream index, aborting stream" ); @@ -235,6 +237,7 @@ where %error, %index, message_type = %type_name::(), + response_subject = %projected.response_subject, %ack_subject, "Failed to send acknowledgement for stream response" ); @@ -252,6 +255,7 @@ where warn!( %error, response_type = %type_name::(), + response_subject = %projected.response_subject, message = %hex::encode(message.payload), "Failed to decode stream response" ); @@ -267,19 +271,37 @@ where } impl StreamResponseSubscriber { - fn new(subscriber: Subscriber, nats_client: NatsClient) -> Self { + fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self { let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::unbounded::<(String, u32)>(); let background_task = AsyncJoinOnDrop::new( - tokio::spawn(async move { - while let Some((subject, index)) = acknowledgement_receiver.next().await { - if let Err(error) = nats_client - .publish(subject.clone(), index.to_le_bytes().to_vec().into()) - .await - { - warn!(%error, %subject, %index, "Failed to send acknowledgement"); - return; + tokio::spawn({ + let response_subject = response_subject.clone(); + + async move { + while let Some((subject, index)) = acknowledgement_receiver.next().await { + warn!( + %subject, + %index, + %response_subject, + %index, + "Sending stream response acknowledgement" + ); + if let Err(error) = nats_client + .publish(subject.clone(), index.to_le_bytes().to_vec().into()) + .await + { + warn!( + %error, + %subject, + %index, + %response_subject, + %index, + "Failed to send stream response acknowledgement" + ); + return; + } } } }), @@ -287,6 +309,7 @@ impl StreamResponseSubscriber { ); Self { + response_subject, subscriber, buffered_responses: None, next_index: 0, @@ -631,17 +654,25 @@ impl NatsClient { .client .subscribe(stream_request.response_subject.clone()) .await?; - debug!(request_type = %type_name::(), ?subscriber, "Stream request subscription"); + + let stream_request_subject = subject_with_instance(Request::SUBJECT, instance); + debug!( + request_type = %type_name::(), + %stream_request_subject, + ?subscriber, + "Stream request subscription" + ); self.inner .client - .publish( - subject_with_instance(Request::SUBJECT, instance), - stream_request.encode().into(), - ) + .publish(stream_request_subject, stream_request.encode().into()) .await?; - Ok(StreamResponseSubscriber::new(subscriber, self.clone())) + Ok(StreamResponseSubscriber::new( + subscriber, + stream_request.response_subject, + self.clone(), + )) } /// Helper method to send responses to requests initiated with [`Self::stream_request`] diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs index fe45baafa5..10744ef963 100644 --- a/crates/subspace-farmer/src/cluster/plotter.rs +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -533,6 +533,12 @@ where PS: Sink + Unpin + Send + 'static, PS::Error: Error, { + if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) { + trace!(?response, "Processing plotting response notification"); + } else { + trace!("Processing plotting response notification (sector chunk)"); + } + match response { ClusterSectorPlottingProgress::Occupied => { debug!(%free_instance, "Instance was occupied, retrying #2");