Skip to content

Commit

Permalink
indexer-alt: index chain identifier (#20148)
Browse files Browse the repository at this point in the history
## Description

Index the chain identifier and initial protocol version in a
`kv_genesis` table. This takes the place of chain identifier indexing
and will also be used by indexers for protocol configs and feature
flags, which need to know the initial protocol version to index and the
chain identifier.

## Test plan

Run the indexer twice, and note from the tracing messages that the first
time, the indexer is bootstrapped, and the second time it reads from the
bootstrapped table.

## Stack

- #20118 
- #20132 
- #20147

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Nov 4, 2024
1 parent e5b7727 commit c151a95
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ url.workspace = true
mysten-metrics.workspace = true
sui-field-count.workspace = true
sui-pg-temp-db.workspace = true
sui-protocol-config.workspace = true
sui-storage.workspace = true
sui-types.workspace = true

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the kv_genesis table if it exists.
-- NOTE(review): presumably the down migration paired with the script that creates kv_genesis -- confirm.
DROP TABLE IF EXISTS kv_genesis;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Stores information related to the genesis checkpoint.
CREATE TABLE IF NOT EXISTS kv_genesis
(
-- The checkpoint digest of the genesis checkpoint
genesis_digest BYTEA PRIMARY KEY,
-- The protocol version from the genesis system state
initial_protocol_version BIGINT NOT NULL
);

-- Index to ensure there can only be one row in the genesis table: the indexed
-- expression is the constant 0, so every row maps to the same key and a second
-- row would violate the uniqueness constraint.
CREATE UNIQUE INDEX IF NOT EXISTS kv_genesis_unique
ON kv_genesis ((0));
108 changes: 108 additions & 0 deletions crates/sui-indexer-alt/src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use anyhow::{bail, Context, Result};
use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use sui_types::{
full_checkpoint_content::CheckpointData,
sui_system_state::{get_sui_system_state, SuiSystemStateTrait},
transaction::{TransactionDataAPI, TransactionKind},
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer,
};

/// Ensures the genesis table has been populated before the rest of the indexer is run, and returns
/// the information stored there. If the database has been bootstrapped before, this function will
/// simply read the previously bootstrapped information. Otherwise, it will wait until the first
/// checkpoint is available and extract the necessary information from there.
///
/// Can be cancelled via the `cancel` token, or through an interrupt signal (which will also cancel
/// the token).
pub async fn bootstrap(
indexer: &Indexer,
retry_interval: Duration,
cancel: CancellationToken,
) -> Result<StoredGenesis> {
let Ok(mut conn) = indexer.db().connect().await else {
bail!("Bootstrap failed to get connection for DB");
};

// If the row has already been written, return it.
if let Some(genesis) = kv_genesis::table
.select(StoredGenesis::as_select())
.first(&mut conn)
.await
.optional()?
{
info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
"Indexer already bootstrapped",
);

return Ok(genesis);
}

// Otherwise, extract the necessary information from the genesis checkpoint:
//
// - Get the Genesis system transaction from the genesis checkpoint.
// - Get the system state object that was written out by the system transaction.
let ingestion_client = indexer.ingestion_client().clone();
let wait_cancel = cancel.clone();
let genesis = tokio::spawn(async move {
ingestion_client
.wait_for(0, retry_interval, &wait_cancel)
.await
});

let Some(genesis_checkpoint) = graceful_shutdown(vec![genesis], cancel).await.pop() else {
bail!("Bootstrap cancelled");
};

let genesis_checkpoint = genesis_checkpoint.context("Failed to fetch genesis checkpoint")?;

let CheckpointData {
checkpoint_summary,
transactions,
..
} = genesis_checkpoint.as_ref();

let Some(genesis_transaction) = transactions.iter().find(|tx| {
matches!(
tx.transaction.intent_message().value.kind(),
TransactionKind::Genesis(_)
)
}) else {
bail!("Could not find Genesis transaction");
};

let system_state = get_sui_system_state(&genesis_transaction.output_objects.as_slice())
.context("Failed to get Genesis SystemState")?;

let genesis = StoredGenesis {
genesis_digest: checkpoint_summary.digest().inner().to_vec(),
initial_protocol_version: system_state.protocol_version() as i64,
};

info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
"Bootstrapped indexer",
);

diesel::insert_into(kv_genesis::table)
.values(&genesis)
.on_conflict_do_nothing()
.execute(&mut conn)
.await
.context("Failed to write genesis record")?;

