diff --git a/Cargo.lock b/Cargo.lock index caaf4ad993..d79f0991cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2243,6 +2243,25 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "fake-opentelemetry-collector" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc2711530ca4a8dd909e447cf61f299d4492ec5a046d5982e1c06694c8953756" +dependencies = [ + "futures", + "hex", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-proto", + "opentelemetry_sdk", + "serde 1.0.210", + "tokio", + "tokio-stream", + "tonic", + "tracing", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -4991,6 +5010,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" dependencies = [ + "async-std", "async-trait", "futures-channel", "futures-executor", @@ -6943,6 +6963,7 @@ dependencies = [ "ctrlc", "dialoguer", "dirs 5.0.1", + "fake-opentelemetry-collector", "futures", "glob", "hex", @@ -7117,6 +7138,7 @@ dependencies = [ "lru", "serde 1.0.210", "spin-core", + "spin-factor-observe", "spin-factors", "spin-factors-test", "spin-key-value-redis", @@ -7137,6 +7159,7 @@ dependencies = [ "anyhow", "async-trait", "serde 1.0.210", + "spin-factor-observe", "spin-factors", "spin-factors-test", "spin-llm-local", @@ -7149,6 +7172,23 @@ dependencies = [ "url", ] +[[package]] +name = "spin-factor-observe" +version = "2.8.0-pre0" +dependencies = [ + "anyhow", + "indexmap 2.5.0", + "opentelemetry", + "opentelemetry_sdk", + "spin-core", + "spin-factors", + "spin-world", + "table", + "toml 0.5.11", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "spin-factor-outbound-http" version = "2.8.0-pre0" @@ -7160,6 +7200,7 @@ dependencies = [ "ip_network", "reqwest 0.12.7", "rustls 0.23.13", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -7182,6 +7223,7 @@ dependencies = [ "anyhow", "rumqttc", "spin-core", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -7203,6 +7245,7 @@ dependencies = [ "spin-app", "spin-core", "spin-expressions", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -7253,6 +7296,7 @@ dependencies = [ "native-tls", "postgres-native-tls", "spin-core", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -7271,6 +7315,7 @@ dependencies = [ "anyhow", "redis 0.25.4", "spin-core", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -7287,6 +7332,7 @@ version = "2.8.0-pre0" dependencies = [ "async-trait", "serde 1.0.210", + "spin-factor-observe", "spin-factors", "spin-factors-test", "spin-locked-app", @@ -7307,6 +7353,7 @@ dependencies = [ "dotenvy", "serde 1.0.210", "spin-expressions", + "spin-factor-observe", "spin-factors", "spin-factors-test", "spin-world", @@ -7612,6 +7659,7 @@ dependencies = [ "spin-common", "spin-factor-key-value", "spin-factor-llm", + "spin-factor-observe", "spin-factor-outbound-http", "spin-factor-outbound-mqtt", "spin-factor-outbound-mysql", @@ -7643,6 +7691,7 @@ dependencies = [ "spin-common", "spin-factor-key-value", "spin-factor-llm", + "spin-factor-observe", "spin-factor-outbound-http", "spin-factor-outbound-mqtt", "spin-factor-outbound-mysql", @@ -7820,6 +7869,7 @@ dependencies = [ "serde_json", "spin-app", "spin-core", + "spin-factor-observe", "spin-factor-outbound-http", "spin-factor-outbound-networking", "spin-factor-wasi", @@ -7861,7 +7911,9 @@ dependencies = [ name = "spin-world" version = "2.8.0-pre0" dependencies = [ + "anyhow", "async-trait", + "opentelemetry", "wasmtime", ] diff --git a/Cargo.toml b/Cargo.toml index f6b19eae5f..1238431447 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ openssl = { version = "0.10" } anyhow = { workspace = true, features = ["backtrace"] } conformance = { path = "tests/conformance-tests" } conformance-tests = { workspace = true } +fake-opentelemetry-collector = "0.21.1" hex = "0.4" http-body-util = { workspace = true } hyper = { workspace = true } diff --git a/crates/factor-key-value/Cargo.toml b/crates/factor-key-value/Cargo.toml index 5f12662f84..eb0b41326a 100644 --- a/crates/factor-key-value/Cargo.toml +++ b/crates/factor-key-value/Cargo.toml @@ -9,6 +9,7 @@ anyhow = { workspace = true } lru = "0.12" serde = { workspace = true } spin-core = { path = "../core" } +spin-factor-observe = { path = "../factor-observe" } spin-factors = { path = "../factors" } spin-locked-app = { path = "../locked-app" } spin-world = { path = "../world" } diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index 7cc76a04b3..924bd90d99 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; use spin_core::{async_trait, wasmtime::component::Resource}; +use spin_factor_observe::ObserveContext; use spin_world::v2::key_value; use std::{collections::HashSet, sync::Arc}; use table::Table; @@ -36,22 +37,25 @@ pub struct KeyValueDispatch { allowed_stores: HashSet, manager: Arc, stores: Table>, + observe_context: Option, } impl KeyValueDispatch { pub fn new(allowed_stores: HashSet, manager: Arc) -> Self { - Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY) + Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY, None) } pub fn new_with_capacity( allowed_stores: HashSet, manager: Arc, capacity: u32, + observe_context: Option, ) -> Self { Self { allowed_stores, manager, stores: Table::new(capacity), + observe_context, } } @@ -71,6 +75,9 @@ impl key_value::Host for KeyValueDispatch {} impl key_value::HostStore for KeyValueDispatch { #[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))] async fn open(&mut self, name: String) -> Result, Error>> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } Ok(async { if self.allowed_stores.contains(&name) { let store = self @@ -91,6 +98,9 @@ impl key_value::HostStore for KeyValueDispatch { store: Resource, key: String, ) -> Result>, Error>> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } let store = self.get_store(store)?; Ok(store.get(&key).await) } @@ -102,6 +112,9 @@ impl key_value::HostStore for KeyValueDispatch { key: String, value: Vec, ) -> Result> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } let store = self.get_store(store)?; Ok(store.set(&key, &value).await) } @@ -112,6 +125,9 @@ impl key_value::HostStore for KeyValueDispatch { store: Resource, key: String, ) -> Result> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } let store = self.get_store(store)?; Ok(store.delete(&key).await) } @@ -122,6 +138,9 @@ impl key_value::HostStore for KeyValueDispatch { store: Resource, key: String, ) -> Result> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } let store = self.get_store(store)?; Ok(store.exists(&key).await) } @@ -131,6 +150,9 @@ impl key_value::HostStore for KeyValueDispatch { &mut self, store: Resource, ) -> Result, Error>> { + if let Some(observe_context) = self.observe_context.as_ref() { + observe_context.reparent_tracing_span() + } let store = self.get_store(store)?; Ok(store.get_keys().await) } diff --git a/crates/factor-key-value/src/lib.rs b/crates/factor-key-value/src/lib.rs index 685766b51c..3ea72b46d3 100644 --- a/crates/factor-key-value/src/lib.rs +++ b/crates/factor-key-value/src/lib.rs @@ -8,6 +8,7 @@ use std::{ }; use anyhow::ensure; +use spin_factor_observe::ObserveContext; use spin_factors::{ ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors, }; @@ -81,7 +82,7 @@ impl Factor for KeyValueFactor { fn prepare( &self, - ctx: PrepareContext, + mut ctx: PrepareContext, ) -> anyhow::Result { let app_state = ctx.app_state(); let allowed_stores = app_state @@ -89,9 +90,11 @@ impl Factor for KeyValueFactor { .get(ctx.app_component().id()) .expect("component should be in component_stores") .clone(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; Ok(InstanceBuilder { store_manager: app_state.store_manager.clone(), allowed_stores, + observe_context, }) } } @@ -140,6 +143,7 @@ pub struct InstanceBuilder { store_manager: Arc, /// The allowed stores for this component instance. allowed_stores: HashSet, + observe_context: ObserveContext, } impl FactorInstanceBuilder for InstanceBuilder { @@ -149,11 +153,13 @@ impl FactorInstanceBuilder for InstanceBuilder { let Self { store_manager, allowed_stores, + observe_context, } = self; Ok(KeyValueDispatch::new_with_capacity( allowed_stores, store_manager, u32::MAX, + Some(observe_context), )) } } diff --git a/crates/factor-llm/Cargo.toml b/crates/factor-llm/Cargo.toml index ef5a985492..bfda4e7bad 100644 --- a/crates/factor-llm/Cargo.toml +++ b/crates/factor-llm/Cargo.toml @@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"] anyhow = { workspace = true } async-trait = { workspace = true } serde = { workspace = true } +spin-factor-observe = { path = "../factor-observe" } spin-factors = { path = "../factors" } spin-llm-local = { path = "../llm-local", optional = true } spin-llm-remote-http = { path = "../llm-remote-http" } diff --git a/crates/factor-llm/src/host.rs b/crates/factor-llm/src/host.rs index 7f5e077464..cdd89efb92 100644 --- a/crates/factor-llm/src/host.rs +++ b/crates/factor-llm/src/host.rs @@ -15,6 +15,8 @@ impl v2::Host for InstanceState { prompt: String, params: Option, ) -> Result { + self.observe_context.reparent_tracing_span(); + if !self.allowed_models.contains(&model) { return Err(access_denied_error(&model)); } @@ -42,6 +44,8 @@ impl v2::Host for InstanceState { model: v1::EmbeddingModel, data: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + if !self.allowed_models.contains(&model) { return Err(access_denied_error(&model)); } diff --git a/crates/factor-llm/src/lib.rs b/crates/factor-llm/src/lib.rs index 6491d9afcd..34f9faaf19 100644 --- a/crates/factor-llm/src/lib.rs +++ b/crates/factor-llm/src/lib.rs @@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; +use spin_factor_observe::ObserveContext; use spin_factors::{ ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, }; @@ -76,7 +77,7 @@ impl Factor for LlmFactor { fn prepare( &self, - ctx: PrepareContext, + mut ctx: PrepareContext, ) -> anyhow::Result { let allowed_models = ctx .app_state() @@ -85,10 +86,12 @@ impl Factor for LlmFactor { .cloned() .unwrap_or_default(); let engine = ctx.app_state().engine.clone(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; Ok(InstanceState { engine, allowed_models, + observe_context, }) } } @@ -103,6 +106,7 @@ pub struct AppState { pub struct InstanceState { engine: Arc>, pub allowed_models: Arc>, + observe_context: ObserveContext, } /// The runtime configuration for the LLM factor. diff --git a/crates/factor-observe/Cargo.toml b/crates/factor-observe/Cargo.toml new file mode 100644 index 0000000000..535f961328 --- /dev/null +++ b/crates/factor-observe/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "spin-factor-observe" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +indexmap = "2.2.6" +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +spin-core = { path = "../core" } +spin-factors = { path = "../factors" } +spin-world = { path = "../world" } +table = { path = "../table" } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } + +[dev-dependencies] +toml = "0.5" + +[lints] +workspace = true diff --git a/crates/factor-observe/src/host.rs b/crates/factor-observe/src/host.rs new file mode 100644 index 0000000000..b4a401ebd0 --- /dev/null +++ b/crates/factor-observe/src/host.rs @@ -0,0 +1,244 @@ +use std::time::SystemTime; + +use anyhow::anyhow; +use anyhow::Result; +use opentelemetry::global::ObjectSafeSpan; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::Tracer; +use opentelemetry::Context; +use spin_core::async_trait; +use spin_core::wasmtime::component::Resource; +use spin_world::wasi::observe::tracer; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::{GuestSpan, InstanceState}; + +#[async_trait] +impl tracer::Host for InstanceState { + async fn start( + &mut self, + name: String, + options: Option, + ) -> Result> { + let mut state = self.state.write().unwrap(); + let options = options.unwrap_or_default(); + + // Before we ever create any new spans make sure we track the original host span ID + if state.original_host_span_id.is_none() { + state.original_host_span_id = Some( + tracing::Span::current() + .context() + .span() + .span_context() + .span_id(), + ); + } + + // Get span's parent based on whether it's a new root and whether there are any active spans + let parent_context = match (options.new_root, state.active_spans.is_empty()) { + // Not a new root && Active spans -> Last active guest span is parent + (false, false) => { + let span_context = state + .guest_spans + .get(*state.active_spans.last().unwrap()) + .unwrap() + .inner + .span_context() + .clone(); + Context::new().with_remote_span_context(span_context) + } + // Not a new root && No active spans -> Current host span is parent + (false, true) => tracing::Span::current().context(), + // New root && n/a -> No parent + (true, _) => Context::new(), + }; + + // Create the underlying opentelemetry span + let mut builder = self.tracer.span_builder(name); + if let Some(kind) = options.span_kind { + builder = builder.with_kind(kind.into()); + } + if let Some(attributes) = options.attributes { + builder = builder.with_attributes(attributes.into_iter().map(Into::into)); + } + if let Some(links) = options.links { + builder = builder.with_links(links.into_iter().map(Into::into).collect()); + } + if let Some(timestamp) = options.timestamp { + builder = builder.with_start_time(timestamp); + } + let otel_span = builder.start_with_context(&self.tracer, &parent_context); + + // Wrap it in a GuestSpan for our own bookkeeping purposes + let guest_span = GuestSpan { inner: otel_span }; + + // Put the GuestSpan in our resource table and push it on to our stack of active spans + let resource_id = state.guest_spans.push(guest_span).unwrap(); + state.active_spans.insert(resource_id); + + Ok(Resource::new_own(resource_id)) + } +} + +#[async_trait] +impl tracer::HostSpan for InstanceState { + async fn span_context( + &mut self, + resource: Resource, + ) -> Result { + if let Some(guest_span) = self.state.read().unwrap().guest_spans.get(resource.rep()) { + Ok(guest_span.inner.span_context().clone().into()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn is_recording(&mut self, resource: Resource) -> Result { + if let Some(guest_span) = self.state.read().unwrap().guest_spans.get(resource.rep()) { + Ok(guest_span.inner.is_recording()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn set_attributes( + &mut self, + resource: Resource, + attributes: Vec, + ) -> Result<()> { + if let Some(guest_span) = self + .state + .write() + .unwrap() + .guest_spans + .get_mut(resource.rep()) + { + for attribute in attributes { + guest_span.inner.set_attribute(attribute.into()); + } + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn add_event( + &mut self, + resource: Resource, + name: String, + timestamp: Option, + attributes: Option>, + ) -> Result<()> { + if let Some(guest_span) = self + .state + .write() + .unwrap() + .guest_spans + .get_mut(resource.rep()) + { + let timestamp = timestamp.map(Into::into).unwrap_or_else(SystemTime::now); + let attributes = if let Some(attributes) = attributes { + attributes.into_iter().map(Into::into).collect() + } else { + vec![] + }; + + guest_span + .inner + .add_event_with_timestamp(name.into(), timestamp, attributes); + + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn add_link( + &mut self, + resource: Resource, + link: tracer::Link, + ) -> Result<()> { + if let Some(guest_span) = self + .state + .write() + .unwrap() + .guest_spans + .get_mut(resource.rep()) + { + guest_span.inner.add_link( + link.span_context.into(), + link.attributes.into_iter().map(Into::into).collect(), + ); + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn set_status( + &mut self, + resource: Resource, + status: tracer::Status, + ) -> Result<()> { + if let Some(guest_span) = self + .state + .write() + .unwrap() + .guest_spans + .get_mut(resource.rep()) + { + guest_span.inner.set_status(status.into()); + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn update_name(&mut self, resource: Resource, name: String) -> Result<()> { + if let Some(guest_span) = self + .state + .write() + .unwrap() + .guest_spans + .get_mut(resource.rep()) + { + guest_span.inner.update_name(name.into()); + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn end( + &mut self, + resource: Resource, + timestamp: Option, + ) -> Result<()> { + let mut state = self.state.write().unwrap(); + if let Some(guest_span) = state.guest_spans.get_mut(resource.rep()) { + if let Some(timestamp) = timestamp { + guest_span.inner.end_with_timestamp(timestamp.into()); + } else { + guest_span.inner.end(); + } + + // Remove the span from active_spans + state.active_spans.shift_remove(&resource.rep()); + + Ok(()) + } else { + Err(anyhow!("BUG: cannot find resource in table")) + } + } + + async fn drop(&mut self, resource: Resource) -> Result<()> { + // Dropping the resource automatically calls drop on the Span which ends itself with the + // current timestamp if the Span is not already ended. + + // Ensure that the span has been removed from active_spans + let mut state = self.state.write().unwrap(); + state.active_spans.shift_remove(&resource.rep()); + + Ok(()) + } +} diff --git a/crates/factor-observe/src/lib.rs b/crates/factor-observe/src/lib.rs new file mode 100644 index 0000000000..71d8979d80 --- /dev/null +++ b/crates/factor-observe/src/lib.rs @@ -0,0 +1,177 @@ +mod host; + +use std::sync::{Arc, RwLock}; + +use indexmap::IndexSet; +use opentelemetry::{ + global::{self, BoxedTracer, ObjectSafeSpan}, + trace::{SpanId, TraceContextExt}, + Context, +}; +use spin_factors::{Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +#[derive(Default)] +pub struct ObserveFactor {} + +impl Factor for ObserveFactor { + type RuntimeConfig = (); + type AppState = (); + type InstanceBuilder = InstanceState; + + fn init( + &mut self, + mut ctx: spin_factors::InitContext, + ) -> anyhow::Result<()> { + ctx.link_bindings(spin_world::wasi::observe::tracer::add_to_linker)?; + Ok(()) + } + + fn configure_app( + &self, + _ctx: spin_factors::ConfigureAppContext, + ) -> anyhow::Result { + Ok(()) + } + + fn prepare( + &self, + ctx: spin_factors::PrepareContext, + ) -> anyhow::Result { + let tracer = global::tracer(ctx.app_component().app.id().to_string()); + Ok(InstanceState { + state: Arc::new(RwLock::new(State { + guest_spans: table::Table::new(1024), + active_spans: Default::default(), + original_host_span_id: None, + })), + tracer, + }) + } +} + +impl ObserveFactor { + pub fn new() -> Self { + Self::default() + } +} + +pub struct InstanceState { + pub(crate) state: Arc>, + pub(crate) tracer: BoxedTracer, +} + +impl SelfInstanceBuilder for InstanceState {} + +/// Internal state of the ObserveFactor instance state. +/// +/// This data lives here rather than directly on InstanceState so that we can have multiple things +/// take Arc references to it. +pub(crate) struct State { + /// A resource table that holds the guest spans. + pub(crate) guest_spans: table::Table, + + /// A stack of resource ids for all the active guest spans. The topmost span is the active span. + /// + /// When a guest span is ended it is removed from this stack (regardless of whether is the + /// active span) and all other spans are shifted back to retain relative order. + pub(crate) active_spans: IndexSet, + + /// Id of the last span emitted from within the host before entering the guest. + /// + /// We use this to avoid accidentally reparenting the original host span as a child of a guest + /// span. + pub(crate) original_host_span_id: Option, +} + +/// The WIT resource Span. Effectively wraps an [opentelemetry::global::BoxedSpan]. +pub struct GuestSpan { + /// The [opentelemetry::global::BoxedSpan] we use to do the actual tracing work. + pub inner: opentelemetry::global::BoxedSpan, +} + +/// Manages access to the ObserveFactor state for the purpose of maintaining proper span +/// parent/child relationships when WASI Observe spans are being created. +pub struct ObserveContext { + pub(crate) state: Option>>, +} + +impl ObserveContext { + /// Creates an [`ObserveContext`] from a [`PrepareContext`]. + /// + /// If [`RuntimeFactors`] does not contain an [`ObserveFactor`], then calling + /// [`ObserveContext::reparent_tracing_span`] will be a no-op. + pub fn from_prepare_context( + prepare_context: &mut PrepareContext, + ) -> anyhow::Result { + let state = match prepare_context.instance_builder::() { + Ok(instance_state) => Some(instance_state.state.clone()), + Err(spin_factors::Error::NoSuchFactor(_)) => None, + Err(e) => return Err(e.into()), + }; + Ok(Self { state }) + } + + /// Reparents the current [tracing] span to be a child of the last active guest span. + /// + /// The observe factor enables guests to emit spans that should be part of the same trace as the + /// host is producing for a request. Below is an example trace. A request is made to an app, a + /// guest span is created and then the host is re-entered to fetch a key value. + /// + /// ```text + /// | GET /... _________________________________| + /// | execute_wasm_component foo ___________| + /// | my_guest_span ___________________| + /// | spin_key_value.get | + /// ``` + /// + /// Setting the guest spans parent as the host is trivially done. However, the more difficult + /// task is having the host factor spans be children of the guest span. + /// [`ObserveContext::reparent_tracing_span`] handles this by reparenting the current span to be + /// a child of the last active guest span (which is tracked internally in the observe factor). + /// + /// Note that if the observe factor is not in your [`RuntimeFactors`] than this is effectively a + /// no-op. + /// + /// This MUST only be called from a factor host implementation function that is instrumented. + /// + /// This MUST be called at the very start of the function before any awaits. + pub fn reparent_tracing_span(&self) { + // If state is None then we want to return early b/c the factor doesn't depend on the + // Observe factor and therefore there is nothing to do + let state = if let Some(state) = self.state.as_ref() { + state.read().unwrap() + } else { + return; + }; + + // If there are no active guest spans then there is nothing to do + let Some(active_span) = state.active_spans.last() else { + return; + }; + + // Ensure that we are not reparenting the original host span + if let Some(original_host_span_id) = state.original_host_span_id { + if tracing::Span::current() + .context() + .span() + .span_context() + .span_id() + .eq(&original_host_span_id) + { + panic!("Incorrectly attempting to reparent the original host span. Likely `reparent_tracing_span` was called in an incorrect location.") + } + } + + // Now reparent the current span to the last active guest span + let span_context = state + .guest_spans + .get(*active_span) + .unwrap() + .inner + .span_context() + .clone(); + let parent_context = Context::new().with_remote_span_context(span_context); + tracing::Span::current().set_parent(parent_context); + } +} diff --git a/crates/factor-outbound-http/Cargo.toml b/crates/factor-outbound-http/Cargo.toml index 55f620f9d4..29792a73f4 100644 --- a/crates/factor-outbound-http/Cargo.toml +++ b/crates/factor-outbound-http/Cargo.toml @@ -12,6 +12,7 @@ hyper = { workspace = true } ip_network = "0.4" reqwest = { version = "0.12", features = ["gzip"] } rustls = { workspace = true } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-telemetry = { path = "../telemetry" } diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index cfcb167d5e..714ec56bc3 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -12,6 +12,7 @@ use http::{ HeaderValue, Uri, }; use intercept::OutboundHttpInterceptor; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::{ ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor, }; @@ -75,6 +76,7 @@ impl Factor for OutboundHttpFactor { let outbound_networking = ctx.instance_builder::()?; let allowed_hosts = outbound_networking.allowed_hosts(); let component_tls_configs = outbound_networking.component_tls_configs().clone(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; Ok(InstanceState { wasi_http_ctx: WasiHttpCtx::new(), allowed_hosts, @@ -83,6 +85,7 @@ impl Factor for OutboundHttpFactor { self_request_origin: None, request_interceptor: None, spin_http_client: None, + observe_context, }) } } @@ -96,6 +99,7 @@ pub struct InstanceState { request_interceptor: Option>, // Connection-pooling client for 'fermyon:spin/http' interface spin_http_client: Option, + observe_context: ObserveContext, } impl InstanceState { diff --git a/crates/factor-outbound-http/src/spin.rs b/crates/factor-outbound-http/src/spin.rs index 51d173e1a7..9801ed97a9 100644 --- a/crates/factor-outbound-http/src/spin.rs +++ b/crates/factor-outbound-http/src/spin.rs @@ -16,6 +16,8 @@ impl spin_http::Host for crate::InstanceState { fields(otel.kind = "client", url.full = Empty, http.request.method = Empty, http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))] async fn send_request(&mut self, req: Request) -> Result { + self.observe_context.reparent_tracing_span(); + let span = Span::current(); record_request_fields(&span, &req); diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index a8602498f1..769acdc9c5 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -87,6 +87,8 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> { request: Request, config: wasmtime_wasi_http::types::OutgoingRequestConfig, ) -> wasmtime_wasi_http::HttpResult { + self.state.observe_context.reparent_tracing_span(); + Ok(HostFutureIncomingResponse::Pending( wasmtime_wasi::runtime::spawn( send_request_impl( diff --git a/crates/factor-outbound-mqtt/Cargo.toml b/crates/factor-outbound-mqtt/Cargo.toml index 6f8700c1ee..f217d9deae 100644 --- a/crates/factor-outbound-mqtt/Cargo.toml +++ b/crates/factor-outbound-mqtt/Cargo.toml @@ -8,6 +8,7 @@ edition = { workspace = true } anyhow = { workspace = true } rumqttc = { version = "0.24", features = ["url"] } spin-core = { path = "../core" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-world = { path = "../world" } diff --git a/crates/factor-outbound-mqtt/src/host.rs b/crates/factor-outbound-mqtt/src/host.rs index ae3ca11181..983b333ffe 100644 --- a/crates/factor-outbound-mqtt/src/host.rs +++ b/crates/factor-outbound-mqtt/src/host.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::Result; use spin_core::{async_trait, wasmtime::component::Resource}; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::OutboundAllowedHosts; use spin_world::v2::mqtt::{self as v2, Connection, Error, Qos}; use tracing::{instrument, Level}; @@ -12,14 +13,20 @@ pub struct InstanceState { allowed_hosts: OutboundAllowedHosts, connections: table::Table>, create_client: Arc, + observe_context: ObserveContext, } impl InstanceState { - pub fn new(allowed_hosts: OutboundAllowedHosts, create_client: Arc) -> Self { + pub fn new( + allowed_hosts: OutboundAllowedHosts, + create_client: Arc, + observe_context: ObserveContext, + ) -> Self { Self { allowed_hosts, create_client, connections: table::Table::new(1024), + observe_context, } } } @@ -73,6 +80,8 @@ impl v2::HostConnection for InstanceState { password: String, keep_alive_interval: u64, ) -> Result, Error> { + self.observe_context.reparent_tracing_span(); + if !self .is_address_allowed(&address) .await @@ -106,6 +115,8 @@ impl v2::HostConnection for InstanceState { payload: Vec, qos: Qos, ) -> Result<(), Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; conn.publish_bytes(topic, qos, payload).await?; diff --git a/crates/factor-outbound-mqtt/src/lib.rs b/crates/factor-outbound-mqtt/src/lib.rs index 79c5138e5c..d5b8fc0867 100644 --- a/crates/factor-outbound-mqtt/src/lib.rs +++ b/crates/factor-outbound-mqtt/src/lib.rs @@ -7,6 +7,7 @@ use host::other_error; use host::InstanceState; use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS}; use spin_core::async_trait; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::OutboundNetworkingFactor; use spin_factors::{ ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -53,9 +54,12 @@ impl Factor for OutboundMqttFactor { let allowed_hosts = ctx .instance_builder::()? .allowed_hosts(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; + Ok(InstanceState::new( allowed_hosts, self.create_client.clone(), + observe_context, )) } } diff --git a/crates/factor-outbound-mysql/Cargo.toml b/crates/factor-outbound-mysql/Cargo.toml index 6281abeec5..283fd24700 100644 --- a/crates/factor-outbound-mysql/Cargo.toml +++ b/crates/factor-outbound-mysql/Cargo.toml @@ -19,6 +19,7 @@ mysql_common = { version = "0.32", default-features = false } spin-app = { path = "../app" } spin-core = { path = "../core" } spin-expressions = { path = "../expressions" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-world = { path = "../world" } diff --git a/crates/factor-outbound-mysql/src/host.rs b/crates/factor-outbound-mysql/src/host.rs index 4fe46b96d6..97262d619d 100644 --- a/crates/factor-outbound-mysql/src/host.rs +++ b/crates/factor-outbound-mysql/src/host.rs @@ -40,6 +40,8 @@ impl v2::Host for InstanceState {} impl v2::HostConnection for InstanceState { #[instrument(name = "spin_outbound_mysql.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql"))] async fn open(&mut self, address: String) -> Result, v2::Error> { + self.observe_context.reparent_tracing_span(); + if !self .is_address_allowed(&address) .await @@ -59,6 +61,8 @@ impl v2::HostConnection for InstanceState { statement: String, params: Vec, ) -> Result<(), v2::Error> { + self.observe_context.reparent_tracing_span(); + Ok(self .get_client(connection) .await? @@ -73,6 +77,8 @@ impl v2::HostConnection for InstanceState { statement: String, params: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + Ok(self .get_client(connection) .await? diff --git a/crates/factor-outbound-mysql/src/lib.rs b/crates/factor-outbound-mysql/src/lib.rs index c955a78402..65732c818a 100644 --- a/crates/factor-outbound-mysql/src/lib.rs +++ b/crates/factor-outbound-mysql/src/lib.rs @@ -3,6 +3,7 @@ mod host; use client::Client; use mysql_async::Conn as MysqlClient; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::{OutboundAllowedHosts, OutboundNetworkingFactor}; use spin_factors::{Factor, InitContext, RuntimeFactors, SelfInstanceBuilder}; use spin_world::v1::mysql as v1; @@ -37,9 +38,12 @@ impl Factor for OutboundMysqlFactor { let allowed_hosts = ctx .instance_builder::()? .allowed_hosts(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; + Ok(InstanceState { allowed_hosts, connections: Default::default(), + observe_context, }) } } @@ -61,6 +65,7 @@ impl OutboundMysqlFactor { pub struct InstanceState { allowed_hosts: OutboundAllowedHosts, connections: table::Table, + observe_context: ObserveContext, } impl SelfInstanceBuilder for InstanceState {} diff --git a/crates/factor-outbound-pg/Cargo.toml b/crates/factor-outbound-pg/Cargo.toml index 7aef4cd817..07b450d0db 100644 --- a/crates/factor-outbound-pg/Cargo.toml +++ b/crates/factor-outbound-pg/Cargo.toml @@ -9,6 +9,7 @@ anyhow = { workspace = true } native-tls = "0.2" postgres-native-tls = "0.5" spin-core = { path = "../core" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-world = { path = "../world" } diff --git a/crates/factor-outbound-pg/src/host.rs b/crates/factor-outbound-pg/src/host.rs index c68eb3a189..a23c0bd7c4 100644 --- a/crates/factor-outbound-pg/src/host.rs +++ b/crates/factor-outbound-pg/src/host.rs @@ -65,6 +65,8 @@ impl v2::Host for InstanceState {} impl v2::HostConnection for InstanceState { #[instrument(name = "spin_outbound_pg.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql"))] async fn open(&mut self, address: String) -> Result, v2::Error> { + self.observe_context.reparent_tracing_span(); + if !self .is_address_allowed(&address) .await @@ -84,6 +86,8 @@ impl v2::HostConnection for InstanceState { statement: String, params: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + Ok(self .get_client(connection) .await? @@ -98,6 +102,8 @@ impl v2::HostConnection for InstanceState { statement: String, params: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + Ok(self .get_client(connection) .await? diff --git a/crates/factor-outbound-pg/src/lib.rs b/crates/factor-outbound-pg/src/lib.rs index 9f044c1c0c..68ba9ae8a4 100644 --- a/crates/factor-outbound-pg/src/lib.rs +++ b/crates/factor-outbound-pg/src/lib.rs @@ -2,6 +2,7 @@ pub mod client; mod host; use client::Client; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::{OutboundAllowedHosts, OutboundNetworkingFactor}; use spin_factors::{ anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -40,9 +41,12 @@ impl Factor for OutboundPgFactor { let allowed_hosts = ctx .instance_builder::()? .allowed_hosts(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; + Ok(InstanceState { allowed_hosts, connections: Default::default(), + observe_context, }) } } @@ -64,6 +68,7 @@ impl OutboundPgFactor { pub struct InstanceState { allowed_hosts: OutboundAllowedHosts, connections: table::Table, + observe_context: ObserveContext, } impl SelfInstanceBuilder for InstanceState {} diff --git a/crates/factor-outbound-redis/Cargo.toml b/crates/factor-outbound-redis/Cargo.toml index fd901e312b..d26042911d 100644 --- a/crates/factor-outbound-redis/Cargo.toml +++ b/crates/factor-outbound-redis/Cargo.toml @@ -8,6 +8,7 @@ edition = { workspace = true } anyhow = { workspace = true } redis = { version = "0.25", features = ["tokio-comp", "tokio-native-tls-comp", "aio"] } spin-core = { path = "../core" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-world = { path = "../world" } diff --git a/crates/factor-outbound-redis/src/host.rs b/crates/factor-outbound-redis/src/host.rs index d31525d60e..1357e51f02 100644 --- a/crates/factor-outbound-redis/src/host.rs +++ b/crates/factor-outbound-redis/src/host.rs @@ -1,6 +1,7 @@ use anyhow::Result; use redis::{aio::MultiplexedConnection, AsyncCommands, FromRedisValue, Value}; use spin_core::{async_trait, wasmtime::component::Resource}; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::OutboundAllowedHosts; use spin_world::v1::{redis as v1, redis_types}; use spin_world::v2::redis::{ @@ -11,6 +12,7 @@ use tracing::{instrument, Level}; pub struct InstanceState { pub allowed_hosts: OutboundAllowedHosts, pub connections: table::Table, + pub observe_context: ObserveContext, } impl InstanceState { @@ -55,6 +57,8 @@ impl v2::Host for crate::InstanceState { impl v2::HostConnection for crate::InstanceState { #[instrument(name = "spin_outbound_redis.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis"))] async fn open(&mut self, address: String) -> Result, Error> { + self.observe_context.reparent_tracing_span(); + if !self .is_address_allowed(&address) .await @@ -73,6 +77,8 @@ impl v2::HostConnection for crate::InstanceState { channel: String, payload: Vec, ) -> Result<(), Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; // The `let () =` syntax is needed to suppress a warning when the result type is inferred. // You can read more about the issue here: @@ -89,6 +95,8 @@ impl v2::HostConnection for crate::InstanceState { connection: Resource, key: String, ) -> Result>, Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.get(&key).await.map_err(other_error)?; Ok(value) @@ -101,6 +109,8 @@ impl v2::HostConnection for crate::InstanceState { key: String, value: Vec, ) -> Result<(), Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; // The `let () =` syntax is needed to suppress a warning when the result type is inferred. // You can read more about the issue here: @@ -114,6 +124,8 @@ impl v2::HostConnection for crate::InstanceState { connection: Resource, key: String, ) -> Result { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.incr(&key, 1).await.map_err(other_error)?; Ok(value) @@ -125,6 +137,8 @@ impl v2::HostConnection for crate::InstanceState { connection: Resource, keys: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.del(&keys).await.map_err(other_error)?; Ok(value) @@ -137,6 +151,8 @@ impl v2::HostConnection for crate::InstanceState { key: String, values: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.sadd(&key, &values).await.map_err(|e| { if e.kind() == redis::ErrorKind::TypeError { @@ -154,6 +170,8 @@ impl v2::HostConnection for crate::InstanceState { connection: Resource, key: String, ) -> Result, Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.smembers(&key).await.map_err(other_error)?; Ok(value) @@ -166,6 +184,8 @@ impl v2::HostConnection for crate::InstanceState { key: String, values: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await.map_err(other_error)?; let value = conn.srem(&key, &values).await.map_err(other_error)?; Ok(value) @@ -178,6 +198,8 @@ impl v2::HostConnection for crate::InstanceState { command: String, arguments: Vec, ) -> Result, Error> { + self.observe_context.reparent_tracing_span(); + let conn = self.get_conn(connection).await?; let mut cmd = redis::cmd(&command); arguments.iter().for_each(|value| match value { diff --git a/crates/factor-outbound-redis/src/lib.rs b/crates/factor-outbound-redis/src/lib.rs index 625978b9a1..ef877e7487 100644 --- a/crates/factor-outbound-redis/src/lib.rs +++ b/crates/factor-outbound-redis/src/lib.rs @@ -1,6 +1,7 @@ mod host; use host::InstanceState; +use spin_factor_observe::ObserveContext; use spin_factor_outbound_networking::OutboundNetworkingFactor; use spin_factors::{ anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -46,9 +47,12 @@ impl Factor for OutboundRedisFactor { let allowed_hosts = ctx .instance_builder::()? .allowed_hosts(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; + Ok(InstanceState { allowed_hosts, connections: table::Table::new(1024), + observe_context, }) } } diff --git a/crates/factor-sqlite/Cargo.toml b/crates/factor-sqlite/Cargo.toml index 0a88ef616f..8414c7d3ea 100644 --- a/crates/factor-sqlite/Cargo.toml +++ b/crates/factor-sqlite/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] async-trait = { workspace = true } serde = { workspace = true } +spin-factor-observe = { path = "../factor-observe" } spin-factors = { path = "../factors" } spin-locked-app = { path = "../locked-app" } spin-world = { path = "../world" } diff --git a/crates/factor-sqlite/src/host.rs b/crates/factor-sqlite/src/host.rs index ff4d087f03..e0b104c72b 100644 --- a/crates/factor-sqlite/src/host.rs +++ b/crates/factor-sqlite/src/host.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; +use spin_factor_observe::ObserveContext; use spin_factors::wasmtime::component::Resource; use spin_factors::{anyhow, SelfInstanceBuilder}; use spin_world::v1::sqlite as v1; @@ -18,6 +19,7 @@ pub struct InstanceState { connections: table::Table>, /// A map from database label to connection creators. connection_creators: HashMap>, + observe_context: ObserveContext, } impl InstanceState { @@ -27,11 +29,13 @@ impl InstanceState { pub fn new( allowed_databases: Arc>, connection_creators: HashMap>, + observe_context: ObserveContext, ) -> Self { Self { allowed_databases, connections: table::Table::new(256), connection_creators, + observe_context, } } @@ -64,6 +68,8 @@ impl v2::Host for InstanceState { impl v2::HostConnection for InstanceState { #[instrument(name = "spin_sqlite.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", sqlite.backend = Empty))] async fn open(&mut self, database: String) -> Result, v2::Error> { + self.observe_context.reparent_tracing_span(); + if !self.allowed_databases.contains(&database) { return Err(v2::Error::AccessDenied); } @@ -90,6 +96,8 @@ impl v2::HostConnection for InstanceState { query: String, parameters: Vec, ) -> Result { + self.observe_context.reparent_tracing_span(); + let conn = match self.get_connection(connection) { Ok(c) => c, Err(err) => return Err(err), diff --git a/crates/factor-sqlite/src/lib.rs b/crates/factor-sqlite/src/lib.rs index d7a7618c7a..fc5e756dd4 100644 --- a/crates/factor-sqlite/src/lib.rs +++ b/crates/factor-sqlite/src/lib.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use host::InstanceState; use async_trait::async_trait; +use spin_factor_observe::ObserveContext; use spin_factors::{anyhow, Factor}; use spin_locked_app::MetadataKey; use spin_world::v1::sqlite as v1; @@ -75,7 +76,7 @@ impl Factor for SqliteFactor { fn prepare( &self, - ctx: spin_factors::PrepareContext, + mut ctx: spin_factors::PrepareContext, ) -> spin_factors::anyhow::Result { let allowed_databases = ctx .app_state() @@ -83,9 +84,11 @@ impl Factor for SqliteFactor { .get(ctx.app_component().id()) .cloned() .unwrap_or_default(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; Ok(InstanceState::new( allowed_databases, ctx.app_state().connection_creators.clone(), + observe_context, )) } } diff --git a/crates/factor-variables/Cargo.toml b/crates/factor-variables/Cargo.toml index 6310d2a882..9bc57c4424 100644 --- a/crates/factor-variables/Cargo.toml +++ b/crates/factor-variables/Cargo.toml @@ -11,6 +11,7 @@ azure_security_keyvault = { git = "https://github.com/azure/azure-sdk-for-rust", dotenvy = "0.15" serde = { workspace = true } spin-expressions = { path = "../expressions" } +spin-factor-observe = { path = "../factor-observe" } spin-factors = { path = "../factors" } spin-world = { path = "../world" } tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/crates/factor-variables/src/host.rs b/crates/factor-variables/src/host.rs index bc30fe0a79..257864dd6b 100644 --- a/crates/factor-variables/src/host.rs +++ b/crates/factor-variables/src/host.rs @@ -8,6 +8,7 @@ use crate::InstanceState; impl variables::Host for InstanceState { #[instrument(name = "spin_variables.get", skip(self), err(level = Level::INFO), fields(otel.kind = "client"))] async fn get(&mut self, key: String) -> Result { + self.observe_context.reparent_tracing_span(); let key = spin_expressions::Key::new(&key).map_err(expressions_to_variables_err)?; self.expression_resolver .resolve(&self.component_id, key) diff --git a/crates/factor-variables/src/lib.rs b/crates/factor-variables/src/lib.rs index 7078c99aa4..9e47c1ab52 100644 --- a/crates/factor-variables/src/lib.rs +++ b/crates/factor-variables/src/lib.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use runtime_config::RuntimeConfig; use spin_expressions::{ProviderResolver as ExpressionResolver, Template}; +use spin_factor_observe::ObserveContext; use spin_factors::{ anyhow, ConfigureAppContext, Factor, InitContext, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -62,13 +63,15 @@ impl Factor for VariablesFactor { fn prepare( &self, - ctx: PrepareContext, + mut ctx: PrepareContext, ) -> anyhow::Result { let component_id = ctx.app_component().id().to_string(); let expression_resolver = ctx.app_state().expression_resolver.clone(); + let observe_context = ObserveContext::from_prepare_context(&mut ctx)?; Ok(InstanceState { component_id, expression_resolver, + observe_context, }) } } @@ -90,6 +93,7 @@ impl AppState { pub struct InstanceState { component_id: String, expression_resolver: Arc, + observe_context: ObserveContext, } impl InstanceState { diff --git a/crates/runtime-config/Cargo.toml b/crates/runtime-config/Cargo.toml index 288e75f379..688f2be945 100644 --- a/crates/runtime-config/Cargo.toml +++ b/crates/runtime-config/Cargo.toml @@ -13,6 +13,7 @@ anyhow = { workspace = true } spin-common = { path = "../common" } spin-factor-key-value = { path = "../factor-key-value" } spin-factor-llm = { path = "../factor-llm" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-http = { path = "../factor-outbound-http" } spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" } spin-factor-outbound-mysql = { path = "../factor-outbound-mysql" } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index 4507506115..8673b6f17d 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -5,6 +5,7 @@ use spin_common::ui::quoted_path; use spin_factor_key_value::runtime_config::spin::{self as key_value}; use spin_factor_key_value::KeyValueFactor; use spin_factor_llm::{spin as llm, LlmFactor}; +use spin_factor_observe::ObserveFactor; use spin_factor_outbound_http::OutboundHttpFactor; use spin_factor_outbound_mqtt::OutboundMqttFactor; use spin_factor_outbound_mysql::OutboundMysqlFactor; @@ -366,6 +367,12 @@ impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_, '_> } } +impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_, '_> { + fn get_runtime_config(&mut self) -> anyhow::Result> { + Ok(None) + } +} + impl RuntimeConfigSourceFinalizer for TomlRuntimeConfigSource<'_, '_> { fn finalize(&mut self) -> anyhow::Result<()> { Ok(self.toml.validate_all_keys_used()?) diff --git a/crates/runtime-factors/Cargo.toml b/crates/runtime-factors/Cargo.toml index 7bea71664a..3cef50a1d6 100644 --- a/crates/runtime-factors/Cargo.toml +++ b/crates/runtime-factors/Cargo.toml @@ -19,6 +19,7 @@ clap = { version = "3.1.18", features = ["derive", "env"] } spin-common = { path = "../common" } spin-factor-key-value = { path = "../factor-key-value" } spin-factor-llm = { path = "../factor-llm" } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-http = { path = "../factor-outbound-http" } spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" } spin-factor-outbound-mysql = { path = "../factor-outbound-mysql" } diff --git a/crates/runtime-factors/src/lib.rs b/crates/runtime-factors/src/lib.rs index c091e58162..06b710fc53 100644 --- a/crates/runtime-factors/src/lib.rs +++ b/crates/runtime-factors/src/lib.rs @@ -8,6 +8,7 @@ use anyhow::Context as _; use spin_common::arg_parser::parse_kv; use spin_factor_key_value::KeyValueFactor; use spin_factor_llm::LlmFactor; +use spin_factor_observe::ObserveFactor; use spin_factor_outbound_http::OutboundHttpFactor; use spin_factor_outbound_mqtt::{NetworkedMqttClient, OutboundMqttFactor}; use spin_factor_outbound_mysql::OutboundMysqlFactor; @@ -22,6 +23,7 @@ use spin_runtime_config::{ResolvedRuntimeConfig, TomlRuntimeConfigSource}; #[derive(RuntimeFactors)] pub struct TriggerFactors { + pub observe: ObserveFactor, pub wasi: WasiFactor, pub variables: VariablesFactor, pub key_value: KeyValueFactor, @@ -42,6 +44,7 @@ impl TriggerFactors { allow_transient_writes: bool, ) -> anyhow::Result { Ok(Self { + observe: ObserveFactor::new(), wasi: wasi_factor(working_dir, allow_transient_writes), variables: VariablesFactor::default(), key_value: KeyValueFactor::new(), diff --git a/crates/telemetry/src/metrics.rs b/crates/telemetry/src/metrics.rs index 200a786a27..58e62826f5 100644 --- a/crates/telemetry/src/metrics.rs +++ b/crates/telemetry/src/metrics.rs @@ -1,6 +1,7 @@ use std::time::Duration; use anyhow::{bail, Result}; +use opentelemetry::global; use opentelemetry_otlp::MetricsExporterBuilder; use opentelemetry_sdk::{ metrics::{ @@ -57,6 +58,8 @@ pub(crate) fn otel_metrics_layer LookupSpan<'span>>( .with_resource(resource) .build(); + global::set_meter_provider(meter_provider.clone()); + Ok(MetricsLayer::new(meter_provider)) } diff --git a/crates/telemetry/src/traces.rs b/crates/telemetry/src/traces.rs index 926d355c3f..9ec38f2b40 100644 --- a/crates/telemetry/src/traces.rs +++ b/crates/telemetry/src/traces.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::bail; -use opentelemetry::trace::TracerProvider; +use opentelemetry::{global, trace::TracerProvider}; use opentelemetry_otlp::SpanExporterBuilder; use opentelemetry_sdk::{ resource::{EnvResourceDetector, TelemetryResourceDetector}, @@ -38,7 +38,7 @@ pub(crate) fn otel_tracing_layer LookupSpan<'span>>( // currently default to using the HTTP exporter but in the future we could select off of the // combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL to // determine whether we should use http/protobuf or grpc. - let exporter: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() { + let exporter_builder: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() { OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(), OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(), OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"), @@ -46,10 +46,12 @@ pub(crate) fn otel_tracing_layer LookupSpan<'span>>( let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter(exporter) + .with_exporter(exporter_builder) .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource)) .install_batch(opentelemetry_sdk::runtime::Tokio)?; + global::set_tracer_provider(tracer_provider.clone()); + let env_filter = match EnvFilter::try_from_env("SPIN_OTEL_TRACING_LEVEL") { Ok(filter) => filter, // If it isn't set or it fails to parse default to info diff --git a/crates/trigger-http/Cargo.toml b/crates/trigger-http/Cargo.toml index a7f95eec29..cb982c9c8c 100644 --- a/crates/trigger-http/Cargo.toml +++ b/crates/trigger-http/Cargo.toml @@ -34,6 +34,7 @@ spin-http = { path = "../http" } spin-telemetry = { path = "../telemetry" } spin-trigger = { path = "../trigger" } spin-world = { path = "../world" } +spin-factor-observe = { path = "../factor-observe" } terminal = { path = "../terminal" } tokio = { workspace = true, features = ["full"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12"] } diff --git a/crates/trigger-http/src/wasi.rs b/crates/trigger-http/src/wasi.rs index 2724f21da8..24a81b985e 100644 --- a/crates/trigger-http/src/wasi.rs +++ b/crates/trigger-http/src/wasi.rs @@ -100,7 +100,6 @@ impl HttpExecutor for WasiHttpExecutor { HandlerType::Wagi => unreachable!("should have used WagiExecutor instead"), }; - let span = tracing::debug_span!("execute_wasi"); let handle = task::spawn( async move { let result = match handler { @@ -108,20 +107,13 @@ impl HttpExecutor for WasiHttpExecutor { handler .wasi_http_incoming_handler() .call_handle(&mut store, request, response) - .instrument(span) .await } Handler::Handler2023_10_18(handler) => { - handler - .call_handle(&mut store, request, response) - .instrument(span) - .await + handler.call_handle(&mut store, request, response).await } Handler::Handler2023_11_10(handler) => { - handler - .call_handle(&mut store, request, response) - .instrument(span) - .await + handler.call_handle(&mut store, request, response).await } }; diff --git a/crates/world/Cargo.toml b/crates/world/Cargo.toml index fd4763100d..77f26a7a05 100644 --- a/crates/world/Cargo.toml +++ b/crates/world/Cargo.toml @@ -5,5 +5,7 @@ authors = { workspace = true } edition = { workspace = true } [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } +opentelemetry = { workspace = true } wasmtime = { workspace = true } diff --git a/crates/world/src/conversions.rs b/crates/world/src/conversions.rs index 29b21f408a..c22c4f7964 100644 --- a/crates/world/src/conversions.rs +++ b/crates/world/src/conversions.rs @@ -218,3 +218,179 @@ mod llm { } } } + +mod observe { + use super::*; + use opentelemetry::StringValue; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use wasi::observe::tracer; + + impl From for opentelemetry::Value { + fn from(value: tracer::Value) -> Self { + match value { + tracer::Value::String(v) => v.into(), + tracer::Value::Bool(v) => v.into(), + tracer::Value::Float64(v) => v.into(), + tracer::Value::S64(v) => v.into(), + tracer::Value::StringArray(v) => opentelemetry::Value::Array( + v.into_iter() + .map(StringValue::from) + .collect::>() + .into(), + ), + tracer::Value::BoolArray(v) => opentelemetry::Value::Array(v.into()), + tracer::Value::Float64Array(v) => opentelemetry::Value::Array(v.into()), + tracer::Value::S64Array(v) => opentelemetry::Value::Array(v.into()), + } + } + } + + impl From for opentelemetry::KeyValue { + fn from(kv: tracer::KeyValue) -> Self { + opentelemetry::KeyValue::new(kv.key, kv.value) + } + } + + impl From for opentelemetry::trace::TraceFlags { + fn from(flags: tracer::TraceFlags) -> Self { + Self::new(flags.as_array()[0] as u8) + } + } + + impl From for tracer::TraceFlags { + fn from(flags: opentelemetry::trace::TraceFlags) -> Self { + if flags.is_sampled() { + tracer::TraceFlags::SAMPLED + } else { + tracer::TraceFlags::empty() + } + } + } + + impl From for opentelemetry::trace::SpanContext { + fn from(sc: tracer::SpanContext) -> Self { + // TODO(Reviewer): Should this be try_from instead an propagate this error out of the WIT? + let trace_id = opentelemetry::trace::TraceId::from_hex(&sc.trace_id) + .unwrap_or(opentelemetry::trace::TraceId::INVALID); + let span_id = opentelemetry::trace::SpanId::from_hex(&sc.span_id) + .unwrap_or(opentelemetry::trace::SpanId::INVALID); + let trace_state = opentelemetry::trace::TraceState::from_key_value(sc.trace_state) + .unwrap_or_else(|_| opentelemetry::trace::TraceState::default()); + Self::new( + trace_id, + span_id, + sc.trace_flags.into(), + sc.is_remote, + trace_state, + ) + } + } + + impl From for tracer::SpanContext { + fn from(sc: opentelemetry::trace::SpanContext) -> Self { + Self { + trace_id: format!("{:x}", sc.trace_id()), + span_id: format!("{:x}", sc.span_id()), + trace_flags: sc.trace_flags().into(), + is_remote: sc.is_remote(), + trace_state: sc + .trace_state() + .header() + .split(',') + // TODO(Reviewer): Should this be try_from instead an propagate this error out of the WIT? + .filter_map(|s| { + if let Some((key, value)) = s.split_once('=') { + Some((key.to_string(), value.to_string())) + } else { + None + } + }) + .collect(), + } + } + } + + impl From for opentelemetry::trace::Status { + fn from(status: tracer::Status) -> Self { + match status { + tracer::Status::Unset => Self::Unset, + tracer::Status::Ok => Self::Ok, + tracer::Status::Error(s) => Self::Error { + description: s.into(), + }, + } + } + } + + impl From for opentelemetry::trace::SpanKind { + fn from(kind: tracer::SpanKind) -> Self { + match kind { + tracer::SpanKind::Client => opentelemetry::trace::SpanKind::Client, + tracer::SpanKind::Server => opentelemetry::trace::SpanKind::Server, + tracer::SpanKind::Producer => opentelemetry::trace::SpanKind::Producer, + tracer::SpanKind::Consumer => opentelemetry::trace::SpanKind::Consumer, + tracer::SpanKind::Internal => opentelemetry::trace::SpanKind::Internal, + } + } + } + + impl From for opentelemetry::trace::Link { + fn from(link: tracer::Link) -> Self { + Self::new( + link.span_context.into(), + link.attributes.into_iter().map(Into::into).collect(), + 0, + ) + } + } + + impl From for SystemTime { + fn from(timestamp: tracer::Datetime) -> Self { + UNIX_EPOCH + + Duration::from_secs(timestamp.seconds) + + Duration::from_nanos(timestamp.nanoseconds as u64) + } + } + + #[allow(clippy::derivable_impls)] + impl Default for tracer::StartOptions { + fn default() -> Self { + Self { + new_root: false, + span_kind: None, + attributes: None, + links: None, + timestamp: None, + } + } + } + + mod test { + #[test] + fn trace_flags() { + let flags = opentelemetry::trace::TraceFlags::SAMPLED; + let flags2 = crate::wasi::observe::tracer::TraceFlags::from(flags); + let flags3 = opentelemetry::trace::TraceFlags::from(flags2); + assert_eq!(flags, flags3); + } + + #[test] + fn span_context() { + let sc = opentelemetry::trace::SpanContext::new( + opentelemetry::trace::TraceId::from_hex("4fb34cb4484029f7881399b149e41e98") + .unwrap(), + opentelemetry::trace::SpanId::from_hex("9ffd58d3cd4dd90b").unwrap(), + opentelemetry::trace::TraceFlags::SAMPLED, + false, + opentelemetry::trace::TraceState::from_key_value(vec![ + ("foo", "bar"), + ("baz", "qux"), + ]) + .unwrap(), + ); + let sc2 = crate::wasi::observe::tracer::SpanContext::from(sc.clone()); + let sc3 = opentelemetry::trace::SpanContext::from(sc2); + assert_eq!(sc, sc3); + } + } +} diff --git a/tests/integration.rs b/tests/integration.rs index 4b14a1e8a9..0a293b738f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,6 +1,7 @@ -mod testcases; +pub mod testcases; mod integration_tests { + use anyhow::Context; use sha2::Digest; use std::collections::HashMap; use test_environment::{ @@ -9,13 +10,12 @@ mod integration_tests { }; use testing_framework::runtimes::{spin_cli::SpinConfig, SpinAppType}; - use super::testcases::{ + pub use super::testcases::{ assert_spin_request, bootstap_env, http_smoke_test_template, run_test, spin_binary, }; - use anyhow::Context; - /// Helper macro to assert that a condition is true eventually #[cfg(feature = "extern-dependencies-tests")] + /// Helper macro to assert that a condition is true eventually macro_rules! assert_eventually { ($e:expr, $t:expr) => { let mut i = 0; @@ -153,72 +153,6 @@ mod integration_tests { Ok(()) } - #[test] - #[cfg(feature = "extern-dependencies-tests")] - /// Test that basic otel tracing works - fn otel_smoke_test() -> anyhow::Result<()> { - use anyhow::Context; - - use crate::testcases::run_test_inited; - run_test_inited( - "otel-smoke-test", - SpinConfig { - binary_path: spin_binary(), - spin_up_args: Vec::new(), - app_type: SpinAppType::Http, - }, - ServicesConfig::new(vec!["jaeger"])?, - |env| { - let otel_port = env - .services_mut() - .get_port(4318)? - .context("expected a port for Jaeger")?; - env.set_env_var( - "OTEL_EXPORTER_OTLP_ENDPOINT", - format!("http://localhost:{}", otel_port), - ); - Ok(()) - }, - move |env| { - let spin = env.runtime_mut(); - assert_spin_request( - spin, - Request::new(Method::Get, "/hello"), - Response::new_with_body(200, "Hello, Fermyon!\n"), - )?; - - assert_eventually!( - { - let jaeger_port = env - .services_mut() - .get_port(16686)? - .context("no jaeger port was exposed by test services")?; - let url = format!("http://localhost:{jaeger_port}/api/traces?service=spin"); - match reqwest::blocking::get(&url).context("failed to get jaeger traces")? { - resp if resp.status().is_success() => { - let traces: serde_json::Value = - resp.json().context("failed to parse jaeger traces")?; - let traces = - traces.get("data").context("jaeger traces has no data")?; - let traces = - traces.as_array().context("jaeger traces is not an array")?; - !traces.is_empty() - } - _resp => { - eprintln!("failed to get jaeger traces:"); - false - } - } - }, - 20 - ); - Ok(()) - }, - )?; - - Ok(()) - } - #[test] /// Test dynamic environment variables fn dynamic_env_test() -> anyhow::Result<()> { @@ -1458,3 +1392,573 @@ route = "/..." Ok(()) } } + +// TODO(Reviewer): How can I move this to a new file? I wasn't able to get the imports to work out. +mod otel_integration_tests { + use fake_opentelemetry_collector::FakeCollectorServer; + use std::time::Duration; + use test_environment::{ + http::{Method, Request, Response}, + services::ServicesConfig, + }; + use testing_framework::runtimes::{spin_cli::SpinConfig, SpinAppType}; + + use crate::testcases::run_test_inited; + + use super::testcases::{assert_spin_request, spin_binary}; + + #[test] + // Test that basic otel tracing and inbound/outbound context propagation works + fn otel_smoke_test() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "otel-smoke-test", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request(spin, Request::new(Method::Get, "/one"), Response::new(200))?; + + let spans = rt.block_on(collector.exported_spans(5, Duration::from_secs(5))); + + assert_eq!(spans.len(), 5); + + // They're all part of the same trace which implies context propagation is working + assert!(spans + .iter() + .map(|s| s.trace_id.clone()) + .all(|t| t == spans[0].trace_id)); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_nested_spans() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/nested-spans"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(4, Duration::from_secs(5))); + + assert_eq!(spans.len(), 4); + + let handle_request_span = spans + .iter() + .find(|s| s.name == "GET /...") + .expect("'GET /...' span should exist"); + let exec_component_span = spans + .iter() + .find(|s| s.name == "execute_wasm_component wasi-observe-tracing") + .expect("'execute_wasm_component wasi-observe-tracing' span should exist"); + let outer_span = spans + .iter() + .find(|s| s.name == "outer_func") + .expect("'outer_func' span should exist"); + let inner_span = spans + .iter() + .find(|s| s.name == "inner_func") + .expect("'inner_func' span should exist"); + + assert!( + handle_request_span.trace_id == exec_component_span.trace_id + && exec_component_span.trace_id == outer_span.trace_id + && outer_span.trace_id == inner_span.trace_id + ); + assert_eq!( + exec_component_span.parent_span_id, + handle_request_span.span_id + ); + assert_eq!(outer_span.parent_span_id, exec_component_span.span_id); + assert_eq!(inner_span.parent_span_id, outer_span.span_id); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_drop_semantics() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/drop-semantics"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(3, Duration::from_secs(5))); + + assert_eq!(spans.len(), 3); + + let handle_request_span = spans + .iter() + .find(|s| s.name == "GET /...") + .expect("'GET /...' span should exist"); + let exec_component_span = spans + .iter() + .find(|s| s.name == "execute_wasm_component wasi-observe-tracing") + .expect("'execute_wasm_component wasi-observe-tracing' span should exist"); + let drop_span = spans + .iter() + .find(|s| s.name == "drop_semantics") + .expect("'drop_semantics' span should exist"); + + assert!( + handle_request_span.trace_id == exec_component_span.trace_id + && exec_component_span.trace_id == drop_span.trace_id + ); + assert_eq!( + exec_component_span.parent_span_id, + handle_request_span.span_id + ); + assert_eq!(drop_span.parent_span_id, exec_component_span.span_id); + assert!(drop_span.end_time_unix_nano < exec_component_span.end_time_unix_nano); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_setting_attributes() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/setting-attributes"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(3, Duration::from_secs(5))); + + assert_eq!(spans.len(), 3); + + let attr_span = spans + .iter() + .find(|s| s.name == "setting_attributes") + .expect("'setting_attributes' span should exist"); + + // There are some other attributes already set on the span + assert_eq!(attr_span.attributes.len(), 2); + + assert_eq!( + attr_span + .attributes + .get("foo") + .expect("'foo' attribute should exist"), + "Some(AnyValue { value: Some(StringValue(\"baz\")) })" + ); + assert_eq!( + attr_span.attributes.get("qux").expect("'qux' attribute should exist"), + "Some(AnyValue { value: Some(ArrayValue(ArrayValue { values: [AnyValue { value: Some(StringValue(\"qaz\")) }, AnyValue { value: Some(StringValue(\"thud\")) }] })) })" + ); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_host_guest_host() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/host-guest-host"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(4, Duration::from_secs(5))); + + assert_eq!(spans.len(), 4); + + assert!(spans + .iter() + .map(|s| s.trace_id.clone()) + .all(|t| t == spans[0].trace_id)); + + let exec_component_span = spans + .iter() + .find(|s| s.name == "execute_wasm_component wasi-observe-tracing") + .expect("'execute_wasm_component wasi-observe-tracing' span should exist"); + let guest_span = spans + .iter() + .find(|s| s.name == "guest") + .expect("'guest' span should exist"); + let get_span = spans + .iter() + .find(|s| s.name == "GET") + .expect("'GET' span should exist"); + + assert_eq!(guest_span.parent_span_id, exec_component_span.span_id); + assert_eq!(get_span.parent_span_id, guest_span.span_id); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_events() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/events"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(3, Duration::from_secs(5))); + + assert_eq!(spans.len(), 3); + + let event_span = spans + .iter() + .find(|s| s.name == "events") + .expect("'events' span should exist"); + + let events = event_span.events.clone(); + assert_eq!(events.len(), 3); + + let basic_event = events + .iter() + .find(|e| e.name == "basic-event") + .expect("'basic' event should exist"); + let event_with_attributes = events + .iter() + .find(|e| e.name == "event-with-attributes") + .expect("'event_with_attributes' event should exist"); + let event_with_timestamp = events + .iter() + .find(|e| e.name == "event-with-timestamp") + .expect("'event_with_timestamp' event should exist"); + + assert!(basic_event.time_unix_nano < event_with_attributes.time_unix_nano); + assert_eq!(event_with_attributes.attributes.len(), 1); + assert!(event_with_attributes.time_unix_nano < event_with_timestamp.time_unix_nano); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_links() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/links"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(4, Duration::from_secs(5))); + + assert_eq!(spans.len(), 4); + + let first_span = spans + .iter() + .find(|s| s.name == "first") + .expect("'first' span should exist"); + let second_span = spans + .iter() + .find(|s| s.name == "second") + .expect("'second' span should exist"); + + assert_eq!(first_span.links.len(), 0); + assert_eq!(second_span.links.len(), 1); + assert_eq!( + second_span.links.first().unwrap().span_id, + first_span.span_id + ); + assert_eq!(second_span.links.first().unwrap().attributes.len(), 1); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_child_outlives_parent() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/child-outlives-parent"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(5, Duration::from_secs(5))); + + assert_eq!(spans.len(), 5); + + let parent_span = spans + .iter() + .find(|s| s.name == "parent") + .expect("'parent' span should exist"); + let child_span = spans + .iter() + .find(|s| s.name == "child") + .expect("'child' span should exist"); + let get_span = spans + .iter() + .find(|s| s.name == "GET") + .expect("'GET' span should exist"); + assert_eq!(child_span.parent_span_id, parent_span.span_id); + assert_eq!(get_span.parent_span_id, child_span.span_id); + assert!(child_span.end_time_unix_nano > parent_span.end_time_unix_nano); + + Ok(()) + }, + )?; + + Ok(()) + } + + #[test] + fn wasi_observe_root_span() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + let mut collector = rt + .block_on(FakeCollectorServer::start()) + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/root-span"), + Response::new(200), + )?; + + let spans = rt.block_on(collector.exported_spans(7, Duration::from_secs(5))); + + assert_eq!(spans.len(), 7); + + let parent_span = spans + .iter() + .find(|s| s.name == "parent") + .expect("'parent' span should exist"); + let request_one = spans + .iter() + .find(|s| s.name == "GET") + .expect("first 'GET' span should exist"); + let root_span = spans + .iter() + .find(|s| s.name == "root") + .expect("'root' span should exist"); + let request_two = spans + .iter() + .filter(|s| s.name == "GET") + .nth(1) + .expect("second 'GET' span should exist"); + let request_three = spans + .iter() + .filter(|s| s.name == "GET") + .nth(2) + .expect("third 'GET' span should exist"); + + assert_eq!(parent_span.trace_id, request_one.trace_id); + assert_ne!(root_span.trace_id, parent_span.trace_id); + assert_eq!(root_span.trace_id, request_two.trace_id); + assert_eq!(parent_span.trace_id, request_three.trace_id); + assert_eq!(root_span.parent_span_id, "".to_string()); + assert_eq!(request_two.parent_span_id, root_span.span_id); + assert_ne!(request_three.parent_span_id, parent_span.span_id); + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/tests/test-components/components/Cargo.lock b/tests/test-components/components/Cargo.lock index c8a2224121..737014ab96 100644 --- a/tests/test-components/components/Cargo.lock +++ b/tests/test-components/components/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "ai" version = "0.1.0" @@ -214,6 +226,9 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", +] [[package]] name = "head-rust-sdk-redis" @@ -242,6 +257,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hello-world" version = "0.1.0" @@ -484,6 +505,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "otel-smoke-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "http 0.2.11", + "spin-sdk 2.2.0", +] + [[package]] name = "outbound-http-component" version = "0.1.0" @@ -543,6 +573,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.74", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -938,6 +978,16 @@ dependencies = [ "wit-bindgen 0.16.0", ] +[[package]] +name = "wasi-observe-tracing" +version = "0.1.0" +dependencies = [ + "anyhow", + "http 0.2.11", + "spin-sdk 2.2.0", + "wit-bindgen 0.30.0", +] + [[package]] name = "wasm-encoder" version = "0.36.2" @@ -965,6 +1015,16 @@ dependencies = [ "leb128", ] +[[package]] +name = "wasm-encoder" +version = "0.215.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb56df3e06b8e6b77e37d2969a50ba51281029a9aeb3855e76b7f49b6418847" +dependencies = [ + "leb128", + "wasmparser 0.215.0", +] + [[package]] name = "wasm-metadata" version = "0.10.20" @@ -981,6 +1041,22 @@ dependencies = [ "wasmparser 0.121.2", ] +[[package]] +name = "wasm-metadata" +version = "0.215.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6bb07c5576b608f7a2a9baa2294c1a3584a249965d695a9814a496cb6d232f" +dependencies = [ + "anyhow", + "indexmap", + "serde", + "serde_derive", + "serde_json", + "spdx", + "wasm-encoder 0.215.0", + "wasmparser 0.215.0", +] + [[package]] name = "wasmparser" version = "0.116.1" @@ -1012,6 +1088,19 @@ dependencies = [ "semver", ] +[[package]] +name = "wasmparser" +version = "0.215.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fbde0881f24199b81cf49b6ff8f9c145ac8eb1b7fc439adb5c099734f7d90e" +dependencies = [ + "ahash", + "bitflags", + "hashbrown", + "indexmap", + "semver", +] + [[package]] name = "wit-bindgen" version = "0.13.1" @@ -1032,6 +1121,16 @@ dependencies = [ "wit-bindgen-rust-macro 0.16.0", ] +[[package]] +name = "wit-bindgen" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4bac478334a647374ff24a74b66737a4cb586dc8288bc3080a93252cd1105c" +dependencies = [ + "wit-bindgen-rt", + "wit-bindgen-rust-macro 0.30.0", +] + [[package]] name = "wit-bindgen-core" version = "0.13.1" @@ -1054,6 +1153,26 @@ dependencies = [ "wit-parser 0.13.0", ] +[[package]] +name = "wit-bindgen-core" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb7e3df01cd43cfa1cb52602e4fc05cb2b62217655f6705639b6953eb0a3fed2" +dependencies = [ + "anyhow", + "heck 0.5.0", + "wit-parser 0.215.0", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2de7a3b06b9725d129b5cbd1beca968feed919c433305a23da46843185ecdd6" +dependencies = [ + "bitflags", +] + [[package]] name = "wit-bindgen-rust" version = "0.13.2" @@ -1061,8 +1180,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7200e565124801e01b7b5ddafc559e1da1b2e1bed5364d669cd1d96fb88722" dependencies = [ "anyhow", - "heck", - "wasm-metadata", + "heck 0.4.1", + "wasm-metadata 0.10.20", "wit-bindgen-core 0.13.1", "wit-component 0.17.0", ] @@ -1074,12 +1193,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01ff9cae7bf5736750d94d91eb8a49f5e3a04aff1d1a3218287d9b2964510f8" dependencies = [ "anyhow", - "heck", - "wasm-metadata", + "heck 0.4.1", + "wasm-metadata 0.10.20", "wit-bindgen-core 0.16.0", "wit-component 0.18.2", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a767d1a8eb4e908bfc53febc48b87ada545703b16fe0148ee7736a29a01417" +dependencies = [ + "anyhow", + "heck 0.5.0", + "indexmap", + "prettyplease", + "syn 2.0.74", + "wasm-metadata 0.215.0", + "wit-bindgen-core 0.30.0", + "wit-component 0.215.0", +] + [[package]] name = "wit-bindgen-rust-macro" version = "0.13.1" @@ -1110,6 +1245,21 @@ dependencies = [ "wit-component 0.18.2", ] +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b185c342d0d27bd83d4080f5a66cf3b4f247fa49d679bceb66e11cc7eb58b99" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.74", + "wit-bindgen-core 0.30.0", + "wit-bindgen-rust 0.30.0", +] + [[package]] name = "wit-component" version = "0.17.0" @@ -1124,7 +1274,7 @@ dependencies = [ "serde_derive", "serde_json", "wasm-encoder 0.36.2", - "wasm-metadata", + "wasm-metadata 0.10.20", "wasmparser 0.116.1", "wit-parser 0.12.2", ] @@ -1143,11 +1293,30 @@ dependencies = [ "serde_derive", "serde_json", "wasm-encoder 0.38.1", - "wasm-metadata", + "wasm-metadata 0.10.20", "wasmparser 0.118.1", "wit-parser 0.13.0", ] +[[package]] +name = "wit-component" +version = "0.215.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f725e3885fc5890648be5c5cbc1353b755dc932aa5f1aa7de968b912a3280743" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder 0.215.0", + "wasm-metadata 0.215.0", + "wasmparser 0.215.0", + "wit-parser 0.215.0", +] + [[package]] name = "wit-parser" version = "0.12.2" @@ -1181,3 +1350,41 @@ dependencies = [ "serde_json", "unicode-xid", ] + +[[package]] +name = "wit-parser" +version = "0.215.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "935a97eaffd57c3b413aa510f8f0b550a4a9fe7d59e79cd8b89a83dcb860321f" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser 0.215.0", +] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] diff --git a/tests/test-components/components/otel-smoke-test/Cargo.toml b/tests/test-components/components/otel-smoke-test/Cargo.toml new file mode 100644 index 0000000000..521468b790 --- /dev/null +++ b/tests/test-components/components/otel-smoke-test/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "otel-smoke-test" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1" +http = "0.2" +spin-sdk = "2.2.0" diff --git a/tests/test-components/components/otel-smoke-test/src/lib.rs b/tests/test-components/components/otel-smoke-test/src/lib.rs new file mode 100644 index 0000000000..85c4326102 --- /dev/null +++ b/tests/test-components/components/otel-smoke-test/src/lib.rs @@ -0,0 +1,22 @@ +use spin_sdk::{ + http::{Method, Params, Request, Response, Router}, + http_component, +}; + +#[http_component] +fn handle(req: http::Request<()>) -> Response { + let mut router = Router::new(); + router.get_async("/one", one); + router.get_async("/two", two); + router.handle(req) +} + +async fn one(_req: Request, _params: Params) -> Response { + let req = Request::builder().method(Method::Get).uri("/two").build(); + let _res: Response = spin_sdk::http::send(req).await.unwrap(); + Response::new(200, "") +} + +async fn two(_req: Request, _params: Params) -> Response { + Response::new(201, "") +} diff --git a/tests/test-components/components/wasi-observe-tracing/Cargo.toml b/tests/test-components/components/wasi-observe-tracing/Cargo.toml new file mode 100644 index 0000000000..3b4eb0c449 --- /dev/null +++ b/tests/test-components/components/wasi-observe-tracing/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "wasi-observe-tracing" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1" +http = "0.2" +spin-sdk = "2.2.0" +wit-bindgen = "0.30.0" diff --git a/tests/test-components/components/wasi-observe-tracing/src/lib.rs b/tests/test-components/components/wasi-observe-tracing/src/lib.rs new file mode 100644 index 0000000000..89ee43d2be --- /dev/null +++ b/tests/test-components/components/wasi-observe-tracing/src/lib.rs @@ -0,0 +1,147 @@ +wit_bindgen::generate!({ + path: "../../../../wit", + world: "wasi:observe/imports@0.2.0-draft", + generate_all, +}); + +use spin_sdk::{ + http::{Method, Params, Request, Response, Router}, + http_component, +}; +use wasi::{ + clocks0_2_0::wall_clock::now, + observe::tracer::{self, KeyValue, Link, StartOptions, Value}, +}; + +#[http_component] +fn handle(req: http::Request<()>) -> Response { + let mut router = Router::new(); + router.get("/nested-spans", nested_spans); + router.get("/drop-semantics", drop_semantics); + router.get("/setting-attributes", setting_attributes); + router.get_async("/host-guest-host", host_guest_host); + router.get("/events", events); + router.get_async("/child-outlives-parent", child_outlives_parent); + router.get("/links", links); + router.get_async("/root-span", root_span); + router.handle(req) +} + +fn nested_spans(_req: Request, _params: Params) -> Response { + let span = tracer::start("outer_func", None); + inner_func(); + span.end(None); + Response::new(200, "") +} + +fn inner_func() { + let span = tracer::start("inner_func", None); + span.end(None); +} + +fn drop_semantics(_req: Request, _params: Params) -> Response { + let _span = tracer::start("drop_semantics", None); + Response::new(200, "") + // _span will drop here and should be ended +} + +fn setting_attributes(_req: Request, _params: Params) -> Response { + let span = tracer::start("setting_attributes", None); + span.set_attributes(&[KeyValue { + key: "foo".to_string(), + value: Value::String("bar".to_string()), + }]); + span.set_attributes(&[ + KeyValue { + key: "foo".to_string(), + value: Value::String("baz".to_string()), + }, + KeyValue { + key: "qux".to_string(), + value: Value::StringArray(vec!["qaz".to_string(), "thud".to_string()]), + }, + ]); + span.end(None); + Response::new(200, "") +} + +async fn host_guest_host(_req: Request, _params: Params) -> Response { + let span = tracer::start("guest", None); + + make_request().await; + span.end(None); + + Response::new(200, "") +} + +fn events(_req: Request, _params: Params) -> Response { + let span = tracer::start("events", None); + span.add_event("basic-event", None, None); + span.add_event( + "event-with-attributes", + None, + Some(&[KeyValue { + key: "foo".to_string(), + value: Value::Bool(true), + }]), + ); + let mut now_plus = now(); + now_plus.seconds += 1; + span.add_event("event-with-timestamp", Some(now_plus), None); + span.end(None); + Response::new(200, "") +} + +async fn child_outlives_parent(_req: Request, _params: Params) -> Response { + let span = tracer::start("parent", None); + let span2 = tracer::start("child", None); + span.end(None); + // Make a host call to test span reparenting when we're messing with the active span stack + make_request().await; + + span2.end(None); + Response::new(200, "") +} + +fn links(_req: Request, _params: Params) -> Response { + let first = tracer::start("first", None); + first.end(None); + let second = tracer::start("second", None); + second.add_link(&Link { + span_context: first.span_context(), + attributes: vec![KeyValue { + key: "foo".to_string(), + value: Value::String("bar".to_string()), + }], + }); + second.end(None); + Response::new(200, "") +} + +async fn root_span(_req: Request, _params: Params) -> Response { + let span1 = tracer::start("parent", None); + make_request().await; + let root = tracer::start( + "root", + Some(&StartOptions { + new_root: true, + span_kind: None, + attributes: None, + links: None, + timestamp: None, + }), + ); + make_request().await; + root.end(None); + span1.end(None); + make_request().await; + Response::new(200, "") +} + +async fn make_request() { + let req = Request::builder() + .method(Method::Get) + .uri("https://asdf.com") + .build(); + let _res: Response = spin_sdk::http::send(req).await.unwrap(); +} diff --git a/tests/testcases/otel-smoke-test/spin.toml b/tests/testcases/otel-smoke-test/spin.toml index a4eb09f671..909149421f 100644 --- a/tests/testcases/otel-smoke-test/spin.toml +++ b/tests/testcases/otel-smoke-test/spin.toml @@ -1,12 +1,13 @@ spin_version = "1" authors = ["Fermyon Engineering "] -description = "A simple application that returns hello and goodbye." -name = "head-rust-sdk-http" +description = "A simple application that tests otel." +name = "otel-smoke-test" trigger = { type = "http" } version = "1.0.0" [[component]] -id = "hello" -source = "%{source=hello-world}" +id = "otel" +source = "%{source=otel-smoke-test}" +allowed_outbound_hosts = ["http://self"] [component.trigger] -route = "/hello/..." +route = "/..." diff --git a/tests/testcases/wasi-observe-tracing/spin.toml b/tests/testcases/wasi-observe-tracing/spin.toml new file mode 100644 index 0000000000..cc4698b5f9 --- /dev/null +++ b/tests/testcases/wasi-observe-tracing/spin.toml @@ -0,0 +1,19 @@ +spin_manifest_version = 2 + +[application] +authors = ["Fermyon Engineering "] +description = "An application to exercise wasi-observe tracing functionality." +name = "wasi-observe-tracing" +version = "1.0.0" + +[[trigger.http]] +route = "/..." +component = "wasi-observe-tracing" + +[component.wasi-observe-tracing] +source = "%{source=wasi-observe-tracing}" +key_value_stores = ["default"] +allowed_outbound_hosts = ["http://self", "https://asdf.com"] +[component.wasi-observe-tracing.build] +command = "cargo build --target wasm32-wasi --release" +watch = ["src/**/*.rs", "Cargo.toml"] diff --git a/wit/deps/observe/tracer.wit b/wit/deps/observe/tracer.wit new file mode 100644 index 0000000000..cac11286c3 --- /dev/null +++ b/wit/deps/observe/tracer.wit @@ -0,0 +1,151 @@ +interface tracer { + use wasi:clocks/wall-clock@0.2.0.{datetime}; + + /// Starts a new `span` with the given name and options. + start: func(name: string, options: option) -> span; + + /// Represents a unit of work or operation. + resource span { + /// Get the `span-context` for this `span`. + span-context: func() -> span-context; + + /// Returns true when the data provided to this `span` is captured in some form. If it returns false then any data provided is discarded. + is-recording: func() -> bool; + + /// Set attributes of this span. + /// + /// If a key already exists for an attribute of the Span it will be overwritten with the corresponding new value. + set-attributes: func(attributes: list); + + /// Adds an event with the provided name at the curent timestamp. + /// + /// Optionally an alternative timestamp may be provided. You may also provide attributes of this event. + add-event: func(name: string, timestamp: option, attributes: option>); + + /// Associates this `span` with another. + add-link: func(link: link); + + /// Override the default `span` status, which is unset. + set-status: func(status: status); + + /// Updates the `span` name. + update-name: func(name: string); + + /// Signals that the operation described by this span has now ended. + /// + /// If a timestamp is not provided then it is treated equivalent to passing the current time. Dropping this resource is the equivalent of calling `end(none)`. + end: func(timestamp: option); + } + + /// Configuration for starting a `span`. + record start-options { + /// Whether this span should act as the root of a new trace. + new-root: bool, + /// `span-kind` for the new `span`. + span-kind: option, + /// Attributes for the new `span`. + attributes: option>, + /// `link`'s for the new `span`. + links: option>, + /// When the `span` should begin. If this is not provided it defaults to the current time. + timestamp: option, + } + + /// Describes a relationship to another `span`. + record link { + /// Denotes which `span` to link to. + span-context: span-context, + /// Attributes describing the link. + attributes: list, + } + + /// Describes the relationship between the Span, its parents, and its children in a trace. + enum span-kind { + /// Indicates that the span describes a request to some remote service. This span is usually the parent of a remote server span and does not end until the response is received. + client, + /// Indicates that the span covers server-side handling of a synchronous RPC or other remote request. This span is often the child of a remote client span that was expected to wait for a response. + server, + /// Indicates that the span describes the initiators of an asynchronous request. This parent span will often end before the corresponding child consumer span, possibly even before the child span starts. In messaging scenarios with batching, tracing individual messages requires a new producer span per message to be created. + producer, + /// Indicates that the span describes a child of an asynchronous consumer request. + consumer, + /// Default value. Indicates that the span represents an internal operation within an application, as opposed to an operations with remote parents or children. + internal + } + + /// The `status` of a `span`. + variant status { + /// The default status. + unset, + /// The operation has been validated by an Application developer or Operator to have completed successfully. + ok, + /// The operation contains an error with a description. + error(string), + } + + /// A key-value pair describing an attribute. + record key-value { + /// The attribute name. + key: key, + /// The attribute value. + value: value, + } + + /// The key part of attribute `key-value` pairs. + type key = string; + + /// The value part of attribute `key-value` pairs. + variant value { + /// A string value. + %string(string), + /// A boolean value. + %bool(bool), + /// A double precision floating point value. + %float64(float64), + /// A signed 64 bit integer value. + %s64(s64), + /// A homogeneous array of string values. + string-array(list), + /// A homogeneous array of boolean values. + bool-array(list), + /// A homogeneous array of double precision floating point values. + float64-array(list), + /// A homogeneous array of 64 bit integer values. + s64-array(list), + } + + /// Identifying trace information about a span that can be serialized and propagated. + record span-context { + /// The `trace-id` for this `span-context`. + trace-id: trace-id, + /// The `span-id` for this `span-context`. + span-id: span-id, + /// The `trace-flags` for this `span-context`. + trace-flags: trace-flags, + /// Whether this `span-context` was propagated from a remote parent. + is-remote: bool, + /// The `trace-state` for this `span-context`. + trace-state: trace-state, + } + + /// The trace that this `span-context` belongs to. + /// + /// 16 bytes encoded as a hexadecimal string. + type trace-id = string; + + /// The id of this `span-context`. + /// + /// 8 bytes encoded as a hexadecimal string. + type span-id = string; + + /// Flags that can be set on a `span-context`. + flags trace-flags { + /// Whether the `span` should be sampled or not. + sampled, + } + + /// Carries system-specific configuration data, represented as a list of key-value pairs. `trace-state` allows multiple tracing systems to participate in the same trace. + /// + /// If any invalid keys or values are provided then the `trace-state` will be treated as an empty list. + type trace-state = list>; +} diff --git a/wit/deps/observe/world.wit b/wit/deps/observe/world.wit new file mode 100644 index 0000000000..063b1ac821 --- /dev/null +++ b/wit/deps/observe/world.wit @@ -0,0 +1,8 @@ +package wasi:observe@0.2.0-draft; + +world imports { + import tracer; +} + +// TODO(Reviewer): Should we make this an experimental wasi package or a Spin package +// TODO(Reviewer): Would adding a metrics interface to this in a future version be a breaking change? diff --git a/wit/world.wit b/wit/world.wit index b16e429054..585eb59770 100644 --- a/wit/world.wit +++ b/wit/world.wit @@ -24,6 +24,7 @@ world platform { import sqlite; import key-value; import variables; + include wasi:observe/imports@0.2.0-draft; } /// Like `platform`, but using WASI 0.2.0-rc-2023-10-18