diff --git a/.gitignore b/.gitignore index 861fce8..86281ff 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ target/ # We will check in all code-generated entity files, as newer versions of `sea-orm-cli` might # conflict with previous versions. -# **/entities \ No newline at end of file +# **/entities diff --git a/optd-mvp/src/entities/fingerprint.rs b/optd-mvp/src/entities/fingerprint.rs new file mode 100644 index 0000000..2ab6a7f --- /dev/null +++ b/optd-mvp/src/entities/fingerprint.rs @@ -0,0 +1,33 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "fingerprint")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub logical_expression_id: i32, + pub kind: i16, + pub hash: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::logical_expression::Entity", + from = "Column::LogicalExpressionId", + to = "super::logical_expression::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + LogicalExpression, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::LogicalExpression.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/optd-mvp/src/entities/logical_expression.rs b/optd-mvp/src/entities/logical_expression.rs index 1e85d1d..4c257f3 100644 --- a/optd-mvp/src/entities/logical_expression.rs +++ b/optd-mvp/src/entities/logical_expression.rs @@ -8,7 +8,6 @@ pub struct Model { #[sea_orm(primary_key)] pub id: i32, pub group_id: i32, - pub fingerprint: i64, pub kind: i16, pub data: Json, } @@ -23,10 +22,18 @@ pub enum Relation { on_delete = "Cascade" )] CascadesGroup, + #[sea_orm(has_many = "super::fingerprint::Entity")] + Fingerprint, #[sea_orm(has_many = "super::logical_children::Entity")] LogicalChildren, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Fingerprint.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::LogicalChildren.def() diff --git a/optd-mvp/src/entities/mod.rs b/optd-mvp/src/entities/mod.rs index 701abe4..77d6b2c 100644 --- a/optd-mvp/src/entities/mod.rs +++ b/optd-mvp/src/entities/mod.rs @@ -3,6 +3,7 @@ pub mod prelude; pub mod cascades_group; +pub mod fingerprint; pub mod logical_children; pub mod logical_expression; pub mod physical_children; diff --git a/optd-mvp/src/entities/physical_expression.rs b/optd-mvp/src/entities/physical_expression.rs index 2d9a2ae..482227a 100644 --- a/optd-mvp/src/entities/physical_expression.rs +++ b/optd-mvp/src/entities/physical_expression.rs @@ -8,7 +8,6 @@ pub struct Model { #[sea_orm(primary_key)] pub id: i32, pub group_id: i32, - pub fingerprint: i64, pub kind: i16, pub data: Json, } diff --git a/optd-mvp/src/entities/prelude.rs b/optd-mvp/src/entities/prelude.rs index 0b8c910..5619363 100644 --- a/optd-mvp/src/entities/prelude.rs +++ b/optd-mvp/src/entities/prelude.rs @@ -1,8 +1,7 @@ //! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 -#![allow(unused_imports)] - pub use super::cascades_group::Entity as CascadesGroup; +pub use super::fingerprint::Entity as Fingerprint; pub use super::logical_children::Entity as LogicalChildren; pub use super::logical_expression::Entity as LogicalExpression; pub use super::physical_children::Entity as PhysicalChildren; diff --git a/optd-mvp/src/expression/logical_expression.rs b/optd-mvp/src/expression/logical_expression.rs new file mode 100644 index 0000000..9078113 --- /dev/null +++ b/optd-mvp/src/expression/logical_expression.rs @@ -0,0 +1,114 @@ +//! Definition of logical expressions / relations in the Cascades query optimization framework. +//! +//! FIXME: All fields are placeholders, and group IDs are just represented as i32 for now. +//! +//! TODO figure out if each relation should be in a different submodule. + +use crate::entities::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +pub enum LogicalExpression { + Scan(Scan), + Filter(Filter), + Join(Join), +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Scan { + table_schema: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Filter { + child: i32, + expression: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Join { + left: i32, + right: i32, + expression: String, +} + +/// TODO Use a macro instead. +impl From for LogicalExpression { + fn from(value: logical_expression::Model) -> Self { + match value.kind { + 0 => Self::Scan( + serde_json::from_value(value.data) + .expect("unable to deserialize data into a logical `Scan`"), + ), + 1 => Self::Filter( + serde_json::from_value(value.data) + .expect("Unable to deserialize data into a logical `Filter`"), + ), + 2 => Self::Join( + serde_json::from_value(value.data) + .expect("Unable to deserialize data into a logical `Join`"), + ), + _ => panic!(), + } + } +} + +/// TODO Use a macro instead. +impl From for logical_expression::Model { + fn from(value: LogicalExpression) -> logical_expression::Model { + fn create_logical_expression( + kind: i16, + data: serde_json::Value, + ) -> logical_expression::Model { + logical_expression::Model { + id: -1, + group_id: -1, + kind, + data, + } + } + + match value { + LogicalExpression::Scan(scan) => create_logical_expression( + 0, + serde_json::to_value(scan).expect("unable to serialize logical `Scan`"), + ), + LogicalExpression::Filter(filter) => create_logical_expression( + 1, + serde_json::to_value(filter).expect("unable to serialize logical `Filter`"), + ), + LogicalExpression::Join(join) => create_logical_expression( + 2, + serde_json::to_value(join).expect("unable to serialize logical `Join`"), + ), + } + } +} + +#[cfg(test)] +pub use build::*; + +#[cfg(test)] +mod build { + use super::*; + use crate::expression::Expression; + + pub fn scan(table_schema: String) -> Expression { + Expression::Logical(LogicalExpression::Scan(Scan { table_schema })) + } + + pub fn filter(child_group: i32, expression: String) -> Expression { + Expression::Logical(LogicalExpression::Filter(Filter { + child: child_group, + expression, + })) + } + + pub fn join(left_group: i32, right_group: i32, expression: String) -> Expression { + Expression::Logical(LogicalExpression::Join(Join { + left: left_group, + right: right_group, + expression, + })) + } +} diff --git a/optd-mvp/src/expression/mod.rs b/optd-mvp/src/expression/mod.rs new file mode 100644 index 0000000..459e13b --- /dev/null +++ b/optd-mvp/src/expression/mod.rs @@ -0,0 +1,62 @@ +//! In-memory representation of Cascades logical and physical expression / operators / relations. +//! +//! TODO more docs. + +mod logical_expression; +pub use logical_expression::*; + +mod physical_expression; +pub use physical_expression::*; + +/// The representation of a Cascades expression. +/// +/// TODO more docs. +#[derive(Clone, Debug)] +pub enum Expression { + Logical(LogicalExpression), + Physical(PhysicalExpression), +} + +/// Converts the database / JSON representation of a logical expression into an in-memory one. +impl From for Expression { + fn from(value: crate::entities::logical_expression::Model) -> Self { + Self::Logical(value.into()) + } +} + +/// Converts the in-memory representation of a logical expression into the database / JSON version. +/// +/// # Panics +/// +/// This will panic if the [`Expression`] is [`Expression::Physical`]. +impl From for crate::entities::logical_expression::Model { + fn from(value: Expression) -> Self { + let Expression::Logical(expr) = value else { + panic!("Attempted to convert an in-memory physical expression into a logical database / JSON expression"); + }; + + expr.into() + } +} + +/// Converts the database / JSON representation of a physical expression into an in-memory one. +impl From for Expression { + fn from(value: crate::entities::physical_expression::Model) -> Self { + Self::Physical(value.into()) + } +} + +/// Converts the in-memory representation of a physical expression into the database / JSON version. +/// +/// # Panics +/// +/// This will panic if the [`Expression`] is [`Expression::Physical`]. +impl From for crate::entities::physical_expression::Model { + fn from(value: Expression) -> Self { + let Expression::Physical(expr) = value else { + panic!("Attempted to convert an in-memory logical expression into a physical database / JSON expression"); + }; + + expr.into() + } +} diff --git a/optd-mvp/src/expression/physical_expression.rs b/optd-mvp/src/expression/physical_expression.rs new file mode 100644 index 0000000..7908947 --- /dev/null +++ b/optd-mvp/src/expression/physical_expression.rs @@ -0,0 +1,114 @@ +//! Definition of physical expressions / operators in the Cascades query optimization framework. +//! +//! FIXME: All fields are placeholders, and group IDs are just represented as i32 for now. +//! +//! TODO figure out if each operator should be in a different submodule. + +use crate::entities::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +pub enum PhysicalExpression { + TableScan(TableScan), + Filter(PhysicalFilter), + HashJoin(HashJoin), +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TableScan { + table_schema: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct PhysicalFilter { + child: i32, + expression: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct HashJoin { + left: i32, + right: i32, + expression: String, +} + +/// TODO Use a macro instead. +impl From for PhysicalExpression { + fn from(value: physical_expression::Model) -> Self { + match value.kind { + 0 => Self::TableScan( + serde_json::from_value(value.data) + .expect("unable to deserialize data into a physical `TableScan`"), + ), + 1 => Self::Filter( + serde_json::from_value(value.data) + .expect("Unable to deserialize data into a physical `Filter`"), + ), + 2 => Self::HashJoin( + serde_json::from_value(value.data) + .expect("Unable to deserialize data into a physical `HashJoin`"), + ), + _ => panic!(), + } + } +} + +/// TODO Use a macro instead. +impl From for physical_expression::Model { + fn from(value: PhysicalExpression) -> physical_expression::Model { + fn create_physical_expression( + kind: i16, + data: serde_json::Value, + ) -> physical_expression::Model { + physical_expression::Model { + id: -1, + group_id: -1, + kind, + data, + } + } + + match value { + PhysicalExpression::TableScan(scan) => create_physical_expression( + 0, + serde_json::to_value(scan).expect("unable to serialize physical `TableScan`"), + ), + PhysicalExpression::Filter(filter) => create_physical_expression( + 1, + serde_json::to_value(filter).expect("unable to serialize physical `Filter`"), + ), + PhysicalExpression::HashJoin(join) => create_physical_expression( + 2, + serde_json::to_value(join).expect("unable to serialize physical `HashJoin`"), + ), + } + } +} + +#[cfg(test)] +pub use build::*; + +#[cfg(test)] +mod build { + use super::*; + use crate::expression::Expression; + + pub fn table_scan(table_schema: String) -> Expression { + Expression::Physical(PhysicalExpression::TableScan(TableScan { table_schema })) + } + + pub fn filter(child_group: i32, expression: String) -> Expression { + Expression::Physical(PhysicalExpression::Filter(PhysicalFilter { + child: child_group, + expression, + })) + } + + pub fn hash_join(left_group: i32, right_group: i32, expression: String) -> Expression { + Expression::Physical(PhysicalExpression::HashJoin(HashJoin { + left: left_group, + right: right_group, + expression, + })) + } +} diff --git a/optd-mvp/src/lib.rs b/optd-mvp/src/lib.rs index c5185cd..98c5f11 100644 --- a/optd-mvp/src/lib.rs +++ b/optd-mvp/src/lib.rs @@ -10,6 +10,8 @@ mod entities; mod memo; use memo::MemoError; +mod expression; + /// The filename of the SQLite database for migration. pub const DATABASE_FILENAME: &str = "sqlite.db"; /// The URL of the SQLite database for migration. @@ -35,3 +37,15 @@ pub type OptimizerResult = Result; pub async fn migrate(db: &DatabaseConnection) -> Result<(), DbErr> { Migrator::refresh(db).await } + +/// Helper function for hashing expression data. +/// +/// TODO remove this. +fn hash_expression(kind: i16, data: &serde_json::Value) -> i64 { + use std::hash::{DefaultHasher, Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + kind.hash(&mut hasher); + data.hash(&mut hasher); + hasher.finish() as i64 +} diff --git a/optd-mvp/src/memo/interface.rs b/optd-mvp/src/memo/interface.rs index a88740e..7c5413b 100644 --- a/optd-mvp/src/memo/interface.rs +++ b/optd-mvp/src/memo/interface.rs @@ -1,3 +1,6 @@ +//! This module defines the [`Memo`] trait, which defines shared behavior of all memo table that can +//! be used for query optimization in the Cascades framework. + use crate::OptimizerResult; use thiserror::Error; @@ -75,6 +78,18 @@ pub trait Memo { group_id: Self::GroupId, ) -> OptimizerResult>; + /// Checks if a given logical expression is a duplicate / already exists in the memo table. + /// + /// In order to prevent a large amount of duplicate work, the memo table must support duplicate + /// expression detection. + /// + /// Returns `Some(expression_id)` if the memo table detects that the expression already exists, + /// and `None` otherwise. + async fn is_duplicate_logical_expression( + &self, + logical_expression: &Self::LogicalExpression, + ) -> OptimizerResult>; + /// Updates / replaces a group's best physical plan (winner). Optionally returns the previous /// winner's physical expression ID. /// @@ -85,41 +100,49 @@ pub trait Memo { physical_expression_id: Self::PhysicalExpressionId, ) -> OptimizerResult>; - /// Adds a logical expression to an existing group via its [`Self::GroupId`]. This function - /// assumes that insertion of this expression would not create any duplicates. + /// Adds a physical expression to an existing group via its [`Self::GroupId`]. /// /// The caller is required to pass in a slice of `GroupId` that represent the child groups of /// the input expression. /// - /// The caller is also required to set the `group_id` field of the input `logical_expression` + /// The caller is also required to set the `group_id` field of the input `physical_expression` /// to be equal to `group_id`, otherwise this function will return a /// [`MemoError::InvalidExpression`] error. /// /// If the group does not exist, returns a [`MemoError::UnknownGroup`] error. - async fn add_logical_expression_to_group( + /// + /// On successful insertion, returns the ID of the physical expression. + async fn add_physical_expression_to_group( &self, group_id: Self::GroupId, - logical_expression: Self::LogicalExpression, + physical_expression: Self::PhysicalExpression, children: &[Self::GroupId], - ) -> OptimizerResult<()>; + ) -> OptimizerResult; - /// Adds a physical expression to an existing group via its [`Self::GroupId`]. This function - /// assumes that insertion of this expression would not create any duplicates. + /// Adds a logical expression to an existing group via its [`Self::GroupId`]. /// /// The caller is required to pass in a slice of `GroupId` that represent the child groups of /// the input expression. /// - /// The caller is also required to set the `group_id` field of the input `physical_expression` + /// The caller is also required to set the `group_id` field of the input `logical_expression` /// to be equal to `group_id`, otherwise this function will return a /// [`MemoError::InvalidExpression`] error. /// /// If the group does not exist, returns a [`MemoError::UnknownGroup`] error. - async fn add_physical_expression_to_group( + /// + /// If the memo table detects that the input logical expression is a duplicate expression, it + /// will **not** insert the expression into the memo table. Instead, it will return an + /// `Ok(Err(expression_id))`, which is a unique identifier of the expression that the input is a + /// duplicate of. The caller can use this ID to retrieve the group the original belongs to. + /// + /// If the memo table detects that the input is unique, it will insert the expression into the + /// input group and return an `Ok(Ok(expression_id))`. + async fn add_logical_expression_to_group( &self, group_id: Self::GroupId, - physical_expression: Self::PhysicalExpression, + logical_expression: Self::LogicalExpression, children: &[Self::GroupId], - ) -> OptimizerResult<()>; + ) -> OptimizerResult>; /// Adds a new logical expression into the memo table, creating a new group if the expression /// does not already exist. @@ -142,5 +165,10 @@ pub trait Memo { &self, expression: Self::LogicalExpression, children: &[Self::LogicalExpressionId], - ) -> OptimizerResult<(Self::GroupId, Self::LogicalExpressionId)>; + ) -> OptimizerResult< + Result< + (Self::GroupId, Self::LogicalExpressionId), + (Self::GroupId, Self::LogicalExpressionId), + >, + >; } diff --git a/optd-mvp/src/memo/persistent.rs b/optd-mvp/src/memo/persistent/implementation.rs similarity index 68% rename from optd-mvp/src/memo/persistent.rs rename to optd-mvp/src/memo/persistent/implementation.rs index 445ee6c..4c06c4e 100644 --- a/optd-mvp/src/memo/persistent.rs +++ b/optd-mvp/src/memo/persistent/implementation.rs @@ -1,28 +1,11 @@ +//! This module contains the implementation of the [`Memo`] trait for [`PersistentMemo`]. + +use super::*; use crate::{ - entities::{prelude::*, *}, + hash_expression, memo::{Memo, MemoError}, - OptimizerResult, DATABASE_URL, + OptimizerResult, }; -use sea_orm::*; - -/// A persistent memo table, backed by a database on disk. -/// -/// TODO more docs. -pub struct PersistentMemo { - /// This `PersistentMemo` is reliant on the SeaORM [`DatabaseConnection`] that stores all of the - /// objects needed for query optimization. - db: DatabaseConnection, -} - -impl PersistentMemo { - /// TODO remove dead code and write docs. - #[allow(dead_code)] - pub async fn new() -> Self { - Self { - db: Database::connect(DATABASE_URL).await.unwrap(), - } - } -} impl Memo for PersistentMemo { type Group = cascades_group::Model; @@ -91,6 +74,41 @@ impl Memo for PersistentMemo { .collect()) } + /// FIXME Check that all of the children are root groups? + async fn is_duplicate_logical_expression( + &self, + logical_expression: &Self::LogicalExpression, + ) -> OptimizerResult> { + // Lookup all expressions that have the same fingerprint and kind. There may be false + // positives, but we will check for those next. + let kind = logical_expression.kind; + let fingerprint = hash_expression(kind, &logical_expression.data); + + let potential_matches = Fingerprint::find() + .filter(fingerprint::Column::Hash.eq(fingerprint)) + .filter(fingerprint::Column::Kind.eq(kind)) + .all(&self.db) + .await?; + + if potential_matches.is_empty() { + return Ok(None); + } + + let mut match_id = None; + for potential_match in potential_matches { + let expr_id = potential_match.logical_expression_id; + let expr = self.get_logical_expression(expr_id).await?; + + if expr.data == logical_expression.data { + // There should be at most one duplicate expression. + match_id = Some(expr_id); + break; + } + } + + Ok(match_id) + } + /// FIXME: In the future, this should first check that we aren't overwriting a winner that was /// updated from another thread. async fn update_group_winner( @@ -110,13 +128,13 @@ impl Memo for PersistentMemo { Ok(old) } - async fn add_logical_expression_to_group( + async fn add_physical_expression_to_group( &self, group_id: Self::GroupId, - logical_expression: Self::LogicalExpression, + physical_expression: Self::PhysicalExpression, children: &[Self::GroupId], - ) -> OptimizerResult<()> { - if logical_expression.group_id != group_id { + ) -> OptimizerResult { + if physical_expression.group_id != group_id { Err(MemoError::InvalidExpression)? } @@ -125,9 +143,9 @@ impl Memo for PersistentMemo { // Insert the child groups of the expression into the junction / children table. if !children.is_empty() { - LogicalChildren::insert_many(children.iter().copied().map(|group_id| { - logical_children::ActiveModel { - logical_expression_id: Set(logical_expression.id), + PhysicalChildren::insert_many(children.iter().copied().map(|group_id| { + physical_children::ActiveModel { + physical_expression_id: Set(physical_expression.id), group_id: Set(group_id), } })) @@ -136,32 +154,41 @@ impl Memo for PersistentMemo { } // Insert the expression. - let _ = logical_expression + let res = physical_expression .into_active_model() .insert(&self.db) .await?; - Ok(()) + Ok(res.id) } - async fn add_physical_expression_to_group( + /// FIXME Check that all of the children are reduced groups? + async fn add_logical_expression_to_group( &self, group_id: Self::GroupId, - physical_expression: Self::PhysicalExpression, + logical_expression: Self::LogicalExpression, children: &[Self::GroupId], - ) -> OptimizerResult<()> { - if physical_expression.group_id != group_id { + ) -> OptimizerResult> { + if logical_expression.group_id != group_id { Err(MemoError::InvalidExpression)? } + // Check if the expression already exists in the memo table. + if let Some(existing_id) = self + .is_duplicate_logical_expression(&logical_expression) + .await? + { + return Ok(Err(existing_id)); + } + // Check if the group actually exists. let _ = self.get_group(group_id).await?; // Insert the child groups of the expression into the junction / children table. if !children.is_empty() { - PhysicalChildren::insert_many(children.iter().copied().map(|group_id| { - physical_children::ActiveModel { - physical_expression_id: Set(physical_expression.id), + LogicalChildren::insert_many(children.iter().copied().map(|group_id| { + logical_children::ActiveModel { + logical_expression_id: Set(logical_expression.id), group_id: Set(group_id), } })) @@ -170,45 +197,32 @@ impl Memo for PersistentMemo { } // Insert the expression. - let _ = physical_expression + let res = logical_expression .into_active_model() .insert(&self.db) .await?; - Ok(()) + Ok(Ok(res.id)) } + /// FIXME Check that all of the children are reduced groups? async fn add_logical_expression( &self, logical_expression: Self::LogicalExpression, children: &[Self::GroupId], - ) -> OptimizerResult<(Self::GroupId, Self::LogicalExpressionId)> { - // Lookup all expressions that have the same fingerprint. There may be false positives, but - // we will check for those later. - let fingerprint = logical_expression.fingerprint; - let potential_matches = LogicalExpression::find() - .filter(logical_expression::Column::Fingerprint.eq(fingerprint)) - .all(&self.db) - .await?; - - // Of the expressions that have the same fingerprint, check if there already exists an - // expression that is exactly identical to the input expression. - let mut matches: Vec<_> = potential_matches - .into_iter() - .filter(|expr| expr == &logical_expression) - .collect(); - assert!( - matches.len() <= 1, - "there cannot be more than 1 exact logical expression match" - ); - - // The expression already exists, so return its data. - if !matches.is_empty() { - let existing_expression = matches - .pop() - .expect("we just checked that an element exists"); - - return Ok((existing_expression.group_id, existing_expression.id)); + ) -> OptimizerResult< + Result< + (Self::GroupId, Self::LogicalExpressionId), + (Self::GroupId, Self::LogicalExpressionId), + >, + > { + // Check if the expression already exists in the memo table. + if let Some(existing_id) = self + .is_duplicate_logical_expression(&logical_expression) + .await? + { + let expr = self.get_logical_expression(existing_id).await?; + return Ok(Err((expr.group_id, expr.id))); } // The expression does not exist yet, so we need to create a new group and new expression. @@ -239,6 +253,18 @@ impl Memo for PersistentMemo { .await?; } - Ok((new_expr.group_id, new_expr.id)) + // Insert the fingerprint of the logical expression. + let hash = hash_expression(new_expr.kind, &new_expr.data); + let fingerprint = fingerprint::ActiveModel { + id: NotSet, + logical_expression_id: Set(new_expr.id), + kind: Set(new_expr.kind), + hash: Set(hash), + }; + let _ = fingerprint::Entity::insert(fingerprint) + .exec(&self.db) + .await?; + + Ok(Ok((new_expr.group_id, new_expr.id))) } } diff --git a/optd-mvp/src/memo/persistent/mod.rs b/optd-mvp/src/memo/persistent/mod.rs new file mode 100644 index 0000000..ae2577a --- /dev/null +++ b/optd-mvp/src/memo/persistent/mod.rs @@ -0,0 +1,66 @@ +//! This module contains the definition and implementation of the [`PersistentMemo`] type, which +//! implements the `Memo` trait and supports memo table operations necessary for query optimization. + +use crate::{ + entities::{prelude::*, *}, + DATABASE_URL, +}; +use sea_orm::*; + +#[cfg(test)] +mod tests; + +/// A persistent memo table, backed by a database on disk. +/// +/// TODO more docs. +pub struct PersistentMemo { + /// This `PersistentMemo` is reliant on the SeaORM [`DatabaseConnection`] that stores all of the + /// objects needed for query optimization. + db: DatabaseConnection, +} + +impl PersistentMemo { + /// Creates a new `PersistentMemo` struct by connecting to a database defined at + /// [`DATABASE_URL`]. + /// + /// TODO remove dead code and write docs. + #[allow(dead_code)] + pub async fn new() -> Self { + Self { + db: Database::connect(DATABASE_URL).await.unwrap(), + } + } + + /// Since there is no asynchronous drop yet in Rust, we must do this manually. + /// + /// TODO remove dead code and write docs. + #[allow(dead_code)] + pub async fn cleanup(&self) { + cascades_group::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + fingerprint::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + logical_expression::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + logical_children::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + physical_expression::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + physical_children::Entity::delete_many() + .exec(&self.db) + .await + .unwrap(); + } +} + +mod implementation; diff --git a/optd-mvp/src/memo/persistent/tests.rs b/optd-mvp/src/memo/persistent/tests.rs new file mode 100644 index 0000000..7158b30 --- /dev/null +++ b/optd-mvp/src/memo/persistent/tests.rs @@ -0,0 +1,36 @@ +use super::*; +use crate::{expression::*, memo::Memo}; + +/// Tests is exact expression matches are detected and handled by the memo table. +#[ignore] +#[tokio::test] +async fn test_simple_duplicates() { + let memo = PersistentMemo::new().await; + memo.cleanup().await; + + let scan = scan("(a int, b int)".to_string()); + let scan1 = scan.clone(); + let scan2 = scan.clone(); + + let res0 = memo + .add_logical_expression(scan.into(), &[]) + .await + .unwrap() + .ok(); + let res1 = memo + .add_logical_expression(scan1.into(), &[]) + .await + .unwrap() + .err(); + let res2 = memo + .add_logical_expression(scan2.into(), &[]) + .await + .unwrap() + .err(); + + assert_eq!(res0, res1); + assert_eq!(res0, res2); + assert_eq!(res1, res2); + + memo.cleanup().await; +} diff --git a/optd-mvp/src/migrator/memo/m20241127_000001_fingerprint.rs b/optd-mvp/src/migrator/memo/m20241127_000001_fingerprint.rs new file mode 100644 index 0000000..bcba541 --- /dev/null +++ b/optd-mvp/src/migrator/memo/m20241127_000001_fingerprint.rs @@ -0,0 +1,47 @@ +//! TODO write docs. + +use crate::migrator::memo::logical_expression::LogicalExpression; +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveIden)] +pub enum Fingerprint { + Table, + Id, + LogicalExpressionId, + Kind, + Hash, +} + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Fingerprint::Table) + .if_not_exists() + .col(pk_auto(Fingerprint::Id)) + .col(unsigned(Fingerprint::LogicalExpressionId)) + .foreign_key( + ForeignKey::create() + .from(Fingerprint::Table, Fingerprint::LogicalExpressionId) + .to(LogicalExpression::Table, LogicalExpression::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .col(small_unsigned(Fingerprint::Kind)) + .col(big_unsigned(Fingerprint::Hash)) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Fingerprint::Table).to_owned()) + .await + } +} diff --git a/optd-mvp/src/migrator/memo/m20241127_000001_logical_expression.rs b/optd-mvp/src/migrator/memo/m20241127_000001_logical_expression.rs index 3682032..57356cf 100644 --- a/optd-mvp/src/migrator/memo/m20241127_000001_logical_expression.rs +++ b/optd-mvp/src/migrator/memo/m20241127_000001_logical_expression.rs @@ -19,21 +19,23 @@ //! Each `logical_expression` has a unique primary key ID, but it holds little importance other than //! helping distinguish between two different expressions. //! -//! The more interesting column is the `fingerprint` column, in which we store a hashed fingerprint -//! value that can be used to efficiently check equality between two potentially equivalent logical -//! expressions (hash-consing). See ???FIXME??? for more information on expression fingerprints. -//! //! Finally, since there are many different types of operators, we store a variant tag and a data //! column as JSON to represent the semi-structured data fields of logical operators. //! //! # Entity Relationships //! -//! The only relationship that `logical_expression` has is to [`cascades_group`]. It has **both** a +//! The main relationship that `logical_expression` has is to [`cascades_group`]. It has **both** a //! one-to-many **and** a many-to-many relationship with [`cascades_group`], and you can see more //! details about this in the module-level documentation for [`cascades_group`]. //! +//! The other relationship that `logical_expression` has is to [`fingerprint`]. This table stores +//! 1 or more fingerprints for every (unique) logical expression. The reason we have multiple +//! fingerprints is that an expression can belong to multiple groups during the exploration phase +//! before the merging of groups. +//! //! [`cascades_group`]: super::cascades_group //! [`physical_expression`]: super::physical_expression +//! [`fingerprint`]: super::fingerprint use crate::migrator::memo::cascades_group::CascadesGroup; use sea_orm_migration::{prelude::*, schema::*}; @@ -43,7 +45,6 @@ pub enum LogicalExpression { Table, Id, GroupId, - Fingerprint, Kind, Data, } @@ -68,7 +69,6 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .on_update(ForeignKeyAction::Cascade), ) - .col(big_unsigned(LogicalExpression::Fingerprint)) .col(small_integer(LogicalExpression::Kind)) .col(json(LogicalExpression::Data)) .to_owned(), diff --git a/optd-mvp/src/migrator/memo/m20241127_000001_physical_expression.rs b/optd-mvp/src/migrator/memo/m20241127_000001_physical_expression.rs index 7653112..1e66195 100644 --- a/optd-mvp/src/migrator/memo/m20241127_000001_physical_expression.rs +++ b/optd-mvp/src/migrator/memo/m20241127_000001_physical_expression.rs @@ -20,9 +20,10 @@ //! Each `physical_expression` has a unique primary key ID, and other tables will store a foreign //! key reference to a specific `physical_expression`s. //! -//! The more interesting column is the `fingerprint` column, in which we store a hashed fingerprint -//! value that can be used to efficiently check equality between two potentially equivalent physical -//! expressions (hash-consing). See ???FIXME??? for more information on expression fingerprints. +//! Note that `physical_expression` does **not** store a fingerprint. Remember that we want to +//! detect duplicates in the logical exploration phase. If there are no duplicate logical +//! expressions in the memo table, then there cannot be any duplicate physical expressions, which +//! are derived from said deduplicated logical expressions. //! //! Finally, since there are many different types of operators, we store a variant tag and a data //! column as JSON to represent the semi-structured data fields of logical operators. @@ -44,7 +45,6 @@ pub enum PhysicalExpression { Table, Id, GroupId, - Fingerprint, Kind, Data, } @@ -69,7 +69,6 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .on_update(ForeignKeyAction::Cascade), ) - .col(big_unsigned(PhysicalExpression::Fingerprint)) .col(small_integer(PhysicalExpression::Kind)) .col(json(PhysicalExpression::Data)) .to_owned(), diff --git a/optd-mvp/src/migrator/memo/mod.rs b/optd-mvp/src/migrator/memo/mod.rs index 8ed9390..7a60c9b 100644 --- a/optd-mvp/src/migrator/memo/mod.rs +++ b/optd-mvp/src/migrator/memo/mod.rs @@ -2,12 +2,14 @@ //! optimization framework. pub(crate) mod m20241127_000001_cascades_group; +pub(crate) mod m20241127_000001_fingerprint; pub(crate) mod m20241127_000001_logical_children; pub(crate) mod m20241127_000001_logical_expression; pub(crate) mod m20241127_000001_physical_children; pub(crate) mod m20241127_000001_physical_expression; pub(crate) use m20241127_000001_cascades_group as cascades_group; +pub(crate) use m20241127_000001_fingerprint as fingerprint; pub(crate) use m20241127_000001_logical_children as logical_children; pub(crate) use m20241127_000001_logical_expression as logical_expression; pub(crate) use m20241127_000001_physical_children as physical_children; diff --git a/optd-mvp/src/migrator/mod.rs b/optd-mvp/src/migrator/mod.rs index 179c406..0945423 100644 --- a/optd-mvp/src/migrator/mod.rs +++ b/optd-mvp/src/migrator/mod.rs @@ -9,6 +9,7 @@ impl MigratorTrait for Migrator { fn migrations() -> Vec> { vec![ Box::new(memo::cascades_group::Migration), + Box::new(memo::fingerprint::Migration), Box::new(memo::logical_expression::Migration), Box::new(memo::logical_children::Migration), Box::new(memo::physical_expression::Migration),