Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] Add indexer_latest_ingested_checkpoint metric #20188

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions crates/sui-indexer-alt/src/ingestion/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use backoff::backoff::Constant;
use futures::{future::try_join_all, TryStreamExt};
use mysten_metrics::spawn_monitored_task;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
Expand All @@ -32,13 +32,16 @@ pub(super) fn broadcaster(
) -> JoinHandle<()> {
spawn_monitored_task!(async move {
info!("Starting ingestion broadcaster");
let latest_ingested_checkpoint = Arc::new(AtomicU64::new(0));

match ReceiverStream::new(checkpoint_rx)
.map(Ok)
.try_for_each_concurrent(/* limit */ config.ingest_concurrency, |cp| {
let client = client.clone();
let metrics = metrics.clone();
let metrics_clone = metrics.clone();
let subscribers = subscribers.clone();
let latest_ingested_checkpoint = latest_ingested_checkpoint.clone();

// One clone is for the supervisor to signal a cancel if it detects a
// subscriber that wants to wind down ingestion, and the other is to pass to
Expand All @@ -51,7 +54,7 @@ pub(super) fn broadcaster(
let backoff = Constant::new(config.retry_interval);
let fetch = move || {
let client = client.clone();
let metrics = metrics.clone();
let metrics = metrics_clone.clone();
let cancel = cancel.clone();

async move {
Expand All @@ -73,6 +76,11 @@ pub(super) fn broadcaster(

async move {
let checkpoint = backoff::future::retry(backoff, fetch).await?;
let new_seq = checkpoint.checkpoint_summary.sequence_number;
let old_seq = latest_ingested_checkpoint.fetch_max(new_seq, Ordering::Relaxed);
if new_seq > old_seq {
metrics.latest_ingested_checkpoint.set(new_seq as i64);
}
let futures = subscribers.iter().map(|s| s.send(checkpoint.clone()));

if try_join_all(futures).await.is_err() {
Expand Down
12 changes: 10 additions & 2 deletions crates/sui-indexer-alt/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use prometheus::{
proto::{Counter, Gauge, LabelPair, Metric, MetricFamily, MetricType, Summary},
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, Histogram, HistogramVec, IntCounter, IntCounterVec,
IntGaugeVec, Registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -61,6 +61,8 @@ pub struct IndexerMetrics {
pub total_ingested_transient_retries: IntCounterVec,
pub total_ingested_not_found_retries: IntCounter,

pub latest_ingested_checkpoint: IntGauge,

pub ingested_checkpoint_latency: Histogram,

// Statistics related to individual ingestion pipelines' handlers.
Expand Down Expand Up @@ -200,6 +202,12 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
latest_ingested_checkpoint: register_int_gauge_with_registry!(
"indexer_latest_ingested_checkpoint",
"Latest checkpoint sequence number fetched from the remote store",
registry,
)
.unwrap(),
ingested_checkpoint_latency: register_histogram_with_registry!(
"indexer_ingested_checkpoint_latency",
"Time taken to fetch a checkpoint from the remote store, including retries",
Expand Down
Loading