Skip to content

Commit

Permalink
fix tracing (#136)
Browse files Browse the repository at this point in the history
* fix tracing

* fix config

* cleanup
  • Loading branch information
ermalkaleci authored Nov 17, 2023
1 parent b908466 commit 9d67055
Show file tree
Hide file tree
Showing 14 changed files with 303 additions and 253 deletions.
100 changes: 51 additions & 49 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ http = "0.2.8"
hyper = "0.14.23"
log = "0.4.17"
moka = { version = "0.11.0", features = ["future"] }
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-client"] }
opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio"] }
opentelemetry = { version = "0.21.0" }
opentelemetry-datadog = { version = "0.9.0", features = ["reqwest-client"] }
opentelemetry-jaeger = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry_sdk = { version = "0.21.1", features = ["rt-tokio", "trace"] }

rand = "0.8.5"
serde = "1.0.152"
serde_json = "1.0.92"
Expand Down
2 changes: 1 addition & 1 deletion config.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
extensions:
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
Expand Down
7 changes: 4 additions & 3 deletions src/extensions/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::env;

use async_trait::async_trait;
use opentelemetry::{global, sdk::trace::Tracer, trace::TraceError};
use opentelemetry::{global, trace::TraceError};
use opentelemetry_sdk::trace::Tracer;
use serde::Deserialize;

use super::{Extension, ExtensionRegistry};
Expand Down Expand Up @@ -64,7 +65,7 @@ pub fn setup_telemetry(options: &TelemetryConfig) -> Result<Option<Tracer>, Trac
tracer = tracer.with_endpoint(agent_endpoint.clone());
}

let tracer = tracer.install_batch(opentelemetry::runtime::Tokio)?;
let tracer = tracer.install_batch(opentelemetry_sdk::runtime::Tokio)?;

Some(tracer)
}
Expand All @@ -80,7 +81,7 @@ pub fn setup_telemetry(options: &TelemetryConfig) -> Result<Option<Tracer>, Trac
tracer = tracer.with_agent_endpoint(agent_endpoint);
}

let tracer = tracer.install_batch(opentelemetry::runtime::Tokio)?;
let tracer = tracer.install_batch(opentelemetry_sdk::runtime::Tokio)?;

Some(tracer)
}
Expand Down
11 changes: 8 additions & 3 deletions src/middlewares/methods/block_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::sync::Arc;

use async_trait::async_trait;
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use opentelemetry::trace::FutureExt;

use crate::{
extensions::api::EthApi,
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod},
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER},
utils::{TypeRegistry, TypeRegistryRef},
};

Expand Down Expand Up @@ -97,8 +98,12 @@ impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for BlockTagMi
context: TypeRegistry,
next: NextFn<CallRequest, Result<JsonValue, ErrorObjectOwned>>,
) -> Result<JsonValue, ErrorObjectOwned> {
let (request, context) = self.replace(request, context).await;
next(request, context).await
async move {
let (request, context) = self.replace(request, context).await;
next(request, context).await
}
.with_context(TRACER.context("block_tag"))
.await
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/middlewares/methods/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use opentelemetry::trace::FutureExt;
use crate::{
config::CacheParams,
extensions::cache::Cache as CacheExtension,
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod},
utils::{telemetry, Cache, CacheKey, TypeRegistry, TypeRegistryRef},
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER},
utils::{Cache, CacheKey, TypeRegistry, TypeRegistryRef},
};

pub struct BypassCache(pub bool);
Expand Down Expand Up @@ -62,8 +62,6 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for CacheMiddleware {
}
}

const TRACER: telemetry::Tracer = telemetry::Tracer::new("cache-middleware");

#[async_trait]
impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for CacheMiddleware {
async fn call(
Expand Down Expand Up @@ -95,7 +93,7 @@ impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for CacheMiddl

result
}
.with_context(TRACER.context("call"))
.with_context(TRACER.context("cache"))
.await
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/middlewares/methods/delay.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;

use async_trait::async_trait;
use opentelemetry::trace::FutureExt;

use crate::{
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod},
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER},
utils::{TypeRegistry, TypeRegistryRef},
};

Expand Down Expand Up @@ -39,7 +40,11 @@ impl Middleware<CallRequest, CallResult> for DelayMiddleware {
context: TypeRegistry,
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
tokio::time::sleep(self.delay).await;
next(request, context).await
async move {
tokio::time::sleep(self.delay).await;
next(request, context).await
}
.with_context(TRACER.context("delay"))
.await
}
}
Loading

0 comments on commit 9d67055

Please sign in to comment.