From 363d27d3b3f9e20b55928e9570d9d3157599d5b1 Mon Sep 17 00:00:00 2001
From: Gustavo Lopes
Date: Mon, 1 Jul 2024 14:46:07 +0100
Subject: [PATCH 1/5] telemetry worker: flush data after stops

Telemetry workers are functionally dead after a Stop lifecycle action,
provided there is no intervening Start. While AddPoint actions are still
processed, their data is never flushed, since the Stop action handler
unschedules the FlushMetricAggr and FlushData actions.

PHP sends a Stop action at the end of every request via
ddog_sidecar_telemetry_end(), but a Start action is only generated just after
a telemetry worker is spawned. It is not clear to me whether the intention is
to have a Start/Stop pair on every PHP request (where Stop flushes the
metrics) or to have such a pair only in the first request, with the Stop
event generated by ddog_sidecar_telemetry_end() effectively a noop.

It would appear, judging by [this comment](https://github.com/DataDog/libdatadog/pull/391):

> Also allow the telemetry worker to have a mode where it's continuing execution after a start-stop cycle, otherwise it won't send any more metrics afterwards.

that the intention is to keep sending metrics after a Start/Stop pair. In that case:

* the Stop action handler should not unschedule the FlushData and FlushMetricAggr events, and
* FlushData, if called outside a Start/Stop pair, should not be a noop.

Finally: swap the order in which FlushData and FlushMetricAggr are scheduled,
so that FlushMetricAggr runs first and the data it generates can be sent by
the following FlushData.
---
 ddtelemetry/src/worker/mod.rs | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs
index df5a12dcf..0b2bebe36 100644
--- a/ddtelemetry/src/worker/mod.rs
+++ b/ddtelemetry/src/worker/mod.rs
@@ -241,10 +241,10 @@ impl TelemetryWorker {
             Lifecycle(Start) => {
                 if !self.data.started {
                     self.deadlines
-                        .schedule_event(LifecycleAction::FlushData)
+                        .schedule_event(LifecycleAction::FlushMetricAggr)
                         .unwrap();
                     self.deadlines
-                        .schedule_event(LifecycleAction::FlushMetricAggr)
+                        .schedule_event(LifecycleAction::FlushData)
                         .unwrap();
                     self.data.started = true;
                 }
@@ -265,7 +265,7 @@ impl TelemetryWorker {
                     .unwrap();
             }
             Lifecycle(FlushData) => {
-                if !self.data.started {
+                if !(self.data.started || self.config.restartable) {
                     return CONTINUE;
                 }
                 let batch = self.build_observability_batch();
@@ -296,7 +296,9 @@ impl TelemetryWorker {
                     self.log_err(&e);
                 }
                 self.data.started = false;
-                self.deadlines.clear_pending();
+                if !self.config.restartable {
+                    self.deadlines.clear_pending();
+                }
                 return BREAK;
             }
             CollectStats(stats_sender) => {
@@ -341,10 +343,10 @@ impl TelemetryWorker {
                     Err(err) => self.log_err(&err),
                 }
                 self.deadlines
-                    .schedule_event(LifecycleAction::FlushData)
+                    .schedule_event(LifecycleAction::FlushMetricAggr)
                     .unwrap();
                 self.deadlines
-                    .schedule_event(LifecycleAction::FlushMetricAggr)
+                    .schedule_event(LifecycleAction::FlushData)
                     .unwrap();
                 self.data.started = true;
             }
@@ -368,7 +370,7 @@ impl TelemetryWorker {
                     .unwrap();
             }
             Lifecycle(FlushData) => {
-                if !self.data.started {
+                if !(self.data.started || self.config.restartable) {
                     return CONTINUE;
                 }
                 let mut batch = self.build_app_events_batch();
@@ -458,7 +460,9 @@ impl TelemetryWorker {
                 .await;
 
                 self.data.started = false;
-                self.deadlines.clear_pending();
+                if !self.config.restartable {
+                    self.deadlines.clear_pending();
+                }
                 return BREAK;
             }
             CollectStats(stats_sender) => {
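For readers less familiar with the worker lifecycle, the following is a minimal, self-contained sketch of the behaviour this patch aims for. `Worker`, `Deadlines` and the method names are simplified stand-ins, not the actual ddtelemetry types:

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug)]
enum LifecycleAction {
    FlushMetricAggr,
    FlushData,
}

#[derive(Default)]
struct Deadlines {
    pending: VecDeque<LifecycleAction>,
}

impl Deadlines {
    fn schedule_event(&mut self, action: LifecycleAction) {
        self.pending.push_back(action);
    }
    fn clear_pending(&mut self) {
        self.pending.clear();
    }
}

struct Worker {
    started: bool,
    restartable: bool,
    deadlines: Deadlines,
}

impl Worker {
    fn start(&mut self) {
        if !self.started {
            // FlushMetricAggr is scheduled first so the metrics it aggregates
            // are picked up by the FlushData that follows it.
            self.deadlines.schedule_event(LifecycleAction::FlushMetricAggr);
            self.deadlines.schedule_event(LifecycleAction::FlushData);
            self.started = true;
        }
    }

    fn flush_data(&mut self) {
        // After the patch, a restartable worker keeps flushing even when a
        // Stop has already flipped `started` back to false.
        if !(self.started || self.restartable) {
            return;
        }
        // ... build and send the observability batch ...
    }

    fn stop(&mut self) {
        self.started = false;
        // Only a non-restartable worker drops its periodic flushes for good.
        if !self.restartable {
            self.deadlines.clear_pending();
        }
    }
}

fn main() {
    let mut w = Worker { started: false, restartable: true, deadlines: Deadlines::default() };
    w.start();
    w.stop();
    // With `restartable = true` the flush deadlines survive the Stop:
    assert_eq!(w.deadlines.pending.len(), 2);
    w.flush_data(); // still reaches the flush path because restartable is set
}
```

The point is that a restartable worker keeps its FlushMetricAggr/FlushData deadlines across a Stop and FlushData still runs, so metrics recorded after ddog_sidecar_telemetry_end() are eventually sent.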
From b80b607a88e2efbb1376191feb226083b949ea68 Mon Sep 17 00:00:00 2001
From: Gustavo Lopes
Date: Thu, 5 Dec 2024 09:17:06 -0300
Subject: [PATCH 2/5] Fix configurations, integrations, deps being relayed over and over

---
 ddtelemetry/src/worker/mod.rs   | 13 +++++++++----
 ddtelemetry/src/worker/store.rs |  6 ++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs
index 0b2bebe36..d742a4165 100644
--- a/ddtelemetry/src/worker/mod.rs
+++ b/ddtelemetry/src/worker/mod.rs
@@ -474,27 +474,32 @@ impl TelemetryWorker {
     }
 
     // Builds telemetry payloads containing lifecycle events
-    fn build_app_events_batch(&self) -> Vec<data::Payload> {
+    fn build_app_events_batch(&mut self) -> Vec<data::Payload> {
         let mut payloads = Vec::new();
 
         if self.data.dependencies.flush_not_empty() {
             payloads.push(data::Payload::AppDependenciesLoaded(
                 data::AppDependenciesLoaded {
-                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
+                    dependencies: self.data.dependencies.drain_unflushed().cloned().collect(),
                 },
             ))
         }
         if self.data.integrations.flush_not_empty() {
             payloads.push(data::Payload::AppIntegrationsChange(
                 data::AppIntegrationsChange {
-                    integrations: self.data.integrations.unflushed().cloned().collect(),
+                    integrations: self.data.integrations.drain_unflushed().cloned().collect(),
                 },
             ))
         }
         if self.data.configurations.flush_not_empty() {
             payloads.push(data::Payload::AppClientConfigurationChange(
                 data::AppClientConfigurationChange {
-                    configuration: self.data.configurations.unflushed().cloned().collect(),
+                    configuration: self
+                        .data
+                        .configurations
+                        .drain_unflushed()
+                        .cloned()
+                        .collect(),
                 },
             ))
         }
diff --git a/ddtelemetry/src/worker/store.rs b/ddtelemetry/src/worker/store.rs
index 82b2f29ed..348ea1bd8 100644
--- a/ddtelemetry/src/worker/store.rs
+++ b/ddtelemetry/src/worker/store.rs
@@ -204,6 +204,12 @@ where
     pub fn len_stored(&self) -> usize {
         self.items.len()
     }
+
+    pub fn drain_unflushed(&mut self) -> impl Iterator<Item = &T> {
+        self.unflushed
+            .drain(..)
+            .flat_map(|i| self.items.get_idx(i).map(|(k, _)| k))
+    }
 }
 
 impl<T> Extend<T> for Store<T>
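To make the "relayed over and over" failure mode concrete, here is a toy model of the store bookkeeping this patch changes. The `FlushOnceStore` type is illustrative only (the real Store in ddtelemetry/src/worker/store.rs is keyed and de-duplicating), and note that patch 4 below swaps this draining approach for marking items flushed only after a successful send:

```rust
// Items stay stored (so the worker can still de-duplicate), but the
// unflushed index list is drained so the next payload does not repeat them.
struct FlushOnceStore<T> {
    items: Vec<T>,
    unflushed: Vec<usize>, // indices into `items` not yet sent
}

impl<T: Clone> FlushOnceStore<T> {
    fn new() -> Self {
        Self { items: Vec::new(), unflushed: Vec::new() }
    }

    fn insert(&mut self, item: T) {
        self.items.push(item);
        self.unflushed.push(self.items.len() - 1);
    }

    // Analogous in spirit to unflushed(): repeated calls return the same
    // items, which is what caused configs/integrations/deps to be relayed
    // again on every flush.
    fn unflushed(&self) -> impl Iterator<Item = &T> {
        self.unflushed.iter().map(|&i| &self.items[i])
    }

    // Analogous in spirit to drain_unflushed(): the second call yields
    // nothing, so each item ends up in at most one payload.
    fn drain_unflushed(&mut self) -> Vec<T> {
        let items = &self.items;
        self.unflushed.drain(..).map(|i| items[i].clone()).collect()
    }
}

fn main() {
    let mut store = FlushOnceStore::new();
    store.insert("DD_ENV=prod".to_string());

    // Old behaviour: every flush sees the same unflushed items again.
    assert_eq!(store.unflushed().count(), 1);
    assert_eq!(store.unflushed().count(), 1);

    // New behaviour: draining hands the items out exactly once.
    assert_eq!(store.drain_unflushed().len(), 1);
    assert_eq!(store.drain_unflushed().len(), 0);
    assert_eq!(store.items.len(), 1); // still stored for de-duplication
}
```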
From b439363699147decfa020b97cb48ebd4b6612954 Mon Sep 17 00:00:00 2001
From: Gustavo Lopes
Date: Thu, 5 Dec 2024 09:20:56 -0300
Subject: [PATCH 3/5] Fix actions not being relayed in some circumstances

EnqueuedTelemetryData has actions stored directly in an "actions" field, but
also dependencies, configurations and integrations in separate fields;
extract_telemetry_actions() converts from the latter into proper
TelemetryActions.

However, this method was being called only from a branch of
register_service_and_flush_queued_actions() that would be hit when
RuntimeInfo::apps does not already contain the key corresponding to the
(service, env) in question (so, essentially, deps/confs/integrations would be
lost on all but the first call).
---
 sidecar/src/service/sidecar_server.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs
index fd31fa9da..364584083 100644
--- a/sidecar/src/service/sidecar_server.rs
+++ b/sidecar/src/service/sidecar_server.rs
@@ -644,6 +644,13 @@ impl SidecarInterface for SidecarServer {
                 }
 
                 app.telemetry.send_msgs(actions).await.ok();
+
+                let mut extracted_actions: Vec<TelemetryActions> = vec![];
+                enqueued_data
+                    .extract_telemetry_actions(&mut extracted_actions)
+                    .await;
+                app.telemetry.send_msgs(extracted_actions).await.ok();
+
                 // Ok, we dequeued all messages, now new enqueue_actions calls can handle it
                 completer.complete((service_name, env_name)).await;
             }
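A pared-down model of the queueing problem fixed above: explicit actions and the separately stored configurations/dependencies/integrations must both be converted and relayed on every flush, not only on the branch that first registers the (service, env) app. The types and the relay() helper below are hypothetical, synchronous stand-ins, not the sidecar's real API:

```rust
#[derive(Debug)]
enum TelemetryAction {
    AddPoint(String),
    AddConfig(String),
}

struct QueuedData {
    actions: Vec<TelemetryAction>,
    configurations: Vec<String>, // held separately, like deps/integrations
}

impl QueuedData {
    // Counterpart of extract_telemetry_actions(): turn the side-band fields
    // into proper actions.
    fn extract_telemetry_actions(&mut self, out: &mut Vec<TelemetryAction>) {
        out.extend(self.configurations.drain(..).map(TelemetryAction::AddConfig));
    }
}

fn relay(mut queued: QueuedData) -> Vec<TelemetryAction> {
    let mut to_send = std::mem::take(&mut queued.actions);

    // Before the fix, this conversion only happened on the branch creating a
    // brand-new app entry, so configurations queued on later requests were
    // silently dropped.
    let mut extracted = Vec::new();
    queued.extract_telemetry_actions(&mut extracted);
    to_send.extend(extracted);

    to_send
}

fn main() {
    let queued = QueuedData {
        actions: vec![TelemetryAction::AddPoint("requests:1".into())],
        configurations: vec!["DD_TRACE_ENABLED=true".into()],
    };
    let sent = relay(queued);
    assert_eq!(sent.len(), 2); // both the explicit action and the config made it
    println!("{sent:?}");
}
```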
From 6b45e4ab3ce840c4520ebbea63a3781b0e8590db Mon Sep 17 00:00:00 2001
From: Gustavo Lopes
Date: Thu, 5 Dec 2024 15:19:31 -0300
Subject: [PATCH 4/5] Alternative method to prevent repetition of confs, integrations, deps

---
 ddtelemetry/src/worker/mod.rs   | 69 ++++++++++++-------------
 ddtelemetry/src/worker/store.rs |  6 ---
 2 files changed, 30 insertions(+), 45 deletions(-)

diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs
index d742a4165..e6e2b5e05 100644
--- a/ddtelemetry/src/worker/mod.rs
+++ b/ddtelemetry/src/worker/mod.rs
@@ -429,40 +429,36 @@ impl TelemetryWorker {
 
                 let obsevability_events = self.build_observability_batch();
 
-                future::join_all(
-                    [
-                        Some(self.build_request(&data::Payload::MessageBatch(app_events))),
-                        if obsevability_events.is_empty() {
-                            None
-                        } else {
-                            Some(
-                                self.build_request(&data::Payload::MessageBatch(
-                                    obsevability_events,
-                                )),
-                            )
-                        },
-                    ]
-                    .into_iter()
-                    .flatten()
-                    .filter_map(|r| match r {
-                        Ok(r) => Some(r),
-                        Err(e) => {
-                            self.log_err(&e);
-                            None
-                        }
-                    })
-                    .map(|r| async {
-                        if let Err(e) = self.send_request(r).await {
-                            self.log_err(&e);
+                let mut payloads = vec![data::Payload::MessageBatch(app_events)];
+                if !obsevability_events.is_empty() {
+                    payloads.push(data::Payload::MessageBatch(obsevability_events));
+                }
+
+                let self_arc = Arc::new(tokio::sync::RwLock::new(self));
+                let futures = payloads.into_iter().map(|payload| {
+                    let self_arc = self_arc.clone();
+                    async move {
+                        // This is different from the non-functional:
+                        // match self_arc.read().await.send_payload(&payload).await { ... }
+                        // presumably because the temp read guard would live till end of match
+                        let res = {
+                            let self_rguard = self_arc.read().await;
+                            self_rguard.send_payload(&payload).await
+                        };
+                        match res {
+                            Ok(()) => self_arc.write().await.payload_sent_success(&payload),
+                            Err(err) => self_arc.read().await.log_err(&err),
                         }
-                    }),
-                )
-                .await;
+                    }
+                });
+                future::join_all(futures).await;
 
-                self.data.started = false;
-                if !self.config.restartable {
-                    self.deadlines.clear_pending();
+                let mut self_lock = self_arc.write().await;
+                self_lock.data.started = false;
+                if !self_lock.config.restartable {
+                    self_lock.deadlines.clear_pending();
                 }
+
                 return BREAK;
             }
             CollectStats(stats_sender) => {
@@ -480,26 +476,21 @@ impl TelemetryWorker {
         if self.data.dependencies.flush_not_empty() {
             payloads.push(data::Payload::AppDependenciesLoaded(
                 data::AppDependenciesLoaded {
-                    dependencies: self.data.dependencies.drain_unflushed().cloned().collect(),
+                    dependencies: self.data.dependencies.unflushed().cloned().collect(),
                 },
             ))
         }
         if self.data.integrations.flush_not_empty() {
             payloads.push(data::Payload::AppIntegrationsChange(
                 data::AppIntegrationsChange {
-                    integrations: self.data.integrations.drain_unflushed().cloned().collect(),
+                    integrations: self.data.integrations.unflushed().cloned().collect(),
                 },
             ))
         }
         if self.data.configurations.flush_not_empty() {
             payloads.push(data::Payload::AppClientConfigurationChange(
                 data::AppClientConfigurationChange {
-                    configuration: self
-                        .data
-                        .configurations
-                        .drain_unflushed()
-                        .cloned()
-                        .collect(),
+                    configuration: self.data.configurations.unflushed().cloned().collect(),
                 },
             ))
         }
diff --git a/ddtelemetry/src/worker/store.rs b/ddtelemetry/src/worker/store.rs
index 348ea1bd8..82b2f29ed 100644
--- a/ddtelemetry/src/worker/store.rs
+++ b/ddtelemetry/src/worker/store.rs
@@ -204,12 +204,6 @@ where
     pub fn len_stored(&self) -> usize {
         self.items.len()
     }
-
-    pub fn drain_unflushed(&mut self) -> impl Iterator<Item = &T> {
-        self.unflushed
-            .drain(..)
-            .flat_map(|i| self.items.get_idx(i).map(|(k, _)| k))
-    }
 }
 
 impl<T> Extend<T> for Store<T>

From 67da02b878e8b6bddd04abd0385c0e77e2b51538 Mon Sep 17 00:00:00 2001
From: Gustavo Lopes
Date: Thu, 5 Dec 2024 15:21:22 -0300
Subject: [PATCH 5/5] Add comment

---
 ddtelemetry/src/worker/mod.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs
index e6e2b5e05..2b51028e5 100644
--- a/ddtelemetry/src/worker/mod.rs
+++ b/ddtelemetry/src/worker/mod.rs
@@ -345,6 +345,7 @@ impl TelemetryWorker {
                 self.deadlines
                     .schedule_event(LifecycleAction::FlushMetricAggr)
                     .unwrap();
+                // flush data should be last so previously flushed metrics are sent
                 self.deadlines
                     .schedule_event(LifecycleAction::FlushData)
                     .unwrap();
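The comment added in patch 4 about the temporary read guard is worth illustrating: with tokio's RwLock, a read guard kept alive across a later write().await on the same lock deadlocks the task. The sketch below only mirrors the locking pattern; `Worker`, send_payload() and payload_sent_success() are stand-ins, and it assumes tokio (with the rt and macros features) as a dependency:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Worker {
    sent: u32,
}

impl Worker {
    async fn send_payload(&self) -> Result<(), ()> {
        Ok(()) // pretend the HTTP request succeeded
    }
    fn payload_sent_success(&mut self) {
        self.sent += 1;
    }
}

#[tokio::main]
async fn main() {
    let worker = Arc::new(RwLock::new(Worker { sent: 0 }));

    // The guard lives only inside this block, so it is released before we ask
    // for the write lock below. Writing this as
    //     match worker.read().await.send_payload().await { ... }
    // would keep the temporary read guard alive until the end of the match,
    // and the write().await inside the Ok arm would never be granted.
    let res = {
        let guard = worker.read().await;
        guard.send_payload().await
    };
    match res {
        Ok(()) => worker.write().await.payload_sent_success(),
        Err(()) => eprintln!("send failed"),
    }

    assert_eq!(worker.read().await.sent, 1);
}
```

Scoping the guard in its own block releases the read lock before the write lock is requested, which is why the patch binds `res` first instead of matching on the expression directly.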