Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graphql: migrate graphql to use async connections #19234

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ chrono.workspace = true
clap.workspace = true
const-str.workspace = true
diesel = { workspace = true, features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
diesel-async = { workspace = true, features = ["postgres"] }
either.workspace = true
fastcrypto = { workspace = true, features = ["copy_key"] }
fastcrypto-zkp.workspace = true
Expand Down
51 changes: 34 additions & 17 deletions crates/sui-graphql-rpc/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use async_graphql::dataloader::DataLoader as AGDataLoader;
use async_trait::async_trait;
use diesel::{
query_builder::{BoxedSelectStatement, FromClause, QueryFragment, QueryId},
query_dsl::{methods::LimitDsl, LoadQuery},
query_dsl::methods::LimitDsl,
QueryResult,
};

use diesel_async::{methods::LoadQuery, scoped_futures::ScopedBoxFuture};

use crate::error::Error;

/// Database Backend in use -- abstracting a specific implementation.
Expand Down Expand Up @@ -45,17 +47,21 @@ pub(crate) type Query<ST, QS, GB> =
#[async_trait]
pub(crate) trait QueryExecutor {
type Backend: diesel::backend::Backend;
type Connection: diesel::Connection;
type Connection: diesel_async::AsyncConnection;

type DbConnection<'c>: DbConnection<Connection = Self::Connection, Backend = Self::Backend>
where
Self: 'c;

/// Execute `txn` with read committed isolation. `txn` is supplied a database connection to
/// issue queries over.
async fn execute<T, U, E>(&self, txn: T) -> Result<U, Error>
async fn execute<'c, T, U, E>(&self, txn: T) -> Result<U, Error>
where
T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
T: for<'r> FnOnce(
&'r mut Self::DbConnection<'_>,
) -> ScopedBoxFuture<'static, 'r, Result<U, E>>
+ Send
+ 'c,
E: From<diesel::result::Error> + std::error::Error,
T: Send + 'static,
U: Send + 'static,
Expand All @@ -64,44 +70,55 @@ pub(crate) trait QueryExecutor {
/// Execute `txn` with repeatable reads and no phantom reads -- multiple calls to the same query
/// should produce the same results. `txn` is supplied a database connection to issue queries
/// over.
async fn execute_repeatable<T, U, E>(&self, txn: T) -> Result<U, Error>
async fn execute_repeatable<'c, T, U, E>(&self, txn: T) -> Result<U, Error>
where
T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
T: for<'r> FnOnce(
&'r mut Self::DbConnection<'_>,
) -> ScopedBoxFuture<'static, 'r, Result<U, E>>
+ Send
+ 'c,
E: From<diesel::result::Error> + std::error::Error,
T: Send + 'static,
U: Send + 'static,
E: Send + 'static;
}

#[async_trait]
pub(crate) trait DbConnection {
type Backend: diesel::backend::Backend;
type Connection: diesel::Connection<Backend = Self::Backend>;
type Connection: diesel_async::AsyncConnection<Backend = Self::Backend>;

/// Run a query that fetches a single value. `query` is a thunk that returns a query when
/// called.
fn result<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
async fn result<T, Q, U>(&mut self, query: T) -> QueryResult<U>
where
Q: diesel::query_builder::Query,
T: Fn() -> Q + Send,
Q: diesel::query_builder::Query + Send + 'static,
Q: LoadQuery<'static, Self::Connection, U>,
Q: QueryId + QueryFragment<Self::Backend>;
Q: QueryId + QueryFragment<Self::Backend>,
U: Send;

/// Run a query that fetches multiple values. `query` is a thunk that returns a query when
/// called.
fn results<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<Vec<U>>
async fn results<T, Q, U>(&mut self, query: T) -> QueryResult<Vec<U>>
where
Q: diesel::query_builder::Query,
T: Fn() -> Q + Send,
Q: diesel::query_builder::Query + Send + 'static,
Q: LoadQuery<'static, Self::Connection, U>,
Q: QueryId + QueryFragment<Self::Backend>;
Q: QueryId + QueryFragment<Self::Backend>,
U: Send;

/// Helper to limit a query that fetches multiple values to return only its first value. `query`
/// is a thunk that returns a query when called.
fn first<Q: LimitDsl, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
async fn first<T, Q: LimitDsl, U>(&mut self, query: T) -> QueryResult<U>
where
<Q as LimitDsl>::Output: diesel::query_builder::Query,
T: Fn() -> Q + Send,
<Q as LimitDsl>::Output: diesel::query_builder::Query + Send,
<Q as LimitDsl>::Output: LoadQuery<'static, Self::Connection, U>,
<Q as LimitDsl>::Output: QueryId + QueryFragment<Self::Backend>,
<Q as LimitDsl>::Output: QueryId + QueryFragment<Self::Backend> + 'static,
U: Send,
{
self.result(move || query().limit(1i64))
self.result(move || query().limit(1i64)).await
}
}

Expand Down
11 changes: 8 additions & 3 deletions crates/sui-graphql-rpc/src/data/package_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use async_graphql::dataloader::Loader;
use async_trait::async_trait;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::scoped_futures::ScopedFutureExt;
use move_core_types::account_address::AccountAddress;
use sui_indexer::models::packages::StoredPackage;
use sui_indexer::schema::packages;
Expand Down Expand Up @@ -59,9 +60,13 @@ impl Loader<PackageKey> for Db {
let ids: BTreeSet<_> = keys.iter().map(|PackageKey(id)| id.to_vec()).collect();
let stored_packages: Vec<StoredPackage> = self
.execute(move |conn| {
conn.results(move || {
dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned()))
})
async move {
conn.results(move || {
dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned()))
})
.await
}
.scope_boxed()
})
Comment on lines 62 to 70
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to have the transactional API execute, execute_repeatable etc accept a callback that it can call scope_boxed on? so that this could become:

            .execute(|conn| async move {
                conn.results(move || {
                    dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned()))
                })
                .await
            })

If we can't elide it like that, I'm generally curious to learn more about scope_boxed and when we should and shouldn't use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried for a while to not require this as a part of the trait api but kept hitting lifetime issues bounding the lifetime of the produced future from the provided closure in order to meet the needs of the transaction api diesel-async required. Once i finish this migration i'll take another pass back at this to see if it can be simplified and not require the explicit bounding with scope_boxed

.await
.map_err(|e| PackageResolverError::Store {
Expand Down
Loading
Loading