Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics improvement #31

Merged
merged 8 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde = "1.0.176"
reqwest = "0.11.7"
tokio = { version = "1.29.1", features = ["full"] }
tracing = "0.1.40"
metrics = "0.22.1"

canister-utils = { path = "src/canister-utils" }
ic-identity = { path = "src/ic-identity" }
1 change: 1 addition & 0 deletions src/gateway-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ dashmap = "5.5.3"
tokio = { workspace = true }
tracing = { workspace = true }

metrics = { workspace = true }
canister-utils = { workspace = true }
52 changes: 37 additions & 15 deletions src/gateway-state/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use canister_utils::{ClientKey, IcWsCanisterMessage};
use std::sync::Arc;

use dashmap::{mapref::entry::Entry, DashMap};
use ic_agent::export::Principal;
use std::sync::Arc;
use metrics::gauge;
use tokio::sync::mpsc::Sender;
use tracing::Span;
use tracing::{debug, Span};

use canister_utils::{ClientKey, IcWsCanisterMessage};

/// State of the WS Gateway that can be shared between threads
#[derive(Clone)]
Expand Down Expand Up @@ -59,6 +62,12 @@ impl GatewayState {
span: client_session_span,
},
);

// Increment the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// the poller shall not be started again
None
},
Expand All @@ -74,6 +83,12 @@ impl GatewayState {
},
);
entry.insert(Arc::clone(&poller_state));

// Increment the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// the poller shall be started
Some(poller_state)
},
Expand Down Expand Up @@ -104,6 +119,12 @@ impl GatewayState {
// if this is encountered it might indicate a race condition
unreachable!("Client key not found in poller state");
}

// Decrement the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// even if this is the last client session for the canister, do not remove the canister from the gateway state
// this will be done by the poller task
}
Expand All @@ -115,14 +136,6 @@ impl GatewayState {
// therefore there is no need to do anything else here
}

pub fn get_clients_count(&self, canister_id: CanisterPrincipal) -> usize {
if let Some(poller_state) = self.inner.data.get(&canister_id) {
poller_state.len()
} else {
0
}
}

