Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry worker: flush data after stops, and two other fixes #515

Merged
merged 6 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions ddtelemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ impl TelemetryWorker {
Lifecycle(Start) => {
if !self.data.started {
self.deadlines
.schedule_event(LifecycleAction::FlushData)
.schedule_event(LifecycleAction::FlushMetricAggr)
.unwrap();
self.deadlines
.schedule_event(LifecycleAction::FlushMetricAggr)
.schedule_event(LifecycleAction::FlushData)
.unwrap();
self.data.started = true;
}
Expand All @@ -265,7 +265,7 @@ impl TelemetryWorker {
.unwrap();
}
Lifecycle(FlushData) => {
if !self.data.started {
if !(self.data.started || self.config.restartable) {
return CONTINUE;
}
let batch = self.build_observability_batch();
Expand Down Expand Up @@ -296,7 +296,9 @@ impl TelemetryWorker {
self.log_err(&e);
}
self.data.started = false;
self.deadlines.clear_pending();
if !self.config.restartable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be enough to only include self.data.started = false; inside the if statement as well, and leave the exit early checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it depends. Do we want Start, Stop, Stop to generate two stops? Because that's what PHP ends up generating. The second stop is a noop, but if I moved the assignment self.data.started = false under the condition if !self.config.restartable, then the stops would be effective

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the exit early checks are still in the code, then how would the second Stop be effective?

Copy link
Contributor Author

@cataphract cataphract Jul 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, what you're proposing is that I reset started = false only if !restartable. In that case started would stay true forever once there is a start. So the early check

 if !self.data.started {
   return BREAK;
 }

would never be hit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. I was confusing the early checks.

There is something in the logic that feels a bit broken. The FlushData is also protected by this !self.data.started check. Should it work after a restartable stop?

The things that Stop does, should they happen when the first request ends, or when the worker stops?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should [FlushData] work after a restartable stop?

My guess is yes, otherwise there is no way to send the data that's collected after the stop (build_observability_batch is called only from the handlers of Stop and FlushData), at least not without an intervening start+stop.

The things that Stop does, should they happen when the first request ends, or when the worker stops?

That is a good point. The final flush of the metrics should happen when the worker stops, not when handling Stop. I guess at some point they were the same, but then the restart thing was introduced. But regardless, once we have a way to send metrics after a Stop, for that happen periodic flushes should still happen. So FlushData shouldn't be skipped or unscheduled after a Stop.

self.deadlines.clear_pending();
}
return BREAK;
}
CollectStats(stats_sender) => {
Expand Down Expand Up @@ -341,10 +343,11 @@ impl TelemetryWorker {
Err(err) => self.log_err(&err),
}
self.deadlines
.schedule_event(LifecycleAction::FlushData)
.schedule_event(LifecycleAction::FlushMetricAggr)
cataphract marked this conversation as resolved.
Show resolved Hide resolved
.unwrap();
// flush data should be last to previously flushed metrics are sent
self.deadlines
.schedule_event(LifecycleAction::FlushMetricAggr)
.schedule_event(LifecycleAction::FlushData)
.unwrap();
self.data.started = true;
}
Expand All @@ -368,7 +371,7 @@ impl TelemetryWorker {
.unwrap();
}
Lifecycle(FlushData) => {
if !self.data.started {
if !(self.data.started || self.config.restartable) {
return CONTINUE;
}
let mut batch = self.build_app_events_batch();
Expand Down Expand Up @@ -427,38 +430,36 @@ impl TelemetryWorker {

let obsevability_events = self.build_observability_batch();

future::join_all(
[
Some(self.build_request(&data::Payload::MessageBatch(app_events))),
if obsevability_events.is_empty() {
None
} else {
Some(
self.build_request(&data::Payload::MessageBatch(
obsevability_events,
)),
)
},
]
.into_iter()
.flatten()
.filter_map(|r| match r {
Ok(r) => Some(r),
Err(e) => {
self.log_err(&e);
None
}
})
.map(|r| async {
if let Err(e) = self.send_request(r).await {
self.log_err(&e);
let mut payloads = vec![data::Payload::MessageBatch(app_events)];
if !obsevability_events.is_empty() {
payloads.push(data::Payload::MessageBatch(obsevability_events));
}

let self_arc = Arc::new(tokio::sync::RwLock::new(self));
let futures = payloads.into_iter().map(|payload| {
let self_arc = self_arc.clone();
async move {
// This is different from the non-functional:
// match self_arc.read().await.send_payload(&payload).await { ... }
// presumably because the temp read guard would live till end of match
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting find

let res = {
let self_rguard = self_arc.read().await;
self_rguard.send_payload(&payload).await
};
match res {
Ok(()) => self_arc.write().await.payload_sent_success(&payload),
Err(err) => self_arc.read().await.log_err(&err),
}
}),
)
.await;
}
});
future::join_all(futures).await;

let mut self_lock = self_arc.write().await;
self_lock.data.started = false;
if !self_lock.config.restartable {
self_lock.deadlines.clear_pending();
}

self.data.started = false;
self.deadlines.clear_pending();
return BREAK;
}
CollectStats(stats_sender) => {
Expand All @@ -470,7 +471,7 @@ impl TelemetryWorker {
}

// Builds telemetry payloads containing lifecycle events
fn build_app_events_batch(&self) -> Vec<Payload> {
fn build_app_events_batch(&mut self) -> Vec<Payload> {
let mut payloads = Vec::new();

if self.data.dependencies.flush_not_empty() {
Expand Down
7 changes: 7 additions & 0 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ impl SidecarInterface for SidecarServer {
}

app.telemetry.send_msgs(actions).await.ok();

let mut extracted_actions: Vec<TelemetryActions> = vec![];
enqueued_data
.extract_telemetry_actions(&mut extracted_actions)
.await;
app.telemetry.send_msgs(extracted_actions).await.ok();

// Ok, we dequeued all messages, now new enqueue_actions calls can handle it
completer.complete((service_name, env_name)).await;
}
Expand Down
Loading