Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ add order_by and descending options to scan and fetch_all queries #291

Merged
merged 15 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions askar-storage/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};

use super::{Backend, BackendSession, ManageBackend};
use crate::{
backend::OrderBy,
entry::{Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::BoxFuture,
Expand Down Expand Up @@ -72,9 +73,12 @@ impl<B: Backend> Backend for WrapBackend<B> {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down Expand Up @@ -142,9 +146,12 @@ impl Backend for AnyBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down Expand Up @@ -207,10 +214,13 @@ impl BackendSession for AnyBackendSession {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
self.0
.fetch_all(kind, category, tag_filter, limit, for_update)
self.0.fetch_all(
kind, category, tag_filter, limit, order_by, descending, for_update,
)
}

/// Remove all matching records from the store
Expand Down
28 changes: 25 additions & 3 deletions askar-storage/src/backend/db_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use crate::{
},
};

use super::OrderBy;

/// cbindgen:ignore
pub const PAGE_SIZE: usize = 32;

Expand Down Expand Up @@ -453,6 +455,17 @@ pub trait QueryPrepare {
}
query
}

fn order_by_query<'q>(mut query: String, order_by: OrderBy, descending: bool) -> String {
query.push_str(" ORDER BY ");
match order_by {
OrderBy::Id => query.push_str("id"),
}
if descending {
query.push_str(" DESC");
}
query
}
}

pub fn replace_arg_placeholders<Q: QueryPrepare + ?Sized>(
Expand Down Expand Up @@ -625,6 +638,8 @@ pub fn extend_query<'q, Q: QueryPrepare>(
tag_filter: Option<(String, Vec<Vec<u8>>)>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> Result<String, Error>
where
i64: for<'e> Encode<'e, Q::DB> + Type<Q::DB>,
Expand All @@ -636,9 +651,16 @@ where
query.push_str(" AND "); // assumes WHERE already occurs
query.push_str(&filter_clause);
};
if offset.is_some() || limit.is_some() {
query = Q::limit_query(query, args, offset, limit);
};
// Only add ordering, and limit/offset, if the query starts with SELECT
if query.trim_start().to_uppercase().starts_with("SELECT") {
if let Some(order_by_value) = order_by {
query = Q::order_by_query(query, order_by_value, descending);
};

if offset.is_some() || limit.is_some() {
query = Q::limit_query(query, args, offset, limit);
};
}
Ok(query)
}

Expand Down
27 changes: 26 additions & 1 deletion askar-storage/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ pub mod postgres;
/// Sqlite database support
pub mod sqlite;

/// Enum to support custom ordering in record queries
#[derive(Debug)]
pub enum OrderBy {
/// Order by ID field
Id,
}

impl Default for OrderBy {
fn default() -> Self {
OrderBy::Id
}
}
/// Represents a generic backend implementation
pub trait Backend: Debug + Send + Sync {
/// The type of session managed by this backend
Expand Down Expand Up @@ -54,6 +66,8 @@ pub trait Backend: Debug + Send + Sync {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>>;

/// Create a new session against the store
Expand Down Expand Up @@ -122,6 +136,8 @@ pub trait BackendSession: Debug + Send {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>>;

Expand Down Expand Up @@ -185,7 +201,16 @@ pub async fn copy_profile<A: Backend, B: Backend>(
to_profile: &str,
) -> Result<(), Error> {
let scan = from_backend
.scan(Some(from_profile.into()), None, None, None, None, None)
.scan(
Some(from_profile.into()),
None,
None,
None,
None,
None,
None,
false,
)
.await?;
if let Err(e) = to_backend.create_profile(Some(to_profile.into())).await {
if e.kind() != ErrorKind::Duplicate {
Expand Down
26 changes: 23 additions & 3 deletions askar-storage/src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
Backend, BackendSession,
};
use crate::{
backend::OrderBy,
entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::{unblock, BoxFuture},
Expand Down Expand Up @@ -268,6 +269,8 @@ impl Backend for PostgresBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -282,6 +285,8 @@ impl Backend for PostgresBackend {
tag_filter,
offset,
limit,
order_by,
descending,
false,
);
let stream = scan.then(move |enc_rows| {
Expand Down Expand Up @@ -347,8 +352,15 @@ impl BackendSession for DbSession<Postgres> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<PostgresBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<PostgresBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -424,6 +436,8 @@ impl BackendSession for DbSession<Postgres> {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
let category = category.map(|c| c.to_string());
Expand All @@ -440,6 +454,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
limit,
order_by,
descending,
for_update,
);
pin!(scan);
Expand Down Expand Up @@ -483,6 +499,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -752,6 +770,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
Expand All @@ -772,7 +792,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
if for_update {
query.push_str(" FOR NO KEY UPDATE");
}
Expand Down
26 changes: 23 additions & 3 deletions askar-storage/src/backend/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{
Backend, BackendSession,
};
use crate::{
backend::OrderBy,
entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::{unblock, BoxFuture},
Expand Down Expand Up @@ -262,6 +263,8 @@ impl Backend for SqliteBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -276,6 +279,8 @@ impl Backend for SqliteBackend {
tag_filter,
offset,
limit,
order_by,
descending,
);
let stream = scan.then(move |enc_rows| {
let category = category.clone();
Expand Down Expand Up @@ -330,8 +335,15 @@ impl BackendSession for DbSession<Sqlite> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<SqliteBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<SqliteBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -398,6 +410,8 @@ impl BackendSession for DbSession<Sqlite> {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
_for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
let category = category.map(|c| c.to_string());
Expand All @@ -413,6 +427,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
limit,
order_by,
descending,
);
pin!(scan);
let mut enc_rows = vec![];
Expand Down Expand Up @@ -455,6 +471,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -703,6 +721,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
let mut params = QueryParams::new();
Expand All @@ -720,7 +740,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

let mut batch = Vec::with_capacity(PAGE_SIZE);

Expand Down
Loading
Loading