telemetry worker: flush data after stops, and two other fixes #515
Changes from 3 commits
363d27d
b80b607
b439363
6b45e4a
67da02b
cbce086
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -241,10 +241,10 @@ impl TelemetryWorker { | |
Lifecycle(Start) => { | ||
if !self.data.started { | ||
self.deadlines | ||
- .schedule_event(LifecycleAction::FlushData) | ||
+ .schedule_event(LifecycleAction::FlushMetricAggr) | ||
.unwrap(); | ||
self.deadlines | ||
- .schedule_event(LifecycleAction::FlushMetricAggr) | ||
+ .schedule_event(LifecycleAction::FlushData) | ||
.unwrap(); | ||
self.data.started = true; | ||
} | ||
|
@@ -265,7 +265,7 @@ impl TelemetryWorker { | |
.unwrap(); | ||
} | ||
Lifecycle(FlushData) => { | ||
- if !self.data.started { | ||
+ if !(self.data.started || self.config.restartable) { | ||
return CONTINUE; | ||
} | ||
let batch = self.build_observability_batch(); | ||
|
@@ -296,7 +296,9 @@ impl TelemetryWorker { | |
self.log_err(&e); | ||
} | ||
self.data.started = false; | ||
- self.deadlines.clear_pending(); | ||
+ if !self.config.restartable { | ||
+ self.deadlines.clear_pending(); | ||
+ } | ||
return BREAK; | ||
} | ||
CollectStats(stats_sender) => { | ||
|
@@ -341,10 +343,10 @@ impl TelemetryWorker { | |
Err(err) => self.log_err(&err), | ||
} | ||
self.deadlines | ||
- .schedule_event(LifecycleAction::FlushData) | ||
+ .schedule_event(LifecycleAction::FlushMetricAggr) | ||
.unwrap(); | ||
self.deadlines | ||
- .schedule_event(LifecycleAction::FlushMetricAggr) | ||
+ .schedule_event(LifecycleAction::FlushData) | ||
.unwrap(); | ||
self.data.started = true; | ||
} | ||
|
@@ -368,7 +370,7 @@ impl TelemetryWorker { | |
.unwrap(); | ||
} | ||
Lifecycle(FlushData) => { | ||
- if !self.data.started { | ||
+ if !(self.data.started || self.config.restartable) { | ||
return CONTINUE; | ||
} | ||
let mut batch = self.build_app_events_batch(); | ||
|
@@ -458,7 +460,9 @@ impl TelemetryWorker { | |
.await; | ||
|
||
self.data.started = false; | ||
- self.deadlines.clear_pending(); | ||
+ if !self.config.restartable { | ||
+ self.deadlines.clear_pending(); | ||
+ } | ||
return BREAK; | ||
} | ||
CollectStats(stats_sender) => { | ||
|
@@ -470,27 +474,32 @@ impl TelemetryWorker { | |
} | ||
|
||
// Builds telemetry payloads containing lifecycle events | ||
- fn build_app_events_batch(&self) -> Vec<Payload> { | ||
+ fn build_app_events_batch(&mut self) -> Vec<Payload> { | ||
let mut payloads = Vec::new(); | ||
|
||
if self.data.dependencies.flush_not_empty() { | ||
payloads.push(data::Payload::AppDependenciesLoaded( | ||
data::AppDependenciesLoaded { | ||
- dependencies: self.data.dependencies.unflushed().cloned().collect(), | ||
+ dependencies: self.data.dependencies.drain_unflushed().cloned().collect(), | ||
Shouldn't these already be removed on a successful send by the call to …?
I've added a new commit that uses the existing method.
}, | ||
)) | ||
} | ||
if self.data.integrations.flush_not_empty() { | ||
payloads.push(data::Payload::AppIntegrationsChange( | ||
data::AppIntegrationsChange { | ||
- integrations: self.data.integrations.unflushed().cloned().collect(), | ||
+ integrations: self.data.integrations.drain_unflushed().cloned().collect(), | ||
}, | ||
)) | ||
} | ||
if self.data.configurations.flush_not_empty() { | ||
payloads.push(data::Payload::AppClientConfigurationChange( | ||
data::AppClientConfigurationChange { | ||
- configuration: self.data.configurations.unflushed().cloned().collect(), | ||
+ configuration: self | ||
+ .data | ||
+ .configurations | ||
+ .drain_unflushed() | ||
+ .cloned() | ||
+ .collect(), | ||
}, | ||
)) | ||
} | ||
|
Wouldn't it be enough to only include `self.data.started = false;` inside the if statement as well, and leave the exit early checks?

I guess it depends. Do we want Start, Stop, Stop to generate two stops? Because that's what PHP ends up generating. The second stop is a noop, but if I moved the assignment `self.data.started = false` under the condition `if !self.config.restartable`, then the stops would be effective.

If the exit early checks are still in the code, then how would the second `Stop` be effective?

From what I understand, what you're proposing is that I reset `started = false` only if `!restartable`. In that case `started` would stay true forever once there is a start. So the early check would never be hit.

Yes, you're right. I was confusing the early checks.

There is something in the logic that feels a bit broken. The `FlushData` is also protected by this `!self.data.started` check. Should it work after a restartable stop?

The things that `Stop` does, should they happen when the first request ends, or when the worker stops?

My guess is yes, otherwise there is no way to send the data that's collected after the stop (`build_observability_batch` is called only from the handlers of Stop and FlushData), at least not without an intervening start+stop.

That is a good point. The final flush of the metrics should happen when the worker stops, not when handling Stop. I guess at some point they were the same, but then the restart thing was introduced. But regardless, once we have a way to send metrics after a Stop, periodic flushes should still happen for that to work. So FlushData shouldn't be skipped or unscheduled after a Stop.
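
For anyone following the thread, here is a minimal sketch of the flag logic being discussed. It is not the worker's real code: `Worker`, `Action`, `Outcome` and `pending_deadlines` are hypothetical stand-ins, and the early return on `Stop` is an assumption inferred from the "second stop is a noop" remark, since that check is not visible in the diff. Only the `FlushData` guard and the conditional `clear_pending` mirror the changed lines.

```rust
// Hypothetical, simplified model of the started/restartable flags.
#[derive(Debug, Clone, Copy)]
enum Action {
    Start,
    FlushData,
    Stop,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Started,
    Flushed,
    Stopped,
    Skipped,
}

struct Worker {
    started: bool,
    restartable: bool,
    // Stand-in for the scheduled deadlines (FlushMetricAggr, FlushData).
    pending_deadlines: usize,
}

impl Worker {
    fn new(restartable: bool) -> Self {
        Worker { started: false, restartable, pending_deadlines: 0 }
    }

    fn handle(&mut self, action: Action) -> Outcome {
        match action {
            Action::Start => {
                if self.started {
                    return Outcome::Skipped;
                }
                self.pending_deadlines = 2;
                self.started = true;
                Outcome::Started
            }
            Action::FlushData => {
                // Guard from the diff: a restartable worker keeps flushing
                // even when it is not currently started.
                if !(self.started || self.restartable) {
                    return Outcome::Skipped;
                }
                Outcome::Flushed
            }
            Action::Stop => {
                // Assumed early check: a second Stop in a row is a no-op.
                if !self.started {
                    return Outcome::Skipped;
                }
                self.started = false;
                // Mirrors the diff: keep deadlines when restartable.
                if !self.restartable {
                    self.pending_deadlines = 0;
                }
                Outcome::Stopped
            }
        }
    }
}

fn main() {
    let mut worker = Worker::new(true);
    for action in [Action::Start, Action::Stop, Action::Stop, Action::FlushData] {
        println!("{:?} -> {:?}", action, worker.handle(action));
    }
}
```

Under these assumptions, the Start, Stop, Stop sequence yields one effective stop, and a later `FlushData` still runs on a restartable worker because its deadlines were never cleared. A non-restartable worker skips the flush and has nothing left scheduled.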