pub fn get_active_pollers_count(&self) -> usize {
self.inner.data.len()
}
Expand Down Expand Up @@ -155,7 +168,15 @@ impl GatewayState {
// returns 'ClientRemovalResult::Removed' if the client was removed, 'ClientRemovalResult::Vacant' if there was no such client
return {
match poller_state.remove(&client_key) {
Some(_) => ClientRemovalResult::Removed(client_key),
Some(_) => {
// Decrement the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

ClientRemovalResult::Removed(client_key)
},
None => ClientRemovalResult::Vacant,
}
};
Expand Down Expand Up @@ -262,14 +283,15 @@ pub type CanisterPrincipal = Principal;

#[cfg(test)]
mod tests {
use tokio::sync::mpsc::{self, Receiver};

use super::*;
use std::{
thread,
time::{Duration, Instant},
};

use tokio::sync::mpsc::{self, Receiver};

use super::*;

#[tokio::test]
async fn should_insert_new_client_channels_and_get_new_poller_state_once() {
let clients_count = 1000;
Expand Down
2 changes: 1 addition & 1 deletion src/ic-websocket-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ opentelemetry = { version = "0.21" }
opentelemetry-otlp = { version = "0.14.0" }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
rand = "0.8"
metrics = "0.22.1"
metrics = { workspace = true }
metrics-exporter-prometheus = "0.13.1"
metrics-util = "0.16.2"

Expand Down
12 changes: 11 additions & 1 deletion src/ic-websocket-gateway/src/canister_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use gateway_state::{
CanisterPrincipal, CanisterRemovalResult, ClientSender, GatewayState, PollerState,
};
use ic_agent::{agent::RejectCode, Agent, AgentError};
use metrics::{gauge, histogram};
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc::Sender, time::timeout};
use tracing::{error, span, trace, warn, Instrument, Level, Span};
Expand Down Expand Up @@ -77,6 +78,12 @@ impl CanisterPoller {
// this enables to crawl polling iterations in reverse chronological order
polling_iteration_span.follows_from(previous_polling_iteration_span.id());
}

// register the number of active clients
let clients_connected = self.poller_state.len();
gauge!("clients_connected", "canister_id" => self.canister_id.to_string())
.set(clients_connected as f64);

if let Err(e) = self
.poll_and_relay()
.instrument(polling_iteration_span.clone())
Expand Down Expand Up @@ -140,7 +147,10 @@ impl CanisterPoller {
PollingStatus::NoMessagesPolled => (),
}

// compute the amout of time to sleep for before polling again
// record the time it took to poll the canister
let delta = start_polling_instant.elapsed();
histogram!("poller_duration", "canister_id" => self.canister_id.to_string()).record(delta);

let effective_polling_interval =
self.compute_effective_polling_interval(start_polling_instant);
// if no messages are returned or if the queue is fully drained, sleep for 'effective_polling_interval' before polling again
Expand Down
21 changes: 2 additions & 19 deletions src/ic-websocket-gateway/src/client_session_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use canister_utils::{ws_close, CanisterWsCloseArguments, ClientKey, IcWsCanister
use futures_util::StreamExt;
use gateway_state::{CanisterPrincipal, ClientRemovalResult, GatewayState, PollerState};
use ic_agent::Agent;
use metrics::{gauge, histogram};
use metrics::{counter, gauge, histogram};
use std::sync::Arc;
use std::time::Instant;
use tokio::{
Expand Down Expand Up @@ -189,14 +189,9 @@ impl ClientSessionHandler {
client_session_span.in_scope(|| {
debug!("Client session opened");

let canister_id = self.get_canister_id(&client_session);
let client_key = self.get_client_key(&client_session);

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string()).set(clients_connected as f64);

counter!("client_connected_count", "client_key" => client_key.to_string()).absolute(1);
// Calculate the time it took to open the connection and record it using the timer started in ws_listener.rs
let delta = self.start_connection_time.elapsed();
histogram!("connection_opening_time", "client_key" => client_key.to_string()).record(delta);
Expand All @@ -215,12 +210,6 @@ impl ClientSessionHandler {
.remove_client(canister_id, client_key.clone());
debug!("Client removed from gateway state");

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

let delta = client_start_session_time.elapsed();
histogram!("connection_duration", "client_key" => client_key.to_string())
.record(delta);
Expand Down Expand Up @@ -256,12 +245,6 @@ impl ClientSessionHandler {
{
debug!("Client removed from gateway state");

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

let delta = client_start_session_time.elapsed();
histogram!("connection_duration", "client_key" => client_key.to_string())
.record(delta);
Expand Down
11 changes: 8 additions & 3 deletions src/ic-websocket-gateway/src/gateway_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use metrics::{describe_gauge, describe_histogram, gauge};
use metrics::{describe_counter, describe_gauge, describe_histogram, gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
use std::error::Error;
Expand All @@ -9,12 +9,16 @@ use std::time::Duration;
pub fn init_metrics(address: &str) -> Result<(), Box<dyn Error>> {
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::from_str(address)?);

// Set the idle timeout for counters and histograms to 30 seconds then the metrics are removed from the registry
// Set the idle timeout for counters and histograms to 10 seconds then the metrics are removed from the registry
builder
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(30)))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(10)))
.install()
.expect("failed to install Prometheus recorder");

describe_counter!(
"client_connected_count",
"Each time that a client connects it emits a point"
);
describe_gauge!(
"clients_connected",
"The number of clients currently connected"
Expand All @@ -32,6 +36,7 @@ pub fn init_metrics(address: &str) -> Result<(), Box<dyn Error>> {
"connection_opening_time",
"The time it takes to open a connection"
);
describe_histogram!("poller_duration", "The time it takes to poll the canister");

gauge!("active_pollers").set(0.0);

Expand Down
Loading