Skip to content

Commit

Permalink
feat(binder): support create index statements (#865)
Browse files Browse the repository at this point in the history
part of #864

Add `create index` support. No indexes are created for now. It only
modifies the catalog. All storage engines now use a shared submodule
called `index` to manage in-memory indexes.

---------

Signed-off-by: Alex Chi Z <[email protected]>
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh authored Jan 11, 2025
1 parent c41cd65 commit 423d0e9
Show file tree
Hide file tree
Showing 19 changed files with 477 additions and 14 deletions.
100 changes: 100 additions & 0 deletions src/binder/create_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 RisingLight Project Authors. Licensed under Apache-2.0.

use std::fmt;
use std::str::FromStr;

use pretty_xmlish::helper::delegate_fmt;
use pretty_xmlish::Pretty;
use serde::{Deserialize, Serialize};

use super::*;
use crate::catalog::{ColumnId, SchemaId, TableId};

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct CreateIndex {
pub schema_id: SchemaId,
pub index_name: String,
pub table_id: TableId,
pub columns: Vec<ColumnId>,
}

impl fmt::Display for CreateIndex {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let explainer = Pretty::childless_record("CreateIndex", self.pretty_index());
delegate_fmt(&explainer, f, String::with_capacity(1000))
}
}

impl CreateIndex {
pub fn pretty_index<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
vec![
("schema_id", Pretty::display(&self.schema_id)),
("name", Pretty::display(&self.index_name)),
("table_id", Pretty::display(&self.table_id)),
(
"columns",
Pretty::Array(self.columns.iter().map(Pretty::display).collect()),
),
]
}
}

impl FromStr for Box<CreateIndex> {
type Err = ();

fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
Err(())
}
}

impl Binder {
pub(super) fn bind_create_index(&mut self, stat: crate::parser::CreateIndex) -> Result {
let Some(ref name) = stat.name else {
return Err(
ErrorKind::InvalidIndex("index must have a name".to_string()).with_spanned(&stat),
);
};
let crate::parser::CreateIndex {
table_name,
columns,
..
} = stat;
let index_name = lower_case_name(name);
let (_, index_name) = split_name(&index_name)?;
let table_obj: ObjectName = table_name.clone();
let table_name = lower_case_name(&table_name);
let (schema_name, table_name) = split_name(&table_name)?;
let schema = self
.catalog
.get_schema_by_name(schema_name)
.ok_or_else(|| ErrorKind::InvalidSchema(schema_name.into()).with_spanned(&table_obj))?;
let Some(table) = schema.get_table_by_name(table_name) else {
return Err(ErrorKind::InvalidTable(table_name.into()).with_spanned(&table_obj));
};
// Check if every column exists in the table and get the column ids
let mut column_ids = Vec::new();
for column in &columns {
// Ensure column expr is a column reference
let OrderByExpr { expr, .. } = column;
let Expr::Identifier(column_name) = expr else {
return Err(
ErrorKind::InvalidColumn("column reference expected".to_string())
.with_spanned(column),
);
};
let column_name = column_name.value.to_lowercase();
let column_catalog = table
.get_column_by_name(&column_name)
.ok_or_else(|| ErrorKind::InvalidColumn(column_name).with_spanned(column))?;
column_ids.push(column_catalog.id());
}

let create = self.egraph.add(Node::CreateIndex(Box::new(CreateIndex {
schema_id: schema.id(),
index_name: index_name.into(),
table_id: table.id(),
columns: column_ids,
})));
Ok(create)
}
}
2 changes: 2 additions & 0 deletions src/binder/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum ErrorKind {
InvalidSchema(String),
#[error("invalid table {0:?}")]
InvalidTable(String),
#[error("invalid index {0:?}")]
InvalidIndex(String),
#[error("invalid column {0:?}")]
InvalidColumn(String),
#[error("table {0:?} already exists")]
Expand Down
3 changes: 3 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::types::DataValue;

pub mod copy;
mod create_function;
mod create_index;
mod create_table;
mod create_view;
mod delete;
Expand All @@ -28,6 +29,7 @@ mod select;
mod table;

pub use self::create_function::CreateFunction;
pub use self::create_index::CreateIndex;
pub use self::create_table::CreateTable;
pub use self::error::BindError;
use self::error::ErrorKind;
Expand Down Expand Up @@ -226,6 +228,7 @@ impl Binder {

fn bind_stmt(&mut self, stmt: Statement) -> Result {
match stmt {
Statement::CreateIndex(create_index) => self.bind_create_index(create_index),
Statement::CreateTable(create_table) => self.bind_create_table(create_table),
Statement::CreateView {
name,
Expand Down
38 changes: 38 additions & 0 deletions src/catalog/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2025 RisingLight Project Authors. Licensed under Apache-2.0.

use super::*;

/// The catalog of an index.
pub struct IndexCatalog {
id: IndexId,
name: String,
table_id: TableId,
column_idxs: Vec<ColumnId>,
}

impl IndexCatalog {
pub fn new(id: IndexId, name: String, table_id: TableId, column_idxs: Vec<ColumnId>) -> Self {
Self {
id,
name,
table_id,
column_idxs,
}
}

pub fn table_id(&self) -> TableId {
self.table_id
}

pub fn column_idxs(&self) -> &[ColumnId] {
&self.column_idxs
}

pub fn id(&self) -> IndexId {
self.id
}

pub fn name(&self) -> &str {
&self.name
}
}
3 changes: 3 additions & 0 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};

pub use self::column::*;
pub use self::index::*;
pub use self::root::*;
pub use self::schema::*;
pub use self::table::*;
use crate::types::*;

mod column;
pub mod function;
mod index;
mod root;
mod schema;
mod table;

pub type SchemaId = u32;
pub type TableId = u32;
pub type IndexId = u32;
pub type ColumnId = u32;

pub type RootCatalogRef = Arc<RootCatalog>;
Expand Down
37 changes: 37 additions & 0 deletions src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,34 @@ impl RootCatalog {
schema.add_view(name, columns, query)
}

pub fn add_index(
&self,
schema_id: SchemaId,
index_name: String,
table_id: TableId,
column_idxs: &[ColumnId],
) -> Result<IndexId, CatalogError> {
let mut inner = self.inner.lock().unwrap();
let schema = inner.schemas.get_mut(&schema_id).unwrap();
schema.add_index(index_name, table_id, column_idxs.to_vec())
}

pub fn get_index_on_table(&self, schema_id: SchemaId, table_id: TableId) -> Vec<IndexId> {
let mut inner = self.inner.lock().unwrap();
let schema = inner.schemas.get_mut(&schema_id).unwrap();
schema.get_indexes_on_table(table_id)
}

pub fn get_index_by_id(
&self,
schema_id: SchemaId,
index_id: IndexId,
) -> Option<Arc<IndexCatalog>> {
let mut inner = self.inner.lock().unwrap();
let schema = inner.schemas.get_mut(&schema_id).unwrap();
schema.get_index_by_id(index_id)
}

pub fn drop_table(&self, table_ref_id: TableRefId) {
let mut inner = self.inner.lock().unwrap();
let schema = inner.schemas.get_mut(&table_ref_id.schema_id).unwrap();
Expand Down Expand Up @@ -208,6 +236,15 @@ const CREATE_SYSTEM_TABLE_SQL: &str = "
table_id int not null,
table_name string not null
);
create table pg_indexes (
schema_id int not null,
schema_name string not null,
table_id int not null,
table_name string not null,
index_id int not null,
index_name string not null,
on_columns string not null
);
create table pg_attribute (
schema_name string not null,
table_name string not null,
Expand Down
49 changes: 43 additions & 6 deletions src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub struct SchemaCatalog {
name: String,
table_idxs: HashMap<String, TableId>,
tables: HashMap<TableId, Arc<TableCatalog>>,
next_table_id: TableId,
indexes_idxs: HashMap<String, IndexId>,
indexes: HashMap<IndexId, Arc<IndexCatalog>>,
next_id: u32,
/// Currently indexed by function name
functions: HashMap<String, Arc<FunctionCatalog>>,
}
Expand All @@ -26,7 +28,9 @@ impl SchemaCatalog {
name,
table_idxs: HashMap::new(),
tables: HashMap::new(),
next_table_id: 0,
indexes_idxs: HashMap::new(),
indexes: HashMap::new(),
next_id: 0,
functions: HashMap::new(),
}
}
Expand All @@ -40,8 +44,8 @@ impl SchemaCatalog {
if self.table_idxs.contains_key(&name) {
return Err(CatalogError::Duplicated("table", name));
}
let table_id = self.next_table_id;
self.next_table_id += 1;
let table_id = self.next_id;
self.next_id += 1;
let table_catalog = Arc::new(TableCatalog::new(
table_id,
name.clone(),
Expand All @@ -53,6 +57,23 @@ impl SchemaCatalog {
Ok(table_id)
}

pub(super) fn add_index(
&mut self,
name: String,
table_id: TableId,
columns: Vec<ColumnId>,
) -> Result<IndexId, CatalogError> {
if self.indexes_idxs.contains_key(&name) {
return Err(CatalogError::Duplicated("index", name));
}
let index_id = self.next_id;
self.next_id += 1;
let index_catalog = Arc::new(IndexCatalog::new(index_id, name.clone(), table_id, columns));
self.indexes_idxs.insert(name, index_id);
self.indexes.insert(index_id, index_catalog);
Ok(index_id)
}

pub(super) fn add_view(
&mut self,
name: String,
Expand All @@ -62,8 +83,8 @@ impl SchemaCatalog {
if self.table_idxs.contains_key(&name) {
return Err(CatalogError::Duplicated("view", name));
}
let table_id = self.next_table_id;
self.next_table_id += 1;
let table_id = self.next_id;
self.next_id += 1;
let table_catalog = Arc::new(TableCatalog::new_view(
table_id,
name.clone(),
Expand All @@ -84,6 +105,10 @@ impl SchemaCatalog {
self.tables.clone()
}

pub fn all_indexes(&self) -> HashMap<IndexId, Arc<IndexCatalog>> {
self.indexes.clone()
}

pub fn get_table_id_by_name(&self, name: &str) -> Option<TableId> {
self.table_idxs.get(name).cloned()
}
Expand All @@ -92,6 +117,18 @@ impl SchemaCatalog {
self.tables.get(&table_id).cloned()
}

pub fn get_indexes_on_table(&self, table_id: TableId) -> Vec<IndexId> {
self.indexes
.iter()
.filter(|(_, index)| index.table_id() == table_id)
.map(|(id, _)| *id)
.collect()
}

pub fn get_index_by_id(&self, index_id: IndexId) -> Option<Arc<IndexCatalog>> {
self.indexes.get(&index_id).cloned()
}

pub fn get_table_by_name(&self, name: &str) -> Option<Arc<TableCatalog>> {
self.table_idxs
.get(name)
Expand Down
1 change: 1 addition & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Database {
let tokens = cmd.split_whitespace().collect::<Vec<_>>();
Ok(match tokens.as_slice() {
["dt"] => "SELECT * FROM pg_catalog.pg_tables".to_string(),
["di"] => "SELECT * FROM pg_catalog.pg_indexes".to_string(),
["d", table] => format!(
"SELECT * FROM pg_catalog.pg_attribute WHERE table_name = '{table}'",
),
Expand Down
29 changes: 29 additions & 0 deletions src/executor/create_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2025 RisingLight Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use super::*;
use crate::binder::CreateIndex;
use crate::storage::Storage;

/// The executor of `create index` statement.
pub struct CreateIndexExecutor<S: Storage> {
pub index: Box<CreateIndex>,
pub storage: Arc<S>,
}

impl<S: Storage> CreateIndexExecutor<S> {
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
self.storage
.create_index(
self.index.schema_id,
&self.index.index_name,
self.index.table_id,
&self.index.columns,
)
.await?;

yield DataChunk::single(1);
}
}
Loading

0 comments on commit 423d0e9

Please sign in to comment.