Skip to content

Commit

Permalink
enh(tables): timeout for SQLite queries (#3398)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Jan 23, 2024
1 parent d622065 commit a355571
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
7 changes: 6 additions & 1 deletion core/bin/sqlite_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use tracing::Level;
// Duration after which a database is considered inactive and can be removed from the registry.
const DATABASE_TIMEOUT_DURATION: Duration = std::time::Duration::from_secs(5 * 60); // 5 minutes

// Default number of milliseconds after which a query execution is considered timed out.
const DEFAULT_QUERY_TIMEOUT_MS: u64 = 10_000;

struct DatabaseEntry {
database: Arc<Mutex<SqliteDatabase>>,
last_accessed: Instant,
Expand Down Expand Up @@ -160,6 +163,7 @@ async fn index() -> &'static str {
struct DbQueryPayload {
query: String,
tables: Vec<Table>,
timeout_ms: Option<u64>,
}

async fn databases_query(
Expand All @@ -178,6 +182,7 @@ async fn databases_query(
entry.last_accessed = Instant::now();
entry.database.clone()
};
let timeout = payload.timeout_ms.unwrap_or(DEFAULT_QUERY_TIMEOUT_MS);

let mut guard = database.lock().await;

Expand All @@ -196,7 +201,7 @@ async fn databases_query(
}
}

match guard.query(&payload.query).await {
match guard.query(&payload.query, timeout).await {
Ok(results) => (
axum::http::StatusCode::OK,
Json(APIResponse {
Expand Down
37 changes: 27 additions & 10 deletions core/src/sqlite_workers/sqlite_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ use anyhow::{anyhow, Result};
use futures::future::try_join_all;
use parking_lot::Mutex;
use rayon::prelude::*;
use rusqlite::{params_from_iter, Connection};
use tokio::task;
use rusqlite::{params_from_iter, Connection, InterruptHandle};
use tokio::{task, time::timeout};

#[derive(Clone)]
pub struct SqliteDatabase {
conn: Option<Arc<Mutex<Connection>>>,
interrupt_handle: Option<Arc<Mutex<InterruptHandle>>>,
}

impl SqliteDatabase {
pub fn new() -> Self {
Self { conn: None }
Self {
conn: None,
interrupt_handle: None,
}
}

pub async fn init(
Expand All @@ -30,20 +34,21 @@ impl SqliteDatabase {
match &self.conn {
Some(_) => Ok(()),
None => {
self.conn = Some(Arc::new(Mutex::new(
create_in_memory_sqlite_db(databases_store, tables).await?,
)));
let conn = create_in_memory_sqlite_db(databases_store, tables).await?;
let interrupt_handle = conn.get_interrupt_handle();
self.conn = Some(Arc::new(Mutex::new(conn)));
self.interrupt_handle = Some(Arc::new(Mutex::new(interrupt_handle)));

Ok(())
}
}
}

pub async fn query(&self, query: &str) -> Result<Vec<QueryResult>> {
pub async fn query(&self, query: &str, timeout_ms: u64) -> Result<Vec<QueryResult>> {
let query = query.to_string();
let conn = self.conn.clone();

task::spawn_blocking(move || {
let query_future = task::spawn_blocking(move || {
let conn = match conn {
Some(conn) => conn.clone(),
None => Err(anyhow!("Database not initialized"))?,
Expand Down Expand Up @@ -117,8 +122,20 @@ impl SqliteDatabase {
));

Ok(result_rows)
})
.await?
});

match timeout(std::time::Duration::from_millis(timeout_ms), query_future).await {
Ok(r) => r?,
Err(_) => {
let interrupt_handle = match &self.interrupt_handle {
Some(interrupt_handle) => interrupt_handle.clone(),
None => Err(anyhow!("Database not initialized"))?,
};
let interrupt_handle = interrupt_handle.lock();
interrupt_handle.interrupt();
Err(anyhow!("Query execution timed out after {}ms", timeout_ms))?
}
}
}
}

Expand Down

0 comments on commit a355571

Please sign in to comment.