diff --git a/Cargo.lock b/Cargo.lock index 9336d9ccb9c4f..85d22bac7e4f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13498,6 +13498,7 @@ dependencies = [ "clap", "const-str", "diesel", + "diesel-async", "downcast", "either", "expect-test", diff --git a/crates/sui-graphql-rpc/Cargo.toml b/crates/sui-graphql-rpc/Cargo.toml index 0f63d54a09594..35d832c36da77 100644 --- a/crates/sui-graphql-rpc/Cargo.toml +++ b/crates/sui-graphql-rpc/Cargo.toml @@ -20,6 +20,7 @@ chrono.workspace = true clap.workspace = true const-str.workspace = true diesel = { workspace = true, features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes"] } +diesel-async = { workspace = true, features = ["postgres"] } either.workspace = true fastcrypto = { workspace = true, features = ["copy_key"] } fastcrypto-zkp.workspace = true diff --git a/crates/sui-graphql-rpc/src/data.rs b/crates/sui-graphql-rpc/src/data.rs index 04e096061c116..e7126653901ee 100644 --- a/crates/sui-graphql-rpc/src/data.rs +++ b/crates/sui-graphql-rpc/src/data.rs @@ -12,10 +12,12 @@ use async_graphql::dataloader::DataLoader as AGDataLoader; use async_trait::async_trait; use diesel::{ query_builder::{BoxedSelectStatement, FromClause, QueryFragment, QueryId}, - query_dsl::{methods::LimitDsl, LoadQuery}, + query_dsl::methods::LimitDsl, QueryResult, }; +use diesel_async::{methods::LoadQuery, scoped_futures::ScopedBoxFuture}; + use crate::error::Error; /// Database Backend in use -- abstracting a specific implementation. @@ -45,7 +47,7 @@ pub(crate) type Query = #[async_trait] pub(crate) trait QueryExecutor { type Backend: diesel::backend::Backend; - type Connection: diesel::Connection; + type Connection: diesel_async::AsyncConnection; type DbConnection<'c>: DbConnection where @@ -53,9 +55,13 @@ pub(crate) trait QueryExecutor { /// Execute `txn` with read committed isolation. `txn` is supplied a database connection to /// issue queries over. - async fn execute(&self, txn: T) -> Result + async fn execute<'c, T, U, E>(&self, txn: T) -> Result where - T: FnOnce(&mut Self::DbConnection<'_>) -> Result, + T: for<'r> FnOnce( + &'r mut Self::DbConnection<'_>, + ) -> ScopedBoxFuture<'static, 'r, Result> + + Send + + 'c, E: From + std::error::Error, T: Send + 'static, U: Send + 'static, @@ -64,44 +70,55 @@ pub(crate) trait QueryExecutor { /// Execute `txn` with repeatable reads and no phantom reads -- multiple calls to the same query /// should produce the same results. `txn` is supplied a database connection to issue queries /// over. - async fn execute_repeatable(&self, txn: T) -> Result + async fn execute_repeatable<'c, T, U, E>(&self, txn: T) -> Result where - T: FnOnce(&mut Self::DbConnection<'_>) -> Result, + T: for<'r> FnOnce( + &'r mut Self::DbConnection<'_>, + ) -> ScopedBoxFuture<'static, 'r, Result> + + Send + + 'c, E: From + std::error::Error, T: Send + 'static, U: Send + 'static, E: Send + 'static; } +#[async_trait] pub(crate) trait DbConnection { type Backend: diesel::backend::Backend; - type Connection: diesel::Connection; + type Connection: diesel_async::AsyncConnection; /// Run a query that fetches a single value. `query` is a thunk that returns a query when /// called. - fn result(&mut self, query: impl Fn() -> Q) -> QueryResult + async fn result(&mut self, query: T) -> QueryResult where - Q: diesel::query_builder::Query, + T: Fn() -> Q + Send, + Q: diesel::query_builder::Query + Send + 'static, Q: LoadQuery<'static, Self::Connection, U>, - Q: QueryId + QueryFragment; + Q: QueryId + QueryFragment, + U: Send; /// Run a query that fetches multiple values. `query` is a thunk that returns a query when /// called. - fn results(&mut self, query: impl Fn() -> Q) -> QueryResult> + async fn results(&mut self, query: T) -> QueryResult> where - Q: diesel::query_builder::Query, + T: Fn() -> Q + Send, + Q: diesel::query_builder::Query + Send + 'static, Q: LoadQuery<'static, Self::Connection, U>, - Q: QueryId + QueryFragment; + Q: QueryId + QueryFragment, + U: Send; /// Helper to limit a query that fetches multiple values to return only its first value. `query` /// is a thunk that returns a query when called. - fn first(&mut self, query: impl Fn() -> Q) -> QueryResult + async fn first(&mut self, query: T) -> QueryResult where - ::Output: diesel::query_builder::Query, + T: Fn() -> Q + Send, + ::Output: diesel::query_builder::Query + Send, ::Output: LoadQuery<'static, Self::Connection, U>, - ::Output: QueryId + QueryFragment, + ::Output: QueryId + QueryFragment + 'static, + U: Send, { - self.result(move || query().limit(1i64)) + self.result(move || query().limit(1i64)).await } } diff --git a/crates/sui-graphql-rpc/src/data/package_resolver.rs b/crates/sui-graphql-rpc/src/data/package_resolver.rs index 6a1d2956663ef..da5b38115a72a 100644 --- a/crates/sui-graphql-rpc/src/data/package_resolver.rs +++ b/crates/sui-graphql-rpc/src/data/package_resolver.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use async_graphql::dataloader::Loader; use async_trait::async_trait; use diesel::{ExpressionMethods, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use move_core_types::account_address::AccountAddress; use sui_indexer::models::packages::StoredPackage; use sui_indexer::schema::packages; @@ -59,9 +60,13 @@ impl Loader for Db { let ids: BTreeSet<_> = keys.iter().map(|PackageKey(id)| id.to_vec()).collect(); let stored_packages: Vec = self .execute(move |conn| { - conn.results(move || { - dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned())) - }) + async move { + conn.results(move || { + dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned())) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| PackageResolverError::Store { diff --git a/crates/sui-graphql-rpc/src/data/pg.rs b/crates/sui-graphql-rpc/src/data/pg.rs index 9d5aadcba5158..469fabf20ce56 100644 --- a/crates/sui-graphql-rpc/src/data/pg.rs +++ b/crates/sui-graphql-rpc/src/data/pg.rs @@ -7,14 +7,14 @@ use async_trait::async_trait; use diesel::{ pg::Pg, query_builder::{Query, QueryFragment, QueryId}, - query_dsl::LoadQuery, - QueryResult, RunQueryDsl, + QueryResult, }; +use diesel_async::{methods::LoadQuery, scoped_futures::ScopedBoxFuture}; +use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl}; use std::fmt; use std::time::Instant; use sui_indexer::indexer_reader::IndexerReader; -use sui_indexer::{run_query_async, run_query_repeatable_async, spawn_read_only_blocking}; use tracing::error; #[derive(Clone)] @@ -26,7 +26,7 @@ pub(crate) struct PgExecutor { pub(crate) struct PgConnection<'c> { max_cost: u32, - conn: &'c mut diesel::PgConnection, + conn: &'c mut diesel_async::AsyncPgConnection, } pub(crate) struct ByteaLiteral<'a>(pub &'a [u8]); @@ -43,13 +43,17 @@ impl PgExecutor { #[async_trait] impl QueryExecutor for PgExecutor { - type Connection = diesel::PgConnection; + type Connection = diesel_async::AsyncPgConnection; type Backend = Pg; type DbConnection<'c> = PgConnection<'c>; - async fn execute(&self, txn: T) -> Result + async fn execute<'c, T, U, E>(&self, txn: T) -> Result where - T: FnOnce(&mut Self::DbConnection<'_>) -> Result, + T: for<'r> FnOnce( + &'r mut Self::DbConnection<'_>, + ) -> ScopedBoxFuture<'static, 'r, Result> + + Send + + 'c, E: From + std::error::Error, T: Send + 'static, U: Send + 'static, @@ -57,8 +61,25 @@ impl QueryExecutor for PgExecutor { { let max_cost = self.limits.max_db_query_cost; let instant = Instant::now(); - let pool = self.inner.get_pool(); - let result = run_query_async!(&pool, move |conn| txn(&mut PgConnection { max_cost, conn })); + let mut connection = self + .inner + .pool() + .get() + .await + .map_err(|e| Error::Internal(e.to_string()))?; + + let result = connection + .build_transaction() + .read_only() + .run(|conn| { + async move { + let mut connection = PgConnection { max_cost, conn }; + txn(&mut connection).await + } + .scope_boxed() + }) + .await; + self.metrics .observe_db_data(instant.elapsed(), result.is_ok()); if let Err(e) = &result { @@ -67,9 +88,13 @@ impl QueryExecutor for PgExecutor { result.map_err(|e| Error::Internal(e.to_string())) } - async fn execute_repeatable(&self, txn: T) -> Result + async fn execute_repeatable<'c, T, U, E>(&self, txn: T) -> Result where - T: FnOnce(&mut Self::DbConnection<'_>) -> Result, + T: for<'r> FnOnce( + &'r mut Self::DbConnection<'_>, + ) -> ScopedBoxFuture<'static, 'r, Result> + + Send + + 'c, E: From + std::error::Error, T: Send + 'static, U: Send + 'static, @@ -77,11 +102,27 @@ impl QueryExecutor for PgExecutor { { let max_cost = self.limits.max_db_query_cost; let instant = Instant::now(); - let pool = self.inner.get_pool(); - let result = run_query_repeatable_async!(&pool, move |conn| txn(&mut PgConnection { - max_cost, - conn - })); + + let mut connection = self + .inner + .pool() + .get() + .await + .map_err(|e| Error::Internal(e.to_string()))?; + + let result = connection + .build_transaction() + .read_only() + .repeatable_read() + .run(|conn| { + async move { + // + txn(&mut PgConnection { max_cost, conn }).await + } + .scope_boxed() + }) + .await; + self.metrics .observe_db_data(instant.elapsed(), result.is_ok()); if let Err(e) = &result { @@ -91,28 +132,33 @@ impl QueryExecutor for PgExecutor { } } +#[async_trait] impl<'c> super::DbConnection for PgConnection<'c> { - type Connection = diesel::PgConnection; + type Connection = diesel_async::AsyncPgConnection; type Backend = Pg; - fn result(&mut self, query: impl Fn() -> Q) -> QueryResult + async fn result(&mut self, query: T) -> QueryResult where - Q: diesel::query_builder::Query, + T: Fn() -> Q + Send, + Q: diesel::query_builder::Query + Send + 'static, Q: LoadQuery<'static, Self::Connection, U>, Q: QueryId + QueryFragment, + U: Send, { - query_cost::log(self.conn, self.max_cost, query()); - query().get_result(self.conn) + query_cost::log(self.conn, self.max_cost, query()).await; + query().get_result(self.conn).await } - fn results(&mut self, query: impl Fn() -> Q) -> QueryResult> + async fn results(&mut self, query: T) -> QueryResult> where - Q: diesel::query_builder::Query, + T: Fn() -> Q + Send, + Q: diesel::query_builder::Query + Send + 'static, Q: LoadQuery<'static, Self::Connection, U>, Q: QueryId + QueryFragment, + U: Send, { - query_cost::log(self.conn, self.max_cost, query()); - query().get_results(self.conn) + query_cost::log(self.conn, self.max_cost, query()).await; + query().get_results(self.conn).await } } @@ -130,7 +176,8 @@ pub(crate) fn bytea_literal(slice: &[u8]) -> ByteaLiteral<'_> { mod query_cost { use super::*; - use diesel::{query_builder::AstPass, sql_types::Text, PgConnection, QueryResult}; + use diesel::{query_builder::AstPass, sql_types::Text, QueryResult}; + use diesel_async::AsyncPgConnection; use serde_json::Value; use tap::{TapFallible, TapOptional}; use tracing::{debug, info, warn}; @@ -144,8 +191,6 @@ mod query_cost { type SqlType = Text; } - impl RunQueryDsl for Explained {} - impl> QueryFragment for Explained { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.push_sql("EXPLAIN (FORMAT JSON) "); @@ -155,13 +200,13 @@ mod query_cost { } /// Run `EXPLAIN` on the `query`, and log the estimated cost. - pub(crate) fn log(conn: &mut PgConnection, max_db_query_cost: u32, query: Q) + pub(crate) async fn log(conn: &mut AsyncPgConnection, max_db_query_cost: u32, query: Q) where - Q: Query + QueryId + QueryFragment + RunQueryDsl, + Q: Query + QueryId + QueryFragment + RunQueryDsl + Send, { debug!("Estimating: {}", diesel::debug_query(&query).to_string()); - let Some(cost) = explain(conn, query) else { + let Some(cost) = explain(conn, query).await else { warn!("Failed to extract cost from EXPLAIN."); return; }; @@ -173,12 +218,13 @@ mod query_cost { } } - pub(crate) fn explain(conn: &mut PgConnection, query: Q) -> Option + pub(crate) async fn explain(conn: &mut AsyncPgConnection, query: Q) -> Option where - Q: Query + QueryId + QueryFragment + RunQueryDsl, + Q: Query + QueryId + QueryFragment + RunQueryDsl + Send, { let result: String = Explained { query } .get_result(conn) + .await .tap_err(|e| warn!("Failed to run EXPLAIN: {e}")) .ok()?; @@ -200,6 +246,7 @@ mod tests { use diesel::QueryDsl; use sui_framework::BuiltInFramework; use sui_indexer::{ + database::Connection, db::{get_pool_connection, new_connection_pool, reset_database, ConnectionPoolConfig}, models::objects::StoredObject, schema::objects, @@ -215,6 +262,9 @@ mod tests { let pool = new_connection_pool(database.database().url().as_str(), &pool_config).unwrap(); let mut conn = get_pool_connection(&pool).unwrap(); reset_database(&mut conn).await.unwrap(); + let mut connection = Connection::dedicated(database.database().url()) + .await + .unwrap(); let objects: Vec = BuiltInFramework::iter_system_packages() .map(|pkg| IndexedObject::from_object(1, pkg.genesis_object(), None).into()) @@ -223,7 +273,8 @@ mod tests { let expect = objects.len(); let actual = diesel::insert_into(objects::dsl::objects) .values(objects) - .execute(&mut conn) + .execute(&mut connection) + .await .unwrap(); assert_eq!(expect, actual, "Failed to write objects"); @@ -233,8 +284,12 @@ mod tests { let query_all = dsl::objects.select(dsl::objects.star()); // Test estimating query costs - let cost_one = query_cost::explain(&mut conn, query_one).unwrap(); - let cost_all = query_cost::explain(&mut conn, query_all).unwrap(); + let cost_one = query_cost::explain(&mut connection, query_one) + .await + .unwrap(); + let cost_all = query_cost::explain(&mut connection, query_all) + .await + .unwrap(); assert!( cost_one < cost_all, diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 6b77fddf14b89..be9423c8f703b 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -88,7 +88,7 @@ impl Server { pub async fn run(mut self) -> Result<(), Error> { get_or_init_server_start_time().await; - let mut connection = get_pool_connection(&self.db_reader.inner.get_pool())?; + let mut connection = get_pool_connection(&self.db_reader.inner.get_blocking_pool())?; check_db_migration_consistency(&mut connection)?; // A handle that spawns a background task to periodically update the `Watermark`, which diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index 0f4d94532476a..ca7bb81e8ea70 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -7,6 +7,7 @@ use crate::metrics::Metrics; use crate::types::chain_identifier::ChainIdentifier; use async_graphql::ServerError; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use std::mem; use std::sync::Arc; use std::time::Duration; @@ -164,12 +165,16 @@ impl Watermark { use checkpoints::dsl; let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db .execute(move |conn| { - conn.first(move || { - dsl::checkpoints - .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) - .order_by(dsl::sequence_number.desc()) - }) - .optional() + async { + conn.first(move || { + dsl::checkpoints + .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) + .order_by(dsl::sequence_number.desc()) + }) + .await + .optional() + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))? diff --git a/crates/sui-graphql-rpc/src/types/available_range.rs b/crates/sui-graphql-rpc/src/types/available_range.rs index ba879e6bbc0c7..a9f209e58ef3d 100644 --- a/crates/sui-graphql-rpc/src/types/available_range.rs +++ b/crates/sui-graphql-rpc/src/types/available_range.rs @@ -7,6 +7,7 @@ use crate::error::Error; use super::checkpoint::{Checkpoint, CheckpointId}; use async_graphql::*; use diesel::{CombineDsl, ExpressionMethods, QueryDsl, QueryResult}; +use diesel_async::scoped_futures::ScopedFutureExt; use sui_indexer::schema::{checkpoints, objects_snapshot}; #[derive(Clone, Debug, PartialEq, Eq, Copy)] @@ -35,7 +36,9 @@ impl AvailableRange { /// Look up the available range when viewing the data consistently at `checkpoint_viewed_at`. pub(crate) async fn query(db: &Db, checkpoint_viewed_at: u64) -> Result { let Some(range): Option = db - .execute(move |conn| Self::result(conn, checkpoint_viewed_at)) + .execute(move |conn| { + async move { Self::result(conn, checkpoint_viewed_at).await }.scope_boxed() + }) .await .map_err(|e| Error::Internal(format!("Failed to fetch available range: {e}")))? else { @@ -54,24 +57,29 @@ impl AvailableRange { /// Returns an error if there was an issue querying the database, Ok(None) if the checkpoint /// being viewed is not in the database's available range, or Ok(Some(AvailableRange)) /// otherwise. - pub(crate) fn result(conn: &mut Conn, checkpoint_viewed_at: u64) -> QueryResult> { + pub(crate) async fn result( + conn: &mut Conn<'_>, + checkpoint_viewed_at: u64, + ) -> QueryResult> { use checkpoints::dsl as checkpoints; use objects_snapshot::dsl as snapshots; - let checkpoint_range: Vec = conn.results(move || { - let rhs = checkpoints::checkpoints - .select(checkpoints::sequence_number) - .order(checkpoints::sequence_number.desc()) - .limit(1); + let checkpoint_range: Vec = conn + .results(move || { + let rhs = checkpoints::checkpoints + .select(checkpoints::sequence_number) + .order(checkpoints::sequence_number.desc()) + .limit(1); - let lhs = snapshots::objects_snapshot - .select(snapshots::checkpoint_sequence_number) - .order(snapshots::checkpoint_sequence_number.desc()) - .limit(1); + let lhs = snapshots::objects_snapshot + .select(snapshots::checkpoint_sequence_number) + .order(snapshots::checkpoint_sequence_number.desc()) + .limit(1); - // We need to use `union_all` in case `lhs` and `rhs` have the same value. - lhs.union_all(rhs) - })?; + // We need to use `union_all` in case `lhs` and `rhs` have the same value. + lhs.union_all(rhs) + }) + .await?; let (first, mut last) = match checkpoint_range.as_slice() { [] => (0, 0), diff --git a/crates/sui-graphql-rpc/src/types/balance.rs b/crates/sui-graphql-rpc/src/types/balance.rs index 57eb83935d407..78312699491ab 100644 --- a/crates/sui-graphql-rpc/src/types/balance.rs +++ b/crates/sui-graphql-rpc/src/types/balance.rs @@ -16,6 +16,7 @@ use diesel::{ sql_types::{BigInt as SqlBigInt, Nullable, Text}, OptionalExtension, QueryableByName, }; +use diesel_async::scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; use std::str::FromStr; use sui_indexer::types::OwnerType; @@ -69,14 +70,19 @@ impl Balance { ) -> Result, Error> { let stored: Option = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - conn.result(move || { - balance_query(address, Some(coin_type.clone()), range).into_boxed() - }) - .optional() + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + conn.result(move || { + balance_query(address, Some(coin_type.clone()), range).into_boxed() + }) + .await + .optional() + } + .scope_boxed() }) .await?; @@ -99,17 +105,23 @@ impl Balance { let Some((prev, next, results)) = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - let result = page.paginate_raw_query::( - conn, - checkpoint_viewed_at, - balance_query(address, None, range), - )?; - - Ok(Some(result)) + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + let result = page + .paginate_raw_query::( + conn, + checkpoint_viewed_at, + balance_query(address, None, range), + ) + .await?; + + Ok(Some(result)) + } + .scope_boxed() }) .await? else { diff --git a/crates/sui-graphql-rpc/src/types/chain_identifier.rs b/crates/sui-graphql-rpc/src/types/chain_identifier.rs index 0565845b45d23..6cfa481ee7451 100644 --- a/crates/sui-graphql-rpc/src/types/chain_identifier.rs +++ b/crates/sui-graphql-rpc/src/types/chain_identifier.rs @@ -7,6 +7,7 @@ use crate::{ }; use async_graphql::*; use diesel::{OptionalExtension, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use sui_indexer::schema::chain_identifier; use sui_types::{ digests::ChainIdentifier as NativeChainIdentifier, messages_checkpoint::CheckpointDigest, @@ -22,8 +23,12 @@ impl ChainIdentifier { let Some(digest_bytes) = db .execute(move |conn| { - conn.first(move || dsl::chain_identifier.select(dsl::checkpoint_digest)) - .optional() + async { + conn.first(move || dsl::chain_identifier.select(dsl::checkpoint_digest)) + .await + .optional() + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch genesis digest: {e}")))? diff --git a/crates/sui-graphql-rpc/src/types/checkpoint.rs b/crates/sui-graphql-rpc/src/types/checkpoint.rs index 852c492967f21..6b58513536e72 100644 --- a/crates/sui-graphql-rpc/src/types/checkpoint.rs +++ b/crates/sui-graphql-rpc/src/types/checkpoint.rs @@ -24,6 +24,7 @@ use async_graphql::{ *, }; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use fastcrypto::encoding::{Base58, Encoding}; use serde::{Deserialize, Serialize}; use sui_indexer::{models::checkpoints::StoredCheckpoint, schema::checkpoints}; @@ -261,12 +262,16 @@ impl Checkpoint { let stored: Option = db .execute(move |conn| { - conn.first(move || { - dsl::checkpoints - .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64)) - .order_by(dsl::sequence_number.desc()) - }) - .optional() + async move { + conn.first(move || { + dsl::checkpoints + .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64)) + .order_by(dsl::sequence_number.desc()) + }) + .await + .optional() + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?; @@ -279,17 +284,19 @@ impl Checkpoint { /// Look up a `Checkpoint` in the database and retrieve its `timestamp_ms` field. This method /// takes a connection, so that it can be used within a transaction. - pub(crate) fn query_timestamp( - conn: &mut Conn, + pub(crate) async fn query_timestamp( + conn: &mut Conn<'_>, seq_num: u64, ) -> Result { use checkpoints::dsl; - let stored: i64 = conn.first(move || { - dsl::checkpoints - .select(dsl::timestamp_ms) - .filter(dsl::sequence_number.eq(seq_num as i64)) - })?; + let stored: i64 = conn + .first(move || { + dsl::checkpoints + .select(dsl::timestamp_ms) + .filter(dsl::sequence_number.eq(seq_num as i64)) + }) + .await?; Ok(stored as u64) } @@ -318,18 +325,23 @@ impl Checkpoint { let (prev, next, results) = db .execute(move |conn| { - page.paginate_query::( - conn, - checkpoint_viewed_at, - move || { - let mut query = dsl::checkpoints.into_boxed(); - query = query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64)); - if let Some(epoch) = filter { - query = query.filter(dsl::epoch.eq(epoch as i64)); - } - query - }, - ) + async move { + page.paginate_query::( + conn, + checkpoint_viewed_at, + move || { + let mut query = dsl::checkpoints.into_boxed(); + query = + query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64)); + if let Some(epoch) = filter { + query = query.filter(dsl::epoch.eq(epoch as i64)); + } + query + }, + ) + .await + } + .scope_boxed() }) .await?; @@ -407,10 +419,14 @@ impl Loader for Db { let checkpoints: Vec = self .execute(move |conn| { - conn.results(move || { - dsl::checkpoints - .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned())) - }) + async move { + conn.results(move || { + dsl::checkpoints + .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned())) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?; @@ -452,9 +468,14 @@ impl Loader for Db { let checkpoints: Vec = self .execute(move |conn| { - conn.results(move || { - dsl::checkpoints.filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned())) - }) + async move { + conn.results(move || { + dsl::checkpoints + .filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned())) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?; diff --git a/crates/sui-graphql-rpc/src/types/coin.rs b/crates/sui-graphql-rpc/src/types/coin.rs index d9654bbe87f10..fdc63dd611e81 100644 --- a/crates/sui-graphql-rpc/src/types/coin.rs +++ b/crates/sui-graphql-rpc/src/types/coin.rs @@ -28,6 +28,7 @@ use super::uint53::UInt53; use async_graphql::*; use async_graphql::connection::{Connection, CursorType, Edge}; +use diesel_async::scoped_futures::ScopedFutureExt; use sui_indexer::models::objects::StoredHistoryObject; use sui_indexer::types::OwnerType; use sui_types::coin::Coin as NativeCoin; @@ -335,15 +336,22 @@ impl Coin { let Some((prev, next, results)) = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - Ok(Some(page.paginate_raw_query::( - conn, - checkpoint_viewed_at, - coins_query(coin_type, owner, range, &page), - )?)) + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + Ok(Some( + page.paginate_raw_query::( + conn, + checkpoint_viewed_at, + coins_query(coin_type, owner, range, &page), + ) + .await?, + )) + } + .scope_boxed() }) .await? else { diff --git a/crates/sui-graphql-rpc/src/types/cursor.rs b/crates/sui-graphql-rpc/src/types/cursor.rs index 868bfed21388b..24bfa2b7ec246 100644 --- a/crates/sui-graphql-rpc/src/types/cursor.rs +++ b/crates/sui-graphql-rpc/src/types/cursor.rs @@ -8,9 +8,10 @@ use async_graphql::{ *, }; use diesel::{ - deserialize::FromSqlRow, query_builder::QueryFragment, query_dsl::LoadQuery, - sql_types::Untyped, QueryDsl, QueryResult, QuerySource, + deserialize::FromSqlRow, query_builder::QueryFragment, sql_types::Untyped, QueryDsl, + QueryResult, QuerySource, }; +use diesel_async::methods::LoadQuery; use fastcrypto::encoding::{Base64, Encoding}; use serde::{de::DeserializeOwned, Serialize}; @@ -289,7 +290,7 @@ impl Page { /// /// `checkpoint_viewed_at` is a required parameter to and passed to each element to construct a /// consistent cursor. - pub(crate) fn paginate_query( + pub(crate) async fn paginate_query( &self, conn: &mut Conn<'_>, checkpoint_viewed_at: u64, @@ -318,7 +319,7 @@ impl Page { } // Load extra rows to detect the existence of pages on either side. - query = query.limit(page.limit() as i64 + 2); + query = query.limit(Page::limit(&page) as i64 + 2); T::order(page.is_from_front(), query) }; @@ -326,7 +327,7 @@ impl Page { // Avoid the database roundtrip in the degenerate case. vec![] } else { - let mut results = conn.results(query)?; + let mut results = conn.results(query).await?; if !self.is_from_front() { results.reverse(); } @@ -347,7 +348,7 @@ impl Page { /// /// `checkpoint_viewed_at` is a required parameter to and passed to each element to construct a /// consistent cursor. - pub(crate) fn paginate_raw_query( + pub(crate) async fn paginate_raw_query( &self, conn: &mut Conn<'_>, checkpoint_viewed_at: u64, @@ -365,7 +366,7 @@ impl Page { // Avoid the database roundtrip in the degenerate case. vec![] } else { - let mut results: Vec = conn.results(new_query)?; + let mut results: Vec = conn.results(new_query).await?; if !self.is_from_front() { results.reverse(); } diff --git a/crates/sui-graphql-rpc/src/types/display.rs b/crates/sui-graphql-rpc/src/types/display.rs index 67fe2fd413c15..2f9dd1cfcabe4 100644 --- a/crates/sui-graphql-rpc/src/types/display.rs +++ b/crates/sui-graphql-rpc/src/types/display.rs @@ -4,6 +4,7 @@ use async_graphql::*; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use move_core_types::annotated_value::{MoveStruct, MoveValue}; use sui_indexer::{models::display::StoredDisplay, schema::display}; use sui_types::TypeTag; @@ -50,13 +51,17 @@ impl Display { pub(crate) async fn query(db: &Db, type_: TypeTag) -> Result, Error> { let stored: Option = db .execute(move |conn| { - conn.first(move || { - use display::dsl; - dsl::display.filter( - dsl::object_type.eq(type_.to_canonical_string(/* with_prefix */ true)), - ) - }) - .optional() + async move { + conn.first(move || { + use display::dsl; + dsl::display.filter( + dsl::object_type.eq(type_.to_canonical_string(/* with_prefix */ true)), + ) + }) + .await + .optional() + } + .scope_boxed() }) .await?; diff --git a/crates/sui-graphql-rpc/src/types/dynamic_field.rs b/crates/sui-graphql-rpc/src/types/dynamic_field.rs index 60e05ed610512..71999cb710693 100644 --- a/crates/sui-graphql-rpc/src/types/dynamic_field.rs +++ b/crates/sui-graphql-rpc/src/types/dynamic_field.rs @@ -3,6 +3,7 @@ use async_graphql::connection::{Connection, CursorType, Edge}; use async_graphql::*; +use diesel_async::scoped_futures::ScopedFutureExt; use move_core_types::annotated_value::{self as A, MoveStruct}; use sui_indexer::models::objects::StoredHistoryObject; use sui_indexer::types::OwnerType; @@ -201,15 +202,22 @@ impl DynamicField { let Some((prev, next, results)) = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - Ok(Some(page.paginate_raw_query::( - conn, - checkpoint_viewed_at, - dynamic_fields_query(parent, parent_version, range, &page), - )?)) + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + Ok(Some( + page.paginate_raw_query::( + conn, + checkpoint_viewed_at, + dynamic_fields_query(parent, parent_version, range, &page), + ) + .await?, + )) + } + .scope_boxed() }) .await? else { diff --git a/crates/sui-graphql-rpc/src/types/epoch.rs b/crates/sui-graphql-rpc/src/types/epoch.rs index 915493217d1f7..36839af4001ae 100644 --- a/crates/sui-graphql-rpc/src/types/epoch.rs +++ b/crates/sui-graphql-rpc/src/types/epoch.rs @@ -22,6 +22,7 @@ use async_graphql::connection::Connection; use async_graphql::dataloader::Loader; use async_graphql::*; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper}; +use diesel_async::scoped_futures::ScopedFutureExt; use fastcrypto::encoding::{Base58, Encoding}; use sui_indexer::models::epoch::QueryableEpochInfo; use sui_indexer::schema::epochs; @@ -318,16 +319,20 @@ impl Epoch { let stored: Option = db .execute(move |conn| { - conn.first(move || { - // Bound the query on `checkpoint_viewed_at` by filtering for the epoch - // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch - // with the largest `first_checkpoint_id` among the filtered set. - dsl::epochs - .select(QueryableEpochInfo::as_select()) - .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64)) - .order_by(dsl::first_checkpoint_id.desc()) - }) - .optional() + async move { + conn.first(move || { + // Bound the query on `checkpoint_viewed_at` by filtering for the epoch + // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch + // with the largest `first_checkpoint_id` among the filtered set. + dsl::epochs + .select(QueryableEpochInfo::as_select()) + .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64)) + .order_by(dsl::first_checkpoint_id.desc()) + }) + .await + .optional() + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?; @@ -350,11 +355,15 @@ impl Loader for Db { let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect(); let epochs: Vec = self .execute_repeatable(move |conn| { - conn.results(move || { - dsl::epochs - .select(QueryableEpochInfo::as_select()) - .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned())) - }) + async move { + conn.results(move || { + dsl::epochs + .select(QueryableEpochInfo::as_select()) + .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned())) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?; diff --git a/crates/sui-graphql-rpc/src/types/event/mod.rs b/crates/sui-graphql-rpc/src/types/event/mod.rs index c3c290e6bb6da..e5eee0f241d4d 100644 --- a/crates/sui-graphql-rpc/src/types/event/mod.rs +++ b/crates/sui-graphql-rpc/src/types/event/mod.rs @@ -15,6 +15,7 @@ use async_graphql::connection::{Connection, CursorType, Edge}; use async_graphql::*; use cursor::EvLookup; use diesel::{ExpressionMethods, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use lookups::{add_bounds, select_emit_module, select_event_type, select_sender}; use sui_indexer::models::{events::StoredEvent, transactions::StoredTransaction}; use sui_indexer::schema::{checkpoints, events}; @@ -134,18 +135,18 @@ impl Event { use checkpoints::dsl; let (prev, next, results) = db - .execute(move |conn| { + .execute(move |conn| async move { let tx_hi: i64 = conn.first(move || { dsl::checkpoints.select(dsl::network_total_transactions) .filter(dsl::sequence_number.eq(checkpoint_viewed_at as i64)) - })?; + }).await?; let (prev, next, mut events): (bool, bool, Vec) = if let Some(filter_query) = query_constraint { let query = add_bounds(filter_query, &filter.transaction_digest, &page, tx_hi); let (prev, next, results) = - page.paginate_raw_query::(conn, checkpoint_viewed_at, query)?; + page.paginate_raw_query::(conn, checkpoint_viewed_at, query).await?; let ev_lookups = results .into_iter() @@ -171,13 +172,13 @@ impl Event { .join(" UNION ALL "); query!(query_string).into_boxed() - })?; + }).await?; (prev, next, events) } else { // No filter is provided so we add bounds to the basic `SELECT * FROM // events` query and call it a day. let query = add_bounds(query!("SELECT * FROM events"), &filter.transaction_digest, &page, tx_hi); - let (prev, next, events_iter) = page.paginate_raw_query::(conn, checkpoint_viewed_at, query)?; + let (prev, next, events_iter) = page.paginate_raw_query::(conn, checkpoint_viewed_at, query).await?; let events = events_iter.collect::>(); (prev, next, events) }; @@ -191,7 +192,7 @@ impl Event { Ok::<_, diesel::result::Error>((prev, next, events)) - }) + }.scope_boxed()) .await?; let mut conn = Connection::new(prev, next); diff --git a/crates/sui-graphql-rpc/src/types/move_package.rs b/crates/sui-graphql-rpc/src/types/move_package.rs index a85bc75d8a661..6a8b0573cb154 100644 --- a/crates/sui-graphql-rpc/src/types/move_package.rs +++ b/crates/sui-graphql-rpc/src/types/move_package.rs @@ -30,6 +30,7 @@ use async_graphql::dataloader::Loader; use async_graphql::*; use diesel::prelude::QueryableByName; use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable}; +use diesel_async::scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; use sui_indexer::models::objects::StoredHistoryObject; use sui_indexer::schema::packages; @@ -707,8 +708,9 @@ impl MovePackage { let (prev, next, results) = db .execute(move |conn| { - let mut q = query!( - r#" + async move { + let mut q = query!( + r#" SELECT p.original_id, o.* @@ -721,17 +723,20 @@ impl MovePackage { AND p.package_version = o.object_version AND p.checkpoint_sequence_number = o.checkpoint_sequence_number "# - ); + ); + + q = filter!( + q, + format!("o.checkpoint_sequence_number < {before_checkpoint}") + ); + if let Some(after) = after_checkpoint { + q = filter!(q, format!("{after} < o.checkpoint_sequence_number")); + } - q = filter!( - q, - format!("o.checkpoint_sequence_number < {before_checkpoint}") - ); - if let Some(after) = after_checkpoint { - q = filter!(q, format!("{after} < o.checkpoint_sequence_number")); + page.paginate_raw_query::(conn, checkpoint_viewed_at, q) + .await } - - page.paginate_raw_query::(conn, checkpoint_viewed_at, q) + .scope_boxed() }) .await?; @@ -772,15 +777,19 @@ impl MovePackage { let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); let (prev, next, results) = db .execute(move |conn| { - page.paginate_raw_query::( - conn, - checkpoint_viewed_at, - if is_system_package(package) { - system_package_version_query(package, filter) - } else { - user_package_version_query(package, filter) - }, - ) + async move { + page.paginate_raw_query::( + conn, + checkpoint_viewed_at, + if is_system_package(package) { + system_package_version_query(package, filter) + } else { + user_package_version_query(package, filter) + }, + ) + .await + } + .scope_boxed() }) .await?; @@ -901,26 +910,32 @@ impl Loader for Db { let stored_packages: Vec<(Vec, i64, Vec)> = self .execute(move |conn| { - conn.results(|| { - let mut query = dsl::packages - .inner_join(other.on(dsl::original_id.eq(other.field(dsl::original_id)))) - .select(( - dsl::package_id, - other.field(dsl::package_version), - other.field(dsl::package_id), - )) - .into_boxed(); - - for (id, version) in id_versions.iter().cloned() { - query = query.or_filter( - dsl::package_id - .eq(id) - .and(other.field(dsl::package_version).eq(version)), - ); - } + async move { + conn.results(|| { + let mut query = dsl::packages + .inner_join( + other.on(dsl::original_id.eq(other.field(dsl::original_id))), + ) + .select(( + dsl::package_id, + other.field(dsl::package_version), + other.field(dsl::package_id), + )) + .into_boxed(); + + for (id, version) in id_versions.iter().cloned() { + query = query.or_filter( + dsl::package_id + .eq(id) + .and(other.field(dsl::package_version).eq(version)), + ); + } - query - }) + query + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to load packages: {e}")))?; @@ -962,28 +977,33 @@ impl Loader for Db { .into_iter() .map(|(checkpoint_viewed_at, ids)| { self.execute(move |conn| { - let results: Vec<(Vec, Vec)> = conn.results(|| { - let o_original_id = other.field(dsl::original_id); - let o_package_id = other.field(dsl::package_id); - let o_cp_seq_num = other.field(dsl::checkpoint_sequence_number); - let o_version = other.field(dsl::package_version); - - let query = dsl::packages - .inner_join(other.on(dsl::original_id.eq(o_original_id))) - .select((dsl::package_id, o_package_id)) - .filter(dsl::package_id.eq_any(ids.iter().cloned())) - .filter(o_cp_seq_num.le(checkpoint_viewed_at as i64)) - .order_by((dsl::package_id, dsl::original_id, o_version.desc())) - .distinct_on((dsl::package_id, dsl::original_id)); - query - })?; - - Ok::<_, diesel::result::Error>( - results - .into_iter() - .map(|(p, latest)| (checkpoint_viewed_at, p, latest)) - .collect::>(), - ) + async move { + let results: Vec<(Vec, Vec)> = conn + .results(|| { + let o_original_id = other.field(dsl::original_id); + let o_package_id = other.field(dsl::package_id); + let o_cp_seq_num = other.field(dsl::checkpoint_sequence_number); + let o_version = other.field(dsl::package_version); + + let query = dsl::packages + .inner_join(other.on(dsl::original_id.eq(o_original_id))) + .select((dsl::package_id, o_package_id)) + .filter(dsl::package_id.eq_any(ids.iter().cloned())) + .filter(o_cp_seq_num.le(checkpoint_viewed_at as i64)) + .order_by((dsl::package_id, dsl::original_id, o_version.desc())) + .distinct_on((dsl::package_id, dsl::original_id)); + query + }) + .await?; + + Ok::<_, diesel::result::Error>( + results + .into_iter() + .map(|(p, latest)| (checkpoint_viewed_at, p, latest)) + .collect::>(), + ) + } + .scope_boxed() }) }); diff --git a/crates/sui-graphql-rpc/src/types/object.rs b/crates/sui-graphql-rpc/src/types/object.rs index eebe0b86447ed..96c7c298b7530 100644 --- a/crates/sui-graphql-rpc/src/types/object.rs +++ b/crates/sui-graphql-rpc/src/types/object.rs @@ -37,6 +37,7 @@ use async_graphql::connection::{CursorType, Edge}; use async_graphql::dataloader::Loader; use async_graphql::{connection::Connection, *}; use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; +use diesel_async::scoped_futures::ScopedFutureExt; use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout}; use move_core_types::language_storage::StructTag; use serde::{Deserialize, Serialize}; @@ -806,15 +807,22 @@ impl Object { let Some((prev, next, results)) = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - Ok(Some(page.paginate_raw_query::( - conn, - checkpoint_viewed_at, - objects_query(&filter, range, &page), - )?)) + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + Ok(Some( + page.paginate_raw_query::( + conn, + checkpoint_viewed_at, + objects_query(&filter, range, &page), + ) + .await?, + )) + } + .scope_boxed() }) .await? else { @@ -1232,24 +1240,28 @@ impl Loader for Db { let objects: Vec = self .execute(move |conn| { - conn.results(move || { - let mut query = h::objects_history - .inner_join( - v::objects_version.on(v::cp_sequence_number - .eq(h::checkpoint_sequence_number) - .and(v::object_id.eq(h::object_id)) - .and(v::object_version.eq(h::object_version))), - ) - .select(StoredHistoryObject::as_select()) - .into_boxed(); + async { + conn.results(move || { + let mut query = h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), + ) + .select(StoredHistoryObject::as_select()) + .into_boxed(); - for (id, version) in id_versions.iter().cloned() { - query = - query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version))); - } + for (id, version) in id_versions.iter().cloned() { + query = query + .or_filter(v::object_id.eq(id).and(v::object_version.eq(version))); + } - query - }) + query + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?; @@ -1321,32 +1333,37 @@ impl Loader for Db { .into_iter() .map(|(group_key, ids)| { self.execute(move |conn| { - let stored: Vec = conn.results(move || { - use objects_history::dsl as h; - use objects_version::dsl as v; - - h::objects_history - .inner_join( - v::objects_version.on(v::cp_sequence_number - .eq(h::checkpoint_sequence_number) - .and(v::object_id.eq(h::object_id)) - .and(v::object_version.eq(h::object_version))), - ) - .select(StoredHistoryObject::as_select()) - .filter(v::object_id.eq_any(ids.iter().cloned())) - .filter(v::object_version.le(group_key.parent_version as i64)) - .distinct_on(v::object_id) - .order_by(v::object_id) - .then_order_by(v::object_version.desc()) - .into_boxed() - })?; - - Ok::<_, diesel::result::Error>( - stored - .into_iter() - .map(|stored| (group_key, stored)) - .collect::>(), - ) + async move { + let stored: Vec = conn + .results(move || { + use objects_history::dsl as h; + use objects_version::dsl as v; + + h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), + ) + .select(StoredHistoryObject::as_select()) + .filter(v::object_id.eq_any(ids.iter().cloned())) + .filter(v::object_version.le(group_key.parent_version as i64)) + .distinct_on(v::object_id) + .order_by(v::object_id) + .then_order_by(v::object_version.desc()) + .into_boxed() + }) + .await?; + + Ok::<_, diesel::result::Error>( + stored + .into_iter() + .map(|stored| (group_key, stored)) + .collect::>(), + ) + } + .scope_boxed() }) }); @@ -1408,32 +1425,37 @@ impl Loader for Db { .into_iter() .map(|(checkpoint_viewed_at, ids)| { self.execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? - else { - return Ok::, diesel::result::Error>( - vec![], - ); - }; - - let filter = ObjectFilter { - object_ids: Some(ids.iter().cloned().collect()), - ..Default::default() - }; - - Ok(conn - .results(move || { - build_objects_query( - View::Consistent, - range, - &Page::bounded(ids.len() as u64), - |q| filter.apply(q), - |q| q, - ) - .into_boxed() - })? - .into_iter() - .map(|r| (checkpoint_viewed_at, r)) - .collect()) + async move { + let Some(range) = + AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::, diesel::result::Error>( + vec![], + ); + }; + + let filter = ObjectFilter { + object_ids: Some(ids.iter().cloned().collect()), + ..Default::default() + }; + + Ok(conn + .results(move || { + build_objects_query( + View::Consistent, + range, + &Page::bounded(ids.len() as u64), + |q| filter.apply(q), + |q| q, + ) + .into_boxed() + }) + .await? + .into_iter() + .map(|r| (checkpoint_viewed_at, r)) + .collect()) + } + .scope_boxed() }) }); diff --git a/crates/sui-graphql-rpc/src/types/protocol_config.rs b/crates/sui-graphql-rpc/src/types/protocol_config.rs index f4f8fd9caf771..9eacf9e4183b6 100644 --- a/crates/sui-graphql-rpc/src/types/protocol_config.rs +++ b/crates/sui-graphql-rpc/src/types/protocol_config.rs @@ -5,6 +5,7 @@ use std::collections::BTreeMap; use async_graphql::*; use diesel::{ExpressionMethods, QueryDsl}; +use diesel_async::scoped_futures::ScopedFutureExt; use sui_indexer::schema::{epochs, feature_flags, protocol_configs}; use crate::{ @@ -94,11 +95,15 @@ impl ProtocolConfigs { } else { let latest_version: i64 = db .execute(move |conn| { - conn.first(move || { - e::epochs - .select(e::protocol_version) - .order_by(e::epoch.desc()) - }) + async move { + conn.first(move || { + e::epochs + .select(e::protocol_version) + .order_by(e::epoch.desc()) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| { @@ -112,11 +117,15 @@ impl ProtocolConfigs { // TODO: This could be optimized by fetching all configs and flags in a single query. let configs: BTreeMap> = db .execute(move |conn| { - conn.results(move || { - p::protocol_configs - .select((p::config_name, p::config_value)) - .filter(p::protocol_version.eq(version as i64)) - }) + async move { + conn.results(move || { + p::protocol_configs + .select((p::config_name, p::config_value)) + .filter(p::protocol_version.eq(version as i64)) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch protocol configs in db: {e}")))? @@ -125,11 +134,15 @@ impl ProtocolConfigs { let feature_flags: BTreeMap = db .execute(move |conn| { - conn.results(move || { - f::feature_flags - .select((f::flag_name, f::flag_value)) - .filter(f::protocol_version.eq(version as i64)) - }) + async move { + conn.results(move || { + f::feature_flags + .select((f::flag_name, f::flag_value)) + .filter(f::protocol_version.eq(version as i64)) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch feature flags in db: {e}")))? diff --git a/crates/sui-graphql-rpc/src/types/suins_registration.rs b/crates/sui-graphql-rpc/src/types/suins_registration.rs index e7391a258346e..f3dfa9c368e54 100644 --- a/crates/sui-graphql-rpc/src/types/suins_registration.rs +++ b/crates/sui-graphql-rpc/src/types/suins_registration.rs @@ -31,6 +31,7 @@ use crate::{ error::Error, }; use async_graphql::{connection::Connection, *}; +use diesel_async::scoped_futures::ScopedFutureExt; use move_core_types::{ident_str, identifier::IdentStr, language_storage::StructTag}; use serde::{Deserialize, Serialize}; use sui_indexer::models::objects::StoredHistoryObject; @@ -495,24 +496,29 @@ impl NameService { let Some((checkpoint_timestamp_ms, results)) = db .execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else { - return Ok::<_, diesel::result::Error>(None); - }; - - let timestamp_ms = Checkpoint::query_timestamp(conn, checkpoint_viewed_at)?; - - let sql = build_objects_query( - View::Consistent, - range, - &page, - move |query| filter.apply(query), - move |newer| newer, - ); - - let objects: Vec = - conn.results(move || sql.clone().into_boxed())?; - - Ok(Some((timestamp_ms, objects))) + async move { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await? + else { + return Ok::<_, diesel::result::Error>(None); + }; + + let timestamp_ms = + Checkpoint::query_timestamp(conn, checkpoint_viewed_at).await?; + + let sql = build_objects_query( + View::Consistent, + range, + &page, + move |query| filter.apply(query), + move |newer| newer, + ); + + let objects: Vec = + conn.results(move || sql.clone().into_boxed()).await?; + + Ok(Some((timestamp_ms, objects))) + } + .scope_boxed() }) .await? else { diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs index 1573fe97cfeab..b03a9e3cd7cd9 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs @@ -23,6 +23,7 @@ use async_graphql::{connection::CursorType, dataloader::Loader, *}; use connection::Edge; use cursor::TxLookup; use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; +use diesel_async::scoped_futures::ScopedFutureExt; use fastcrypto::encoding::{Base58, Encoding}; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; @@ -333,56 +334,70 @@ impl TransactionBlock { Option, ) = db .execute_repeatable(move |conn| { - let Some(tx_bounds) = TxBounds::query( - conn, - filter.after_checkpoint.map(u64::from), - filter.at_checkpoint.map(u64::from), - filter.before_checkpoint.map(u64::from), - checkpoint_viewed_at, - scan_limit, - &page, - )? - else { - return Ok::<_, diesel::result::Error>((false, false, Vec::new(), None)); - }; - - // If no filters are selected, or if the filter is composed of only checkpoint - // filters, we can directly query the main `transactions` table. Otherwise, we first - // fetch the set of `tx_sequence_number` from a join over relevant lookup tables, - // and then issue a query against the `transactions` table to fetch the remaining - // contents. - let (prev, next, transactions) = if !filter.has_filters() { - let (prev, next, iter) = page.paginate_query::( + async move { + let Some(tx_bounds) = TxBounds::query( conn, + filter.after_checkpoint.map(u64::from), + filter.at_checkpoint.map(u64::from), + filter.before_checkpoint.map(u64::from), checkpoint_viewed_at, - move || { - tx::transactions - .filter(tx::tx_sequence_number.ge(tx_bounds.scan_lo() as i64)) - .filter(tx::tx_sequence_number.lt(tx_bounds.scan_hi() as i64)) - .into_boxed() - }, - )?; - - (prev, next, iter.collect()) - } else { - let subquery = subqueries(&filter, tx_bounds).unwrap(); - let (prev, next, results) = - page.paginate_raw_query::(conn, checkpoint_viewed_at, subquery)?; - - let tx_sequence_numbers = results - .into_iter() - .map(|x| x.tx_sequence_number) - .collect::>(); - - let transactions = conn.results(move || { - tx::transactions - .filter(tx::tx_sequence_number.eq_any(tx_sequence_numbers.clone())) - })?; - - (prev, next, transactions) - }; - - Ok::<_, diesel::result::Error>((prev, next, transactions, Some(tx_bounds))) + scan_limit, + &page, + ) + .await? + else { + return Ok::<_, diesel::result::Error>((false, false, Vec::new(), None)); + }; + + // If no filters are selected, or if the filter is composed of only checkpoint + // filters, we can directly query the main `transactions` table. Otherwise, we first + // fetch the set of `tx_sequence_number` from a join over relevant lookup tables, + // and then issue a query against the `transactions` table to fetch the remaining + // contents. + let (prev, next, transactions) = if !filter.has_filters() { + let (prev, next, iter) = page + .paginate_query::( + conn, + checkpoint_viewed_at, + move || { + tx::transactions + .filter( + tx::tx_sequence_number.ge(tx_bounds.scan_lo() as i64), + ) + .filter( + tx::tx_sequence_number.lt(tx_bounds.scan_hi() as i64), + ) + .into_boxed() + }, + ) + .await?; + + (prev, next, iter.collect()) + } else { + let subquery = subqueries(&filter, tx_bounds).unwrap(); + let (prev, next, results) = page + .paginate_raw_query::(conn, checkpoint_viewed_at, subquery) + .await?; + + let tx_sequence_numbers = results + .into_iter() + .map(|x| x.tx_sequence_number) + .collect::>(); + + let transactions = conn + .results(move || { + tx::transactions.filter( + tx::tx_sequence_number.eq_any(tx_sequence_numbers.clone()), + ) + }) + .await?; + + (prev, next, transactions) + }; + + Ok::<_, diesel::result::Error>((prev, next, transactions, Some(tx_bounds))) + } + .scope_boxed() }) .await?; @@ -431,14 +446,18 @@ impl Loader for Db { let transactions: Vec = self .execute(move |conn| { - conn.results(move || { - let join = ds::tx_sequence_number.eq(tx::tx_sequence_number); - - tx::transactions - .inner_join(ds::tx_digests.on(join)) - .select(StoredTransaction::as_select()) - .filter(ds::tx_digest.eq_any(digests.clone())) - }) + async move { + conn.results(move || { + let join = ds::tx_sequence_number.eq(tx::tx_sequence_number); + + tx::transactions + .inner_join(ds::tx_digests.on(join)) + .select(StoredTransaction::as_select()) + .filter(ds::tx_digest.eq_any(digests.clone())) + }) + .await + } + .scope_boxed() }) .await .map_err(|e| Error::Internal(format!("Failed to fetch transactions: {e}")))?; diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 8077f4b4d5da4..940df4309a2ec 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -82,8 +82,8 @@ impl TxBounds { /// `checkpoint_viewed_at`. The corresponding `tx_sequence_number` range is fetched from db, and /// further adjusted by cursors and scan limit. If there are any inconsistencies or invalid /// combinations, i.e. `after` cursor is greater than the upper bound, return None. - pub(crate) fn query( - conn: &mut Conn, + pub(crate) async fn query( + conn: &mut Conn<'_>, cp_after: Option, cp_at: Option, cp_before: Option, @@ -111,12 +111,14 @@ impl TxBounds { use checkpoints::dsl; let (tx_lo, tx_hi) = if let Some(cp_prev) = cp_lo.checked_sub(1) { - let res: Vec = conn.results(move || { - dsl::checkpoints - .select(dsl::network_total_transactions) - .filter(dsl::sequence_number.eq_any([cp_prev as i64, cp_hi as i64])) - .order_by(dsl::network_total_transactions.asc()) - })?; + let res: Vec = conn + .results(move || { + dsl::checkpoints + .select(dsl::network_total_transactions) + .filter(dsl::sequence_number.eq_any([cp_prev as i64, cp_hi as i64])) + .order_by(dsl::network_total_transactions.asc()) + }) + .await?; // If there are not two distinct results, it means that the transaction bounds are // empty (lo and hi are the same), or it means that the one or other of the checkpoints @@ -133,6 +135,7 @@ impl TxBounds { .select(dsl::network_total_transactions) .filter(dsl::sequence_number.eq(cp_hi as i64)) }) + .await .optional()?; // If there is no result, it means that the checkpoint doesn't exist, so we can return diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index 1775fc5496f5e..c69e24e4bba56 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -133,9 +133,13 @@ impl IndexerReader { .expect("propagate any panics") } - pub fn get_pool(&self) -> BlockingConnectionPool { + pub fn get_blocking_pool(&self) -> BlockingConnectionPool { self.blocking_pool.clone() } + + pub fn pool(&self) -> &ConnectionPool { + &self.pool + } } // Impl for reading data from the DB