feat: augment NATS tracing & add INFO span support for subscriptions
This change is doing...a lot, but is primarily focused on enabling a
tracing span that can sit over an incoming NATS message whose processing
ultimately sends one or more reply messages at the end (in this way, a
very RPC-like interaction).
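
Concretely, the enabled shape looks something like this sketch (not code
from this change; the `async_nats` usage, span name, and field are
illustrative):

```rust
use tracing::{info_span, Instrument};

// Illustrative handler: the span opens when the message arrives and only
// closes after the final reply has been published.
async fn handle_request(client: async_nats::Client, msg: async_nats::Message) {
    let span = info_span!("nats.request", messaging.destination = %msg.subject);
    async move {
        // ... do the actual work for this request ...
        if let Some(reply) = msg.reply {
            // The last reply ends the RPC-like exchange.
            let _ = client.publish(reply, "done".into()).await;
        }
    }
    .instrument(span)
    .await;
}
```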

The low-level span-creating machinery and OpenTelemetry header
injection/extraction is contained in the `lib/telemetry-nats-rs` Rust
crate. This machinery is wired into our somewhat generic NATS
subscription streaming crate `lib/nats-subscriber`, as Pinga, Veritech,
and parts of SDF already use it as their interface to a NATS
subscription.
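
That crate's source isn't part of this listing, but the inject/extract
pattern it implements is a standard OpenTelemetry one. A rough sketch
under stated assumptions — the carrier and function names are invented
here, and a plain map stands in for NATS headers:

```rust
use std::collections::HashMap;

use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

// A carrier over a plain map, standing in for NATS message headers.
struct HeaderCarrier<'a>(&'a mut HashMap<String, String>);

impl Injector for HeaderCarrier<'_> {
    fn set(&mut self, key: &str, value: String) {
        self.0.insert(key.to_string(), value);
    }
}

impl Extractor for HeaderCarrier<'_> {
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(String::as_str)
    }

    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(String::as_str).collect()
    }
}

// Publish side: inject the current span's context into outgoing headers.
fn inject_current_context(headers: &mut HashMap<String, String>) {
    let cx = Span::current().context();
    TraceContextPropagator::new().inject_context(&cx, &mut HeaderCarrier(headers));
}

// Subscribe side: extract the remote context and parent a new span on it.
fn span_for_incoming(headers: &mut HashMap<String, String>) -> Span {
    let cx = TraceContextPropagator::new().extract(&HeaderCarrier(headers));
    let span = tracing::info_span!("nats msg receive");
    span.set_parent(cx);
    span
}
```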

As part of creating this "request span", a fair amount of span metadata
in the `lib/si-data-nats` crate has been updated to provide more
meaningful, contextual information.
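
Along the lines of (illustrative only; the exact field set in
`si-data-nats` isn't shown in this listing):

```rust
use tracing::{info_span, Span};

// Illustrative: the kind of contextual fields a NATS client span can carry.
fn publish_span(subject: &str, server_addr: &str) -> Span {
    info_span!(
        "nats publish",
        messaging.system = "nats",
        messaging.destination = subject,
        net.peer.name = server_addr,
    )
}
```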

At present in `lib/nats-subscriber`, the `Subscriber` stream yields a
generic `Request` type (as before) which now also carries this "request
span". As long as this field and/or the request value is kept in some
scope (that is, it isn't dropped), the "request span" stays open.
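
Sketched in miniature (field and type names assumed, not the crate's
actual layout):

```rust
use tracing::Span;

// The yielded request carries its span, tying the span's lifetime to the
// lifetime of the request value itself.
pub struct Request<T> {
    pub payload: T,
    pub process_span: Span,
}

fn handle<T>(req: Request<T>) {
    // Enter the span while working so new spans parent onto it; in async
    // code, prefer `Instrument::instrument` over holding an `enter` guard.
    let _guard = req.process_span.enter();
    // ... process req.payload, send replies ...
} // the guard exits the span; dropping `req` closes it for good
```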
Some effort in this change (and more in future work) is being made to
capture all associated work in child spans, or in "follows from" spans
when the work is related but not bound to the request/response
lifetime.
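
In `tracing` terms the distinction looks roughly like this (span names
are illustrative):

```rust
use tracing::{info_span, Span};

fn attach_related_work(request_span: &Span) {
    // Work bound to the request/response lifetime: a child span.
    let _child = info_span!(parent: request_span, "request.step");

    // Related work that can outlive the request: a new root span that
    // records causality with a "follows from" link instead of a parent.
    let detached = info_span!(parent: None, "related.cleanup");
    detached.follows_from(request_span);
}
```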

At the moment, Pinga is the most mature here, creating a `pinga-jobs
receive` span for each incoming job request. While not 100% of the
related work is fully parented/associated yet, almost all of it is ;)

Signed-off-by: Fletcher Nichol <[email protected]>
fnichol committed Jan 18, 2024
1 parent 8d784c9 commit c6132d6
Showing 28 changed files with 1,165 additions and 394 deletions.
50 changes: 30 additions & 20 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -39,6 +39,7 @@ members = [
"lib/si-test-macros",
"lib/telemetry-application-rs",
"lib/telemetry-http-rs",
"lib/telemetry-nats-rs",
"lib/telemetry-rs",
"lib/veritech-client",
"lib/veritech-core",
10 changes: 5 additions & 5 deletions lib/council-server/src/server.rs
@@ -2,7 +2,7 @@ use crate::{Graph, Id, Request, Response};
use std::time::Duration;

use futures::StreamExt;
-use si_data_nats::NatsClient;
+use si_data_nats::{NatsClient, Subject};
use telemetry::prelude::*;
use tokio::{signal, sync::watch};

@@ -213,7 +213,7 @@ pub enum Error {
#[instrument(level = "info")]
pub async fn register_graph_from_job(
complete_graph: &mut ChangeSetGraph,
-reply_channel: String,
+reply_channel: Subject,
change_set_id: Id,
new_dependency_data: Graph,
) -> Result<(), Error> {
@@ -225,7 +225,7 @@ pub async fn job_processed_a_value(
pub async fn job_processed_a_value(
nats: &NatsClient,
complete_graph: &mut ChangeSetGraph,
-reply_channel: String,
+reply_channel: Subject,
change_set_id: Id,
node_id: Id,
) -> Result<(), Error> {
@@ -251,7 +251,7 @@ pub async fn job_failed_processing_a_value(
pub async fn job_failed_processing_a_value(
nats: &NatsClient,
complete_graph: &mut ChangeSetGraph,
-reply_channel: String,
+reply_channel: Subject,
change_set_id: Id,
node_id: Id,
) -> Result<(), Error> {
@@ -278,7 +278,7 @@ pub async fn job_failed_processing_a_value(
#[instrument(level = "info")]
pub async fn job_is_going_away(
complete_graph: &mut ChangeSetGraph,
-reply_channel: String,
+reply_channel: Subject,
change_set_id: Id,
) -> Result<(), Error> {
debug!(%reply_channel, %change_set_id, ?complete_graph, "Job is going away");
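These `council-server` diffs replace `String`/`&str` reply channels with
`si_data_nats::Subject`. That type's definition isn't part of this
listing; as a purely illustrative stand-in, think of it as a string
newtype along these lines:

```rust
use std::ops::Deref;

// Purely illustrative stand-in for si_data_nats::Subject.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Subject(String);

impl Deref for Subject {
    type Target = str;
    fn deref(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for Subject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
```

A dedicated type keeps subjects from being confused with arbitrary
strings, and `Deref`/`Display` impls like these would explain the `&**p`
and `.to_string()` conversions visible in the hunks below.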
16 changes: 10 additions & 6 deletions lib/council-server/src/server/graph.rs
@@ -4,6 +4,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
mod node_metadata;

use node_metadata::NodeMetadata;
+use si_data_nats::Subject;

#[derive(Default, Debug)]
pub struct ChangeSetGraph {
@@ -20,7 +21,10 @@ impl ChangeSetGraph {
for graph in self.dependency_data.values_mut() {
for (id, metadata) in graph.iter_mut() {
if let Some(reply_channel) = metadata.next_to_process() {
-result.entry(reply_channel.clone()).or_default().push(*id);
+result
+.entry(reply_channel.to_string())
+.or_default()
+.push(*id);
}
}
}
@@ -29,7 +33,7 @@ impl ChangeSetGraph {

pub fn merge_dependency_graph(
&mut self,
-reply_channel: String,
+reply_channel: Subject,
new_dependency_data: Graph,
change_set_id: Id,
) -> Result<(), Error> {
@@ -68,7 +72,7 @@ impl ChangeSetGraph {

pub fn mark_node_as_processed(
&mut self,
-reply_channel: &str,
+reply_channel: &Subject,
change_set_id: Id,
node_id: Id,
) -> Result<HashSet<String>, Error> {
@@ -96,7 +100,7 @@ impl ChangeSetGraph {
Ok(wanted_by_reply_channels)
}

-pub fn remove_channel(&mut self, change_set_id: Id, reply_channel: &str) {
+pub fn remove_channel(&mut self, change_set_id: Id, reply_channel: &Subject) {
if let Some(graph) = self.dependency_data.get_mut(&change_set_id) {
let mut to_remove = Vec::new();
for (id, metadata) in graph.iter_mut() {
@@ -118,10 +122,10 @@ impl ChangeSetGraph {
/// for the nodes that are being removed.
pub fn remove_node_and_dependents(
&mut self,
-reply_channel: String,
+reply_channel: Subject,
change_set_id: Id,
node_id: Id,
-) -> Result<Vec<(String, Id)>, Error> {
+) -> Result<Vec<(Subject, Id)>, Error> {
let mut failure_notifications = Vec::new();
let change_set_graph_data = self.dependency_data.get_mut(&change_set_id).unwrap();

24 changes: 13 additions & 11 deletions lib/council-server/src/server/graph/node_metadata.rs
@@ -3,14 +3,16 @@ use std::{
time::Instant,
};

+use si_data_nats::Subject;

use crate::{server::Error, Id};

#[derive(Debug)]
pub struct NodeMetadata {
// This should really be an ordered set, to remove duplicates, but we'll deal with
// that later.
-wanted_by_reply_channels: VecDeque<String>,
-processing_reply_channel: Option<String>,
+wanted_by_reply_channels: VecDeque<Subject>,
+processing_reply_channel: Option<Subject>,
depends_on_node_ids: HashSet<Id>,
processing_started_at: Option<Instant>,
last_updated_at: Instant,
@@ -29,7 +31,7 @@ impl Default for NodeMetadata {
}

impl NodeMetadata {
-pub fn add_wanted_by_reply_channel(&mut self, reply_channel: &str) {
+pub fn add_wanted_by_reply_channel(&mut self, reply_channel: &Subject) {
self.wanted_by_reply_channels
.push_back(reply_channel.to_owned());
self.last_updated_at = Instant::now();
@@ -62,7 +64,7 @@ impl NodeMetadata {

pub fn mark_as_processed(
&mut self,
-reply_channel: &str,
+reply_channel: &Subject,
) -> Result<(bool, HashSet<String>), Error> {
if self.processing_reply_channel().map(|p| &**p) != Some(reply_channel) {
return Err(Error::ShouldNotBeProcessingByJob);
@@ -79,7 +81,7 @@
if self.dependencies_satisfied() {
let mut wanted_by_reply_channels = self.wanted_by_reply_channels();
if let Some(processed_by_reply_channel) = processing_reply_channel {
-wanted_by_reply_channels.insert(processed_by_reply_channel);
+wanted_by_reply_channels.insert(processed_by_reply_channel.to_string());
}

Ok((true, wanted_by_reply_channels))
@@ -88,7 +90,7 @@
}
}

-pub fn merge_metadata(&mut self, reply_channel: String, dependencies: &Vec<Id>) {
+pub fn merge_metadata(&mut self, reply_channel: Subject, dependencies: &Vec<Id>) {
self.last_updated_at = Instant::now();

if !self.wanted_by_reply_channels.contains(&reply_channel) {
@@ -97,7 +99,7 @@
self.depends_on_node_ids.extend(dependencies);
}

-pub fn next_to_process(&mut self) -> Option<String> {
+pub fn next_to_process(&mut self) -> Option<Subject> {
if self.depends_on_node_ids.is_empty() && self.processing_reply_channel.is_none() {
self.last_updated_at = Instant::now();

@@ -112,11 +114,11 @@
None
}

-pub fn processing_reply_channel(&self) -> Option<&String> {
+pub fn processing_reply_channel(&self) -> Option<&Subject> {
self.processing_reply_channel.as_ref()
}

-pub fn remove_channel(&mut self, reply_channel: &str) {
+pub fn remove_channel(&mut self, reply_channel: &Subject) {
self.last_updated_at = Instant::now();

self.wanted_by_reply_channels
@@ -134,10 +136,10 @@
}

pub fn wanted_by_reply_channels(&self) -> HashSet<String> {
-HashSet::from_iter(self.wanted_by_reply_channels.iter().cloned())
+HashSet::from_iter(self.wanted_by_reply_channels.iter().map(|s| s.to_string()))
}

-pub fn wanted_by_reply_channels_iter(&self) -> Iter<'_, String> {
+pub fn wanted_by_reply_channels_iter(&self) -> Iter<'_, Subject> {
self.wanted_by_reply_channels.iter()
}
}
3 changes: 3 additions & 0 deletions lib/dal/src/job/definition/dependent_values_update.rs
@@ -213,6 +213,7 @@ impl DependentValuesUpdate {
task_ctx,
attribute_value,
pub_council.clone(),
+Span::current(),
));
}
}
@@ -317,6 +318,7 @@ impl DependentValuesUpdate {
/// play more nicely with being spawned into a `JoinSet`.
#[instrument(
name = "dependent_values_update.update_value",
+parent = &parent_span,
skip_all,
level = "info",
fields(
@@ -327,6 +329,7 @@ async fn update_value(
ctx: DalContext,
mut attribute_value: AttributeValue,
council: council_server::PubClient,
+parent_span: Span,
) -> JobConsumerResult<()> {
let update_result = attribute_value.update_from_prototype_function(&ctx).await;
// We don't propagate the error up, because we want the rest of the nodes in the graph to make progress
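The pattern in this file — passing `Span::current()` into the spawned
task and declaring `parent = &parent_span` on `#[instrument]` — is how
parentage stays explicit across `tokio::spawn`, since a spawned future
does not automatically inherit the spawning task's span. Condensed to
just that mechanism (bodies elided):

```rust
use tracing::{instrument, Span};

// The explicit `parent` ties this span to the one captured at the spawn
// site rather than to whatever context the executor polls it under.
#[instrument(name = "update_value", parent = &parent_span, skip_all)]
async fn update_value(parent_span: Span) {
    // ... runs as a child of the caller's span ...
}

async fn caller() {
    // Capture the parent at the spawn site, as the hunk above does.
    let handle = tokio::spawn(update_value(Span::current()));
    let _ = handle.await;
}
```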