Ok(genesis)
}
10 changes: 5 additions & 5 deletions crates/sui-indexer-alt/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ pub struct IngestionService {
pub struct IngestionConfig {
/// Remote Store to fetch checkpoints from.
#[arg(long, required = true, group = "source")]
remote_store_url: Option<Url>,
pub remote_store_url: Option<Url>,

/// Path to the local ingestion directory.
/// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
#[arg(long, required = true, group = "source")]
local_ingestion_path: Option<PathBuf>,
pub local_ingestion_path: Option<PathBuf>,

/// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
#[arg(long, default_value_t = 5000)]
checkpoint_buffer_size: usize,
pub checkpoint_buffer_size: usize,

/// Maximum number of checkpoints to attempt to fetch concurrently.
#[arg(long, default_value_t = 200)]
ingest_concurrency: usize,
pub ingest_concurrency: usize,

/// Polling interval to retry fetching checkpoints that do not exist.
#[arg(
Expand All @@ -63,7 +63,7 @@ pub struct IngestionConfig {
value_name = "MILLISECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_millis)
)]
retry_interval: Duration,
pub retry_interval: Duration,
}

impl IngestionService {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info;

pub mod args;
pub mod bootstrap;
pub mod db;
pub mod handlers;
pub mod ingestion;
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use anyhow::{Context, Result};
use clap::Parser;
use sui_indexer_alt::args::Command;
use sui_indexer_alt::bootstrap::bootstrap;
use sui_indexer_alt::db::reset_database;
use sui_indexer_alt::{
args::Args,
Expand Down Expand Up @@ -36,8 +37,11 @@ async fn main() -> Result<()> {
indexer,
consistent_range: lag,
} => {
let retry_interval = indexer.ingestion_config.retry_interval;
let mut indexer = Indexer::new(args.db_config, indexer, cancel.clone()).await?;

bootstrap(&indexer, retry_interval, cancel.clone()).await?;

indexer.concurrent_pipeline::<EvEmitMod>().await?;
indexer.concurrent_pipeline::<EvStructInst>().await?;
indexer.concurrent_pipeline::<KvCheckpoints>().await?;
Expand Down
35 changes: 34 additions & 1 deletion crates/sui-indexer-alt/src/models/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::kv_checkpoints;
use anyhow::{anyhow, Result};
use diesel::prelude::*;
use sui_field_count::FieldCount;
use sui_protocol_config::{Chain, ProtocolVersion};
use sui_types::digests::{ChainIdentifier, CheckpointDigest};

use crate::schema::{kv_checkpoints, kv_genesis};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_checkpoints)]
Expand All @@ -14,3 +18,32 @@ pub struct StoredCheckpoint {
/// BCS serialized CheckpointContents
pub checkpoint_contents: Vec<u8>,
}

/// Row of the `kv_genesis` table, describing the genesis checkpoint of the chain being indexed.
#[derive(Insertable, Selectable, Queryable, Debug, Clone)]
#[diesel(table_name = kv_genesis)]
pub struct StoredGenesis {
/// The checkpoint digest of the genesis checkpoint, as raw bytes.
pub genesis_digest: Vec<u8>,
/// The protocol version from the genesis system state, stored as a signed 64-bit integer.
pub initial_protocol_version: i64,
}

impl StoredGenesis {
    /// Try and identify the chain that this indexer is indexing based on its genesis checkpoint
    /// digest.
    ///
    /// # Errors
    ///
    /// Fails with "Bad genesis digest" if the stored digest is not exactly 32 bytes long.
    pub fn chain(&self) -> Result<Chain> {
        // Convert from the borrowed slice: the array is copied out directly, without first
        // cloning the whole `Vec`.
        let bytes: [u8; 32] = self
            .genesis_digest
            .as_slice()
            .try_into()
            .map_err(|_| anyhow!("Bad genesis digest"))?;

        let digest = CheckpointDigest::new(bytes);
        let identifier = ChainIdentifier::from(digest);

        Ok(identifier.chain())
    }

    /// The protocol version that the chain was started at.
    pub fn initial_protocol_version(&self) -> ProtocolVersion {
        ProtocolVersion::new(self.initial_protocol_version as u64)
    }
}
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ diesel::table! {
}
}

// Diesel schema for the `kv_genesis` table, with `genesis_digest` declared as its primary key.
diesel::table! {
kv_genesis (genesis_digest) {
genesis_digest -> Bytea,
initial_protocol_version -> Int8,
}
}

diesel::table! {
kv_objects (object_id, object_version) {
object_id -> Bytea,
Expand Down Expand Up @@ -190,6 +197,7 @@ diesel::allow_tables_to_appear_in_same_query!(
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
kv_genesis,
kv_objects,
kv_transactions,
obj_versions,
Expand Down

0 comments on commit c151a95

Please sign in to comment.