Skip to content

Commit

Permalink
refactor: seperate schema trait from record (#241)
Browse files Browse the repository at this point in the history
* refactor: add Schema trait

* chore: fix Schema

* chore: fix wasm & python ci

* Fix refactoring bug

* cache

* refactor: remove R type arg on `DbOption`

* refactor: move dyn schema to runtime mod

* refactor: rename Schema to DbStorage

* fmt

---------

Co-authored-by: Gwo Tzu-Hsing <[email protected]>
  • Loading branch information
KKould and ethe authored Dec 17, 2024
1 parent 1a48277 commit f60ba3f
Show file tree
Hide file tree
Showing 46 changed files with 1,333 additions and 1,093 deletions.
4 changes: 2 additions & 2 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl BenchDatabase for TonboS3BenchDataBase {
.disable_wal();

TonboS3BenchDataBase::new(
tonbo::DB::new(option, TokioExecutor::current())
tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap(),
)
Expand Down Expand Up @@ -324,7 +324,7 @@ impl BenchDatabase for TonboBenchDataBase {
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::current())
let db = tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap();
TonboBenchDataBase::new(db)
Expand Down
10 changes: 5 additions & 5 deletions bindings/python/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use pyo3::{pyclass, pymethods};
use tonbo::record::{ColumnDesc, Datatype};
use tonbo::record::{Datatype, Value, ValueDesc};

use crate::datatype::DataType;

Expand Down Expand Up @@ -58,15 +58,15 @@ impl Display for Column {
}
}

impl From<Column> for ColumnDesc {
impl From<Column> for ValueDesc {
fn from(col: Column) -> Self {
let datatype = Datatype::from(col.datatype);
ColumnDesc::new(col.name, datatype, col.nullable)
ValueDesc::new(col.name, datatype, col.nullable)
}
}
impl From<Column> for tonbo::record::Column {
impl From<Column> for Value {
fn from(col: Column) -> Self {
let datatype = Datatype::from(col.datatype);
tonbo::record::Column::new(datatype, col.name, col.value, col.nullable)
Value::new(datatype, col.name, col.value, col.nullable)
}
}
21 changes: 6 additions & 15 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
record::{ColumnDesc, DynRecord},
record::{DynRecord, DynSchema, Value, ValueDesc},
DB,
};

Expand Down Expand Up @@ -40,7 +40,6 @@ impl TonboDB {
let mut desc = vec![];
let mut cols = vec![];
let mut primary_key_index = None;
let mut primary_key_name = None;

for i in 0..values.len()? {
let value = values.get_item(i)?;
Expand All @@ -51,23 +50,15 @@ impl TonboDB {
panic!("Multiple primary keys is not allowed!")
}
primary_key_index = Some(desc.len());
primary_key_name = Some(col.name.clone());
}
cols.push(col.clone());
desc.push(ColumnDesc::from(col));
desc.push(ValueDesc::from(col));
}
}
let option = option.into_option(primary_key_index.unwrap(), primary_key_name.unwrap());
let schema = DynSchema::new(desc, primary_key_index.unwrap());
let option = option.into_option(&schema);
let db = get_runtime()
.block_on(async {
DB::with_schema(
option,
TokioExecutor::current(),
desc,
primary_key_index.unwrap(),
)
.await
})
.block_on(async { DB::new(option, TokioExecutor::current(), schema).await })
.unwrap();
Ok(Self {
db: Arc::new(db),
Expand All @@ -87,7 +78,7 @@ impl TonboDB {
for i in 0..values.len()? {
let value = values.get_item(i)?;
if let Ok(bound_col) = value.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down
27 changes: 11 additions & 16 deletions bindings/python/src/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use fusio::path::Path;
use pyo3::{pyclass, pymethods, PyResult};
use tonbo::record::DynRecord;
use tonbo::record::Schema;

use crate::{ExceedsMaxLevelError, FsOptions};

Expand Down Expand Up @@ -72,21 +72,16 @@ impl DbOption {
}

impl DbOption {
pub(crate) fn into_option(
self,
primary_key_index: usize,
primary_key_name: String,
) -> tonbo::DbOption<DynRecord> {
let mut opt =
tonbo::DbOption::with_path(Path::from(self.path), primary_key_name, primary_key_index)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
pub(crate) fn into_option<S: Schema>(self, schema: &S) -> tonbo::DbOption {
let mut opt = tonbo::DbOption::new(Path::from(self.path), schema)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
for (level, path) in self.level_paths.into_iter().enumerate() {
if let Some((path, fs_options)) = path {
opt = opt
Expand Down
3 changes: 2 additions & 1 deletion bindings/python/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::ops;

use pyo3::{pyclass, FromPyObject, Py, PyAny, Python};
use tonbo::record::Value;

use crate::{utils::to_col, Column};

Expand All @@ -12,7 +13,7 @@ pub enum Bound {
}

impl Bound {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<tonbo::record::Column> {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<Value> {
match self {
Bound::Included { key } => ops::Bound::Included(to_col(py, col, key.clone_ref(py))),
Bound::Excluded { key } => ops::Bound::Excluded(to_col(py, col, key.clone_ref(py))),
Expand Down
8 changes: 4 additions & 4 deletions bindings/python/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use pyo3::{
types::{PyAnyMethods, PyMapping, PyMappingMethods},
Py, PyAny, PyResult, Python,
};
use tonbo::record::DynRecord;
use tonbo::record::{DynRecord, Value};

use crate::Column;

#[derive(Clone)]
struct Record {
columns: Vec<tonbo::record::Column>,
columns: Vec<Value>,
primary_key_index: usize,
}

impl Record {
fn new(columns: Vec<tonbo::record::Column>, primary_key_index: usize) -> Self {
fn new(columns: Vec<Value>, primary_key_index: usize) -> Self {
Self {
columns,
primary_key_index,
Expand Down Expand Up @@ -58,7 +58,7 @@ impl RecordBatch {
if col.primary_key {
primary_key_index = col_idx;
}
let col = tonbo::record::Column::from(col);
let col = Value::from(col);
cols.push(col);
col_idx += 1;
}
Expand Down
21 changes: 11 additions & 10 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use pyo3::{
Bound, IntoPy, Py, PyAny, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo::{
record::{DynRecord, Value},
transaction, Projection,
};

use crate::{
column::Column,
Expand Down Expand Up @@ -123,7 +126,7 @@ impl Transaction {
let tuple = x.downcast::<PyTuple>()?;
let col = tuple.get_item(1)?;
if let Ok(bound_col) = col.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down Expand Up @@ -181,16 +184,14 @@ impl Transaction {
future_into_py(py, async move {
let mut scan = txn.scan((
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
>(lower.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
lower.as_ref(),
)
},
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
>(high.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
high.as_ref(),
)
},
));

Expand Down
17 changes: 5 additions & 12 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@ use pyo3::{
types::{PyBytes, PyDict, PyDictMethods},
Bound, Py, PyAny, Python,
};
use tonbo::record::Datatype;
use tonbo::record::{Datatype, Value};

use crate::{column::Column, datatype::DataType, range};

pub(crate) fn to_dict(
py: Python,
primary_key_index: usize,
record: Vec<tonbo::record::Column>,
) -> Bound<PyDict> {
pub(crate) fn to_dict(py: Python, primary_key_index: usize, record: Vec<Value>) -> Bound<PyDict> {
let dict = PyDict::new_bound(py);
for (idx, col) in record.iter().enumerate() {
match &col.datatype {
Expand Down Expand Up @@ -185,8 +181,8 @@ pub(crate) fn to_key(
}
}

pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> tonbo::record::Column {
tonbo::record::Column::new(
pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> Value {
Value::new(
Datatype::from(&col.datatype),
col.name.to_owned(),
to_key(py, &col.datatype, key),
Expand All @@ -199,10 +195,7 @@ pub(crate) fn to_bound(
col: &Column,
lower: Option<Py<range::Bound>>,
high: Option<Py<range::Bound>>,
) -> (
std::ops::Bound<tonbo::record::Column>,
std::ops::Bound<tonbo::record::Column>,
) {
) -> (std::ops::Bound<Value>, std::ops::Bound<Value>) {
let lower = match lower {
Some(bound) => bound.get().to_bound(py, col),
None => std::ops::Bound::Unbounded,
Expand Down
25 changes: 18 additions & 7 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use futures_core::Stream;
use futures_util::StreamExt;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
executor::tokio::TokioExecutor,
inmem::immutable::ArrowArrays,
record::{Record, Schema},
DbOption, DB,
};
use tonbo_macros::Record;

Expand All @@ -49,7 +52,10 @@ struct MusicExec {
db: Arc<DB<Music, TokioExecutor>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
range: (
Bound<<MusicSchema as Schema>::Key>,
Bound<<MusicSchema as Schema>::Key>,
),
}

struct MusicStream {
Expand All @@ -63,7 +69,7 @@ impl TableProvider for MusicProvider {
}

fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -96,7 +102,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
let schema = Music::arrow_schema();
let schema = MusicSchema {}.arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
} else {
Expand Down Expand Up @@ -127,7 +133,7 @@ impl Stream for MusicStream {

impl RecordBatchStream for MusicStream {
fn schema(&self) -> SchemaRef {
Music::arrow_schema().clone()
MusicSchema {}.arrow_schema().clone()
}
}

Expand Down Expand Up @@ -215,9 +221,14 @@ async fn main() -> Result<()> {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/music").unwrap());
let options = DbOption::new(
Path::from_filesystem_path("./db_path/music").unwrap(),
&MusicSchema,
);

let db = DB::new(options, TokioExecutor::current()).await.unwrap();
let db = DB::new(options, TokioExecutor::current(), MusicSchema)
.await
.unwrap();
for (id, name, like) in [
(0, "welcome".to_string(), 0),
(1, "tonbo".to_string(), 999),
Expand Down
9 changes: 7 additions & 2 deletions examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
let options = DbOption::new(
Path::from_filesystem_path("./db_path/users").unwrap(),
&UserSchema,
);
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::current()).await.unwrap();
let db = DB::new(options, TokioExecutor::current(), UserSchema)
.await
.unwrap();

// insert with owned value
db.insert(User {
Expand Down
Loading

0 comments on commit f60ba3f

Please sign in to comment.