Skip to content

Commit

Permalink
Try out measured
Browse files Browse the repository at this point in the history
Signed-off-by: clux <[email protected]>

fmt

Signed-off-by: clux <[email protected]>

fix names being different from before

Signed-off-by: clux <[email protected]>

make tests work using some hacky inspection of metric

Signed-off-by: clux <[email protected]>

avoid using Vecs with fake EmptyLabels (i misunderstood the lib)

Signed-off-by: clux <[email protected]>

don't need to set label_set

Signed-off-by: clux <[email protected]>

use measured's namespace on a metric struct

Signed-off-by: clux <[email protected]>

keep main metrics struct name

Signed-off-by: clux <[email protected]>

fmt

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Jun 25, 2024
1 parent 6635a3a commit 0dc7640
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 118 deletions.
98 changes: 76 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ schemars = { version = "0.8.12", features = ["chrono"] }
serde = { version = "1.0.185", features = ["derive"] }
serde_json = "1.0.105"
serde_yaml = "0.9.25"
prometheus = "0.13.3"
chrono = { version = "0.4.26", features = ["serde"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
Expand All @@ -44,6 +43,8 @@ opentelemetry-otlp = { version = "0.13.0", features = ["tokio"], optional = true
tonic = { version = "0.9", optional = true }
thiserror = "1.0.47"
anyhow = "1.0.75"
measured = { version = "0.0.21", features = ["lasso"] }
lasso = "0.7.2"

[dev-dependencies]
assert-json-diff = "2.0.2"
Expand Down
34 changes: 15 additions & 19 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub struct Context {
/// Diagnostics read by the web server
pub diagnostics: Arc<RwLock<Diagnostics>>,
/// Prometheus metrics
pub metrics: Metrics,
pub metrics: Arc<Metrics>,
}

#[instrument(skip(ctx, doc), fields(trace_id))]
async fn reconcile(doc: Arc<Document>, ctx: Arc<Context>) -> Result<Action> {
let trace_id = telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));
let _timer = ctx.metrics.count_and_measure();
let _timer = ctx.metrics.app.reconciler.count_and_measure();
ctx.diagnostics.write().await.last_event = Utc::now();
let ns = doc.namespace().unwrap(); // doc is namespace scoped
let docs: Api<Document> = Api::namespaced(ctx.client.clone(), &ns);
Expand All @@ -78,7 +78,7 @@ async fn reconcile(doc: Arc<Document>, ctx: Arc<Context>) -> Result<Action> {

fn error_policy(doc: Arc<Document>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
ctx.metrics.reconcile_failure(&doc, error);
ctx.metrics.app.reconciler.set_failure(&doc, error);
Action::requeue(Duration::from_secs(5 * 60))
}

Expand Down Expand Up @@ -171,15 +171,15 @@ impl Diagnostics {
pub struct State {
/// Diagnostics populated by the reconciler
diagnostics: Arc<RwLock<Diagnostics>>,
/// Metrics registry
registry: prometheus::Registry,
/// Metrics and encoder
metrics: Arc<Metrics>,
}

/// State wrapper around the controller outputs for the web server
impl State {
/// Metrics getter
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
pub fn metrics(&self) -> Arc<Metrics> {
self.metrics.clone()
}

/// State getter
Expand All @@ -191,7 +191,7 @@ impl State {
pub fn to_context(&self, client: Client) -> Arc<Context> {
Arc::new(Context {
client,
metrics: Metrics::default().register(&self.registry).unwrap(),
metrics: self.metrics.clone(),
diagnostics: self.diagnostics.clone(),
})
}
Expand Down Expand Up @@ -223,7 +223,7 @@ mod test {

#[tokio::test]
async fn documents_without_finalizer_gets_a_finalizer() {
let (testctx, fakeserver, _) = Context::test();
let (testctx, fakeserver) = Context::test();
let doc = Document::test();
let mocksrv = fakeserver.run(Scenario::FinalizerCreation(doc.clone()));
reconcile(Arc::new(doc), testctx).await.expect("reconciler");
Expand All @@ -232,7 +232,7 @@ mod test {

#[tokio::test]
async fn finalized_doc_causes_status_patch() {
let (testctx, fakeserver, _) = Context::test();
let (testctx, fakeserver) = Context::test();
let doc = Document::test().finalized();
let mocksrv = fakeserver.run(Scenario::StatusPatch(doc.clone()));
reconcile(Arc::new(doc), testctx).await.expect("reconciler");
Expand All @@ -241,7 +241,7 @@ mod test {

#[tokio::test]
async fn finalized_doc_with_hide_causes_event_and_hide_patch() {
let (testctx, fakeserver, _) = Context::test();
let (testctx, fakeserver) = Context::test();
let doc = Document::test().finalized().needs_hide();
let scenario = Scenario::EventPublishThenStatusPatch("HideRequested".into(), doc.clone());
let mocksrv = fakeserver.run(scenario);
Expand All @@ -251,7 +251,7 @@ mod test {

#[tokio::test]
async fn finalized_doc_with_delete_timestamp_causes_delete() {
let (testctx, fakeserver, _) = Context::test();
let (testctx, fakeserver) = Context::test();
let doc = Document::test().finalized().needs_delete();
let mocksrv = fakeserver.run(Scenario::Cleanup("DeleteRequested".into(), doc.clone()));
reconcile(Arc::new(doc), testctx).await.expect("reconciler");
Expand All @@ -260,7 +260,7 @@ mod test {

#[tokio::test]
async fn illegal_doc_reconcile_errors_which_bumps_failure_metric() {
let (testctx, fakeserver, _registry) = Context::test();
let (testctx, fakeserver) = Context::test();
let doc = Arc::new(Document::illegal().finalized());
let mocksrv = fakeserver.run(Scenario::RadioSilence);
let res = reconcile(doc.clone(), testctx.clone()).await;
Expand All @@ -270,12 +270,8 @@ mod test {
assert!(err.to_string().contains("IllegalDocument"));
// calling error policy with the reconciler error should cause the correct metric to be set
error_policy(doc.clone(), &err, testctx.clone());
//dbg!("actual metrics: {}", registry.gather());
let failures = testctx
.metrics
.failures
.with_label_values(&["illegal", "finalizererror(applyfailed(illegaldocument))"])
.get();
let metrics = &testctx.metrics.app.reconciler;
let failures = metrics.get_failures("illegal", "finalizererror(applyfailed(illegaldocument))");
assert_eq!(failures, 1);
}

Expand Down
10 changes: 4 additions & 6 deletions src/fixtures.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Helper methods only available for tests
use crate::{Context, Document, DocumentSpec, DocumentStatus, Metrics, Result, DOCUMENT_FINALIZER};
use crate::{Context, Document, DocumentSpec, DocumentStatus, Result, DOCUMENT_FINALIZER};
use assert_json_diff::assert_json_include;
use http::{Request, Response};
use kube::{client::Body, Client, Resource, ResourceExt};
use prometheus::Registry;
use std::sync::Arc;

impl Document {
Expand Down Expand Up @@ -208,15 +207,14 @@ impl ApiServerVerifier {

impl Context {
// Create a test context with a mocked kube client, locally registered metrics and default diagnostics
pub fn test() -> (Arc<Self>, ApiServerVerifier, Registry) {
pub fn test() -> (Arc<Self>, ApiServerVerifier) {
let (mock_service, handle) = tower_test::mock::pair::<Request<Body>, Response<Body>>();
let mock_client = Client::new(mock_service, "default");
let registry = Registry::default();
let ctx = Self {
client: mock_client,
metrics: Metrics::default().register(&registry).unwrap(),
metrics: Arc::default(),
diagnostics: Arc::default(),
};
(Arc::new(ctx), ApiServerVerifier(handle), registry)
(Arc::new(ctx), ApiServerVerifier(handle))
}
}
14 changes: 7 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#![allow(unused_imports, unused_variables)]
use actix_web::{get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder};
pub use controller::{self, telemetry, State};
use prometheus::{Encoder, TextEncoder};
pub use controller::{self, telemetry, Metrics, State};

#[get("/metrics")]
async fn metrics(c: Data<State>, _req: HttpRequest) -> impl Responder {
let metrics = c.metrics();
let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder.encode(&metrics, &mut buffer).unwrap();
HttpResponse::Ok().body(buffer)
use measured::metric::group::MetricGroup;
let Metrics { encoder, app } = &*c.metrics();
let mut encoder = encoder.lock().await;
app.collect_group_into(&mut *encoder).unwrap();
let bytes = encoder.finish();
HttpResponse::Ok().body(bytes)
}

#[get("/health")]
Expand Down
Loading

0 comments on commit 0dc7640

Please sign in to comment